// micromegas_analytics/lakehouse/thread_spans_view.rs
use super::{
2 blocks_view::BlocksView,
3 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4 jit_partitions::{
5 JitPartitionConfig, generate_stream_jit_partitions, is_jit_partition_up_to_date,
6 },
7 lakehouse_context::LakehouseContext,
8 partition_cache::PartitionCache,
9 partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
10 view::{PartitionSpec, View, ViewMetadata},
11 view_factory::ViewMaker,
12};
13use crate::{
14 call_tree::make_call_tree,
15 lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
16 metadata::{find_process, find_stream},
17 response_writer::ResponseWriter,
18 span_table::{SpanRecordBuilder, get_spans_schema},
19 time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_db},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
25use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
26use micromegas_ingestion::data_lake_connection::DataLakeConnection;
27use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
28use micromegas_tracing::prelude::*;
29use std::sync::Arc;
30use uuid::Uuid;
31
/// Name of the view set every thread-spans view instance belongs to.
const VIEW_SET_NAME: &str = "thread_spans";
/// Bump whenever the on-disk schema of span partitions changes.
const SCHEMA_VERSION: u8 = 0;

// Column names used to compute the time bounds of a spans dataframe.
// `std::sync::LazyLock` replaces the `lazy_static!` macro: same lazy,
// `Deref`-based access, no third-party crate required.
static MIN_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("begin")));
static MAX_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("end")));
38
/// Factory producing [`ThreadSpansView`] instances, one per thread stream id.
#[derive(Debug)]
pub struct ThreadSpansViewMaker {}
42
43impl ViewMaker for ThreadSpansViewMaker {
44 fn make_view(&self, stream_id: &str) -> Result<Arc<dyn View>> {
45 Ok(Arc::new(ThreadSpansView::new(stream_id)?))
46 }
47
48 fn get_schema_hash(&self) -> Vec<u8> {
49 vec![SCHEMA_VERSION]
50 }
51
52 fn get_schema(&self) -> Arc<Schema> {
53 Arc::new(get_spans_schema())
54 }
55}
56
/// Lakehouse view over the spans of a single thread stream.
///
/// The view instance id is the stream's UUID rendered as text; the special
/// "global" instance is rejected by [`ThreadSpansView::new`].
#[derive(Debug)]
pub struct ThreadSpansView {
    // Always VIEW_SET_NAME ("thread_spans").
    view_set_name: Arc<String>,
    // Stream uuid as text; doubles as the view instance id.
    view_instance_id: Arc<String>,
    // Parsed form of `view_instance_id`.
    stream_id: sqlx::types::Uuid,
}
64
65impl ThreadSpansView {
66 pub fn new(view_instance_id: &str) -> Result<Self> {
67 if view_instance_id == "global" {
68 anyhow::bail!("the global view is not implemented for thread spans");
69 }
70
71 Ok(Self {
72 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
73 view_instance_id: Arc::new(String::from(view_instance_id)),
74 stream_id: Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?,
75 })
76 }
77}
78
79#[span_fn]
80async fn append_call_tree(
81 record_builder: &mut SpanRecordBuilder,
82 convert_ticks: &ConvertTicks,
83 blocks: &[BlockMetadata],
84 blob_storage: Arc<BlobStorage>,
85 stream: &crate::metadata::StreamMetadata,
86) -> Result<()> {
87 let call_tree = make_call_tree(
88 blocks,
89 convert_ticks.delta_ticks_to_ns(blocks[0].begin_ticks),
90 convert_ticks.delta_ticks_to_ns(blocks[blocks.len() - 1].end_ticks),
91 None,
92 blob_storage,
93 convert_ticks.clone(),
94 stream,
95 )
96 .await
97 .with_context(|| "make_call_tree")?;
98 record_builder
99 .append_call_tree(&call_tree)
100 .with_context(|| "adding call tree to span record builder")?;
101 Ok(())
102}
103
/// Builds the span rows for one JIT partition and streams them to storage.
///
/// Source blocks are grouped into runs of tick-contiguous blocks; each run is
/// turned into one call tree whose spans are appended to the record builder.
/// The finished rows are sent over a channel to `write_partition_from_rows`,
/// which runs concurrently and performs the actual partition write.
///
/// # Errors
/// Fails when `spec.blocks` is empty, when call-tree construction fails, or
/// when the writer task reports an error.
#[span_fn]
async fn write_partition(
    lake: Arc<DataLakeConnection>,
    view_meta: ViewMetadata,
    schema: Arc<Schema>,
    convert_ticks: &ConvertTicks,
    spec: &SourceDataBlocksInMemory,
) -> Result<()> {
    let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
    info!("nb_events: {nb_events}");
    // Capacity heuristic: presumably each span consists of a begin + end
    // event, hence nb_events / 2 spans — TODO confirm against SpanRecordBuilder.
    let mut record_builder = SpanRecordBuilder::with_capacity(nb_events / 2);
    let mut blocks_to_process = vec![];
    // End tick of the previously accepted block; None before the first block.
    let mut last_end = None;
    if spec.blocks.is_empty() {
        anyhow::bail!("empty partition spec");
    }
    // NOTE(review): assumes spec.blocks is sorted by time so that first/last
    // bound the insert-time range — confirm against the partition generator.
    let min_insert_time = spec.blocks[0].block.insert_time;
    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;

    // Bounded channel (capacity 1): the send below backpressures this
    // producer against the concurrently running writer task.
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    let null_response_writer = Arc::new(ResponseWriter::new(None));
    let join_handle = spawn_with_context(write_partition_from_rows(
        lake.clone(),
        view_meta,
        schema,
        TimeRange::new(min_insert_time, max_insert_time),
        spec.block_ids_hash.clone(),
        rx,
        null_response_writer,
    ));

    for block in &spec.blocks {
        // A block continues the current run when its begin tick equals the
        // previous block's end tick; unwrap_or makes the very first block
        // always start a run.
        if block.block.begin_ticks == last_end.unwrap_or(block.block.begin_ticks) {
            last_end = Some(block.block.end_ticks);
            blocks_to_process.push(block.block.clone());
        } else {
            // Gap detected: flush the accumulated run as one call tree and
            // start a new run with the current block.
            append_call_tree(
                &mut record_builder,
                convert_ticks,
                &blocks_to_process,
                lake.blob_storage.clone(),
                &block.stream,
            )
            .await?;
            last_end = Some(block.block.end_ticks);
            blocks_to_process = vec![block.block.clone()];
        }
    }
    // Flush the trailing run (non-empty whenever spec.blocks is non-empty).
    if !blocks_to_process.is_empty() {
        append_call_tree(
            &mut record_builder,
            convert_ticks,
            &blocks_to_process,
            lake.blob_storage.clone(),
            &spec.blocks[0].stream,
        )
        .await?;
    }
    // Row time range in absolute time, converted from the outermost ticks.
    let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
    let max_time_row =
        convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
    let rows = record_builder
        .finish()
        .with_context(|| "record_builder.finish()")?;
    info!("writing {} rows", rows.num_rows());
    tx.send(PartitionRowSet {
        rows_time_range: TimeRange::new(min_time_row, max_time_row),
        rows,
    })
    .await?;
    // Dropping the sender closes the channel, letting the writer task finish.
    drop(tx);
    // Double `?`: first the join error (panic/cancellation), then the
    // writer's own Result.
    join_handle.await??;
    Ok(())
}
181#[span_fn]
183async fn update_partition(
184 lake: Arc<DataLakeConnection>,
185 view_meta: ViewMetadata,
186 schema: Arc<Schema>,
187 convert_ticks: &ConvertTicks,
188 spec: &SourceDataBlocksInMemory,
189) -> Result<()> {
190 if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
191 return Ok(());
192 }
193 write_partition(lake, view_meta, schema, convert_ticks, spec)
194 .await
195 .with_context(|| "write_partition")?;
196
197 Ok(())
198}
199
200#[async_trait]
201impl View for ThreadSpansView {
202 fn get_view_set_name(&self) -> Arc<String> {
203 self.view_set_name.clone()
204 }
205
206 fn get_view_instance_id(&self) -> Arc<String> {
207 self.view_instance_id.clone()
208 }
209
210 async fn make_batch_partition_spec(
211 &self,
212 _lakehouse: Arc<LakehouseContext>,
213 _existing_partitions: Arc<PartitionCache>,
214 _insert_range: TimeRange,
215 ) -> Result<Arc<dyn PartitionSpec>> {
216 anyhow::bail!("not implemented")
217 }
218
219 fn get_file_schema_hash(&self) -> Vec<u8> {
220 vec![SCHEMA_VERSION]
221 }
222
223 fn get_file_schema(&self) -> Arc<Schema> {
224 Arc::new(get_spans_schema())
225 }
226
227 #[span_fn]
228 async fn jit_update(
229 &self,
230 lakehouse: Arc<LakehouseContext>,
231 query_range: Option<TimeRange>,
232 ) -> Result<()> {
233 if query_range.is_none() {
234 anyhow::bail!("query range mandatory for thread spans view");
235 }
236 let query_range = query_range.unwrap();
237 let stream = Arc::new(
238 find_stream(&lakehouse.lake().db_pool, self.stream_id)
239 .await
240 .with_context(|| "find_stream")?,
241 );
242 let process = Arc::new(
243 find_process(&lakehouse.lake().db_pool, &stream.process_id)
244 .await
245 .with_context(|| "find_process")?,
246 );
247 let convert_ticks =
248 make_time_converter_from_db(&lakehouse.lake().db_pool, &process).await?;
249 let blocks_view = BlocksView::new()?;
250 let partitions = generate_stream_jit_partitions(
251 &JitPartitionConfig::default(),
252 lakehouse.clone(),
253 &blocks_view,
254 &query_range,
255 stream.clone(),
256 process.clone(),
257 )
258 .await
259 .with_context(|| "generate_stream_jit_partitions")?;
260 for part in &partitions {
261 update_partition(
262 lakehouse.lake().clone(),
263 ViewMetadata {
264 view_set_name: self.get_view_set_name(),
265 view_instance_id: self.get_view_instance_id(),
266 file_schema_hash: self.get_file_schema_hash(),
267 },
268 self.get_file_schema(),
269 &convert_ticks,
270 part,
271 )
272 .await
273 .with_context(|| "update_partition")?;
274 }
275 Ok(())
276 }
277
278 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
279 Ok(vec![
280 Expr::BinaryExpr(BinaryExpr::new(
281 col("begin").into(),
282 Operator::LtEq,
283 Expr::Literal(datetime_to_scalar(end), None).into(),
284 )),
285 Expr::BinaryExpr(BinaryExpr::new(
286 col("end").into(),
287 Operator::GtEq,
288 Expr::Literal(datetime_to_scalar(begin), None).into(),
289 )),
290 ])
291 }
292
293 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
294 Arc::new(NamedColumnsTimeBounds::new(
295 MIN_TIME_COLUMN.clone(),
296 MAX_TIME_COLUMN.clone(),
297 ))
298 }
299
300 fn get_update_group(&self) -> Option<i32> {
301 None
302 }
303}