micromegas_analytics/lakehouse/
thread_spans_view.rs1use super::{
2 blocks_view::BlocksView,
3 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4 jit_partitions::{
5 JitPartitionConfig, generate_stream_jit_partitions, is_jit_partition_up_to_date,
6 },
7 lakehouse_context::LakehouseContext,
8 partition_cache::PartitionCache,
9 partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
10 view::{PartitionSpec, View, ViewMetadata},
11 view_factory::ViewMaker,
12};
13use crate::{
14 call_tree::make_call_tree,
15 lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
16 metadata::{find_process, find_stream},
17 response_writer::ResponseWriter,
18 span_table::{SpanRecordBuilder, get_spans_schema},
19 time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_db},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
25use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
26use micromegas_ingestion::data_lake_connection::DataLakeConnection;
27use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
28use micromegas_tracing::prelude::*;
29use std::sync::Arc;
30use uuid::Uuid;
31
/// Name of the view set implemented by `ThreadSpansView`.
const VIEW_SET_NAME: &str = "thread_spans";

/// Version of the file schema produced by this view; it is the whole file
/// schema hash reported by both `ThreadSpansViewMaker` and `ThreadSpansView`,
/// and is stored in the `ViewMetadata` of every partition written here.
const SCHEMA_VERSION: u8 = 0;

/// Name of the column used as the lower time bound of each row (`begin`).
static MIN_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("begin")));

/// Name of the column used as the upper time bound of each row (`end`).
static MAX_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("end")));
38
/// View factory for the `thread_spans` view set.
///
/// Each call to `make_view` produces a `ThreadSpansView` bound to one stream
/// id; there is no global instance.
#[derive(Debug)]
pub struct ThreadSpansViewMaker {}
42
43impl ViewMaker for ThreadSpansViewMaker {
44 fn make_view(&self, stream_id: &str) -> Result<Arc<dyn View>> {
45 Ok(Arc::new(ThreadSpansView::new(stream_id)?))
46 }
47
48 fn get_schema_hash(&self) -> Vec<u8> {
49 vec![SCHEMA_VERSION]
50 }
51
52 fn get_schema(&self) -> Arc<Schema> {
53 Arc::new(get_spans_schema())
54 }
55}
56
57#[derive(Debug)]
59pub struct ThreadSpansView {
60 view_set_name: Arc<String>,
61 view_instance_id: Arc<String>,
62 stream_id: sqlx::types::Uuid,
63}
64
65impl ThreadSpansView {
66 pub fn new(view_instance_id: &str) -> Result<Self> {
67 if view_instance_id == "global" {
68 anyhow::bail!("the global view is not implemented for thread spans");
69 }
70
71 Ok(Self {
72 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
73 view_instance_id: Arc::new(String::from(view_instance_id)),
74 stream_id: Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?,
75 })
76 }
77}
78
79#[span_fn]
80async fn append_call_tree(
81 record_builder: &mut SpanRecordBuilder,
82 convert_ticks: &ConvertTicks,
83 blocks: &[BlockMetadata],
84 blob_storage: Arc<BlobStorage>,
85 stream: &crate::metadata::StreamMetadata,
86) -> Result<()> {
87 let call_tree = make_call_tree(
88 blocks,
89 convert_ticks.delta_ticks_to_ns(blocks[0].begin_ticks),
90 convert_ticks.delta_ticks_to_ns(blocks[blocks.len() - 1].end_ticks),
91 None,
92 blob_storage,
93 convert_ticks.clone(),
94 stream,
95 )
96 .await
97 .with_context(|| "make_call_tree")?;
98 record_builder
99 .append_call_tree(&call_tree)
100 .with_context(|| "adding call tree to span record builder")?;
101 Ok(())
102}
103
104#[span_fn]
106async fn write_partition(
107 lake: Arc<DataLakeConnection>,
108 view_meta: ViewMetadata,
109 schema: Arc<Schema>,
110 convert_ticks: &ConvertTicks,
111 spec: &SourceDataBlocksInMemory,
112) -> Result<()> {
113 let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
114 info!("nb_events: {nb_events}");
115 if spec.blocks.is_empty() {
116 anyhow::bail!("empty partition spec");
117 }
118 let min_insert_time = spec.blocks[0].block.insert_time;
121 let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
122
123 let (tx, rx) = tokio::sync::mpsc::channel(1);
124 let null_response_writer = Arc::new(ResponseWriter::new(None));
125 let join_handle = spawn_with_context(write_partition_from_rows(
126 lake.clone(),
127 view_meta,
128 schema,
129 TimeRange::new(min_insert_time, max_insert_time),
130 spec.block_ids_hash.clone(),
131 rx,
132 null_response_writer,
133 ));
134
135 let build_result: Result<PartitionRowSet> = async {
136 let mut record_builder = SpanRecordBuilder::with_capacity(nb_events / 2);
137 let mut blocks_to_process = vec![];
138 let mut last_end = None;
139 for block in &spec.blocks {
140 if block.block.begin_ticks == last_end.unwrap_or(block.block.begin_ticks) {
141 last_end = Some(block.block.end_ticks);
142 blocks_to_process.push(block.block.clone());
143 } else {
144 append_call_tree(
145 &mut record_builder,
146 convert_ticks,
147 &blocks_to_process,
148 lake.blob_storage.clone(),
149 &block.stream,
150 )
151 .await?;
152 last_end = Some(block.block.end_ticks);
153 blocks_to_process = vec![block.block.clone()];
154 }
155 }
156 if !blocks_to_process.is_empty() {
157 append_call_tree(
158 &mut record_builder,
159 convert_ticks,
160 &blocks_to_process,
161 lake.blob_storage.clone(),
162 &spec.blocks[0].stream,
163 )
164 .await?;
165 }
166 let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
167 let max_time_row =
168 convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
169 let rows = record_builder
170 .finish()
171 .with_context(|| "record_builder.finish()")?;
172 info!("writing {} rows", rows.num_rows());
173 Ok(PartitionRowSet {
174 rows_time_range: TimeRange::new(min_time_row, max_time_row),
175 rows,
176 })
177 }
178 .await;
179
180 match build_result {
181 Ok(row_set) => {
182 tx.send(Ok(row_set)).await?;
183 drop(tx);
184 join_handle.await??;
185 Ok(())
186 }
187 Err(e) => {
188 warn!(
189 "aborting thread-spans partition write for block {:?}: {e:?}",
190 spec.block_ids_hash
191 );
192 let _ = tx
193 .send(Err(anyhow::anyhow!("thread-spans build aborted")))
194 .await;
195 drop(tx);
196 match join_handle.await {
197 Ok(Ok(())) => {}
198 Ok(Err(writer_err)) => {
199 debug!("thread-spans writer task error during abort: {writer_err:?}");
200 }
201 Err(join_err) => {
202 warn!("thread-spans writer task panicked during abort: {join_err:?}");
203 }
204 }
205 Err(e)
206 }
207 }
208}
209#[span_fn]
211async fn update_partition(
212 lake: Arc<DataLakeConnection>,
213 view_meta: ViewMetadata,
214 schema: Arc<Schema>,
215 convert_ticks: &ConvertTicks,
216 spec: &SourceDataBlocksInMemory,
217) -> Result<()> {
218 if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
219 return Ok(());
220 }
221 write_partition(lake, view_meta, schema, convert_ticks, spec)
222 .await
223 .with_context(|| "write_partition")?;
224
225 Ok(())
226}
227
228#[async_trait]
229impl View for ThreadSpansView {
230 fn get_view_set_name(&self) -> Arc<String> {
231 self.view_set_name.clone()
232 }
233
234 fn get_view_instance_id(&self) -> Arc<String> {
235 self.view_instance_id.clone()
236 }
237
238 async fn make_batch_partition_spec(
239 &self,
240 _lakehouse: Arc<LakehouseContext>,
241 _existing_partitions: Arc<PartitionCache>,
242 _insert_range: TimeRange,
243 ) -> Result<Arc<dyn PartitionSpec>> {
244 anyhow::bail!("not implemented")
245 }
246
247 fn get_file_schema_hash(&self) -> Vec<u8> {
248 vec![SCHEMA_VERSION]
249 }
250
251 fn get_file_schema(&self) -> Arc<Schema> {
252 Arc::new(get_spans_schema())
253 }
254
255 #[span_fn]
256 async fn jit_update(
257 &self,
258 lakehouse: Arc<LakehouseContext>,
259 query_range: Option<TimeRange>,
260 ) -> Result<()> {
261 if query_range.is_none() {
262 anyhow::bail!("query range mandatory for thread spans view");
263 }
264 let query_range = query_range.unwrap();
265 let stream = Arc::new(
266 find_stream(&lakehouse.lake().db_pool, self.stream_id)
267 .await
268 .with_context(|| "find_stream")?,
269 );
270 let process = Arc::new(
271 find_process(&lakehouse.lake().db_pool, &stream.process_id)
272 .await
273 .with_context(|| "find_process")?,
274 );
275 let convert_ticks =
276 make_time_converter_from_db(&lakehouse.lake().db_pool, &process).await?;
277 let blocks_view = BlocksView::new()?;
278 let partitions = generate_stream_jit_partitions(
279 &JitPartitionConfig::default(),
280 lakehouse.clone(),
281 &blocks_view,
282 &query_range,
283 stream.clone(),
284 process.clone(),
285 )
286 .await
287 .with_context(|| "generate_stream_jit_partitions")?;
288 for part in &partitions {
289 update_partition(
290 lakehouse.lake().clone(),
291 ViewMetadata {
292 view_set_name: self.get_view_set_name(),
293 view_instance_id: self.get_view_instance_id(),
294 file_schema_hash: self.get_file_schema_hash(),
295 },
296 self.get_file_schema(),
297 &convert_ticks,
298 part,
299 )
300 .await
301 .with_context(|| "update_partition")?;
302 }
303 Ok(())
304 }
305
306 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
307 Ok(vec![
308 Expr::BinaryExpr(BinaryExpr::new(
309 col("begin").into(),
310 Operator::LtEq,
311 Expr::Literal(datetime_to_scalar(end), None).into(),
312 )),
313 Expr::BinaryExpr(BinaryExpr::new(
314 col("end").into(),
315 Operator::GtEq,
316 Expr::Literal(datetime_to_scalar(begin), None).into(),
317 )),
318 ])
319 }
320
321 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
322 Arc::new(NamedColumnsTimeBounds::new(
323 MIN_TIME_COLUMN.clone(),
324 MAX_TIME_COLUMN.clone(),
325 ))
326 }
327
328 fn get_update_group(&self) -> Option<i32> {
329 None
330 }
331}