micromegas_analytics/lakehouse/
net_spans_view.rs1use super::{
2 blocks_view::BlocksView,
3 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4 jit_partitions::{
5 JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
6 },
7 lakehouse_context::LakehouseContext,
8 partition_cache::PartitionCache,
9 partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
10 view::{PartitionSpec, View, ViewMetadata},
11 view_factory::{ViewFactory, ViewMaker},
12};
13use crate::{
14 lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
15 metadata::{StreamMetadata, find_process_with_latest_timing},
16 net_span_tree::make_net_span_tree,
17 net_spans_table::{NetSpanRecordBuilder, net_spans_table_schema},
18 response_writer::ResponseWriter,
19 time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_latest_timing},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
25use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
26use micromegas_ingestion::data_lake_connection::DataLakeConnection;
27use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
28use micromegas_tracing::prelude::*;
29use std::sync::Arc;
30use uuid::Uuid;
31
/// View-set name under which net-spans partitions are registered.
const VIEW_SET_NAME: &str = "net_spans";
/// Version of the net-spans file schema; it is also the whole schema hash.
const SCHEMA_VERSION: u8 = 0;
/// Stream tag used to select the process streams holding net events.
const NET_STREAM_TAG: &str = "net";

/// Name of the column holding each span's start time.
static BEGIN_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("begin_time")));
/// Name of the column holding each span's end time.
static END_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("end_time")));
40
41#[derive(Debug)]
43pub struct NetSpansViewMaker {
44 view_factory: Arc<ViewFactory>,
45}
46
47impl NetSpansViewMaker {
48 pub fn new(view_factory: Arc<ViewFactory>) -> Self {
49 Self { view_factory }
50 }
51}
52
53impl ViewMaker for NetSpansViewMaker {
54 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
55 Ok(Arc::new(NetSpansView::new(
56 view_instance_id,
57 self.view_factory.clone(),
58 )?))
59 }
60
61 fn get_schema_hash(&self) -> Vec<u8> {
62 vec![SCHEMA_VERSION]
63 }
64
65 fn get_schema(&self) -> Arc<Schema> {
66 Arc::new(net_spans_table_schema())
67 }
68}
69
70#[derive(Debug)]
72pub struct NetSpansView {
73 view_set_name: Arc<String>,
74 view_instance_id: Arc<String>,
75 process_id: sqlx::types::Uuid,
76 view_factory: Arc<ViewFactory>,
77}
78
79impl NetSpansView {
80 pub fn new(view_instance_id: &str, view_factory: Arc<ViewFactory>) -> Result<Self> {
81 if view_instance_id == "global" {
82 anyhow::bail!("NetSpansView does not support global view access");
83 }
84 let process_id = Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?;
85 Ok(Self {
86 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
87 view_instance_id: Arc::new(view_instance_id.into()),
88 process_id,
89 view_factory,
90 })
91 }
92}
93
94#[span_fn]
95async fn append_net_span_tree(
96 record_builder: &mut NetSpanRecordBuilder,
97 convert_ticks: &ConvertTicks,
98 blocks: &[BlockMetadata],
99 blob_storage: Arc<BlobStorage>,
100 stream: &StreamMetadata,
101 process_id: Arc<String>,
102) -> Result<()> {
103 make_net_span_tree(
104 blocks,
105 record_builder,
106 blob_storage,
107 stream,
108 process_id,
109 convert_ticks.clone(),
110 )
111 .await
112 .with_context(|| "make_net_span_tree")
113}
114
115#[span_fn]
117async fn write_partition(
118 lake: Arc<DataLakeConnection>,
119 view_meta: ViewMetadata,
120 schema: Arc<Schema>,
121 convert_ticks: &ConvertTicks,
122 spec: &SourceDataBlocksInMemory,
123 process_id: Arc<String>,
124) -> Result<()> {
125 let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
126 info!("nb_events: {nb_events}");
127 if spec.blocks.is_empty() {
128 anyhow::bail!("empty partition spec");
129 }
130 let min_insert_time = spec.blocks[0].block.insert_time;
131 let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
132
133 let (tx, rx) = tokio::sync::mpsc::channel(1);
134 let null_response_writer = Arc::new(ResponseWriter::new(None));
135 let join_handle = spawn_with_context(write_partition_from_rows(
136 lake.clone(),
137 view_meta,
138 schema,
139 TimeRange::new(min_insert_time, max_insert_time),
140 spec.block_ids_hash.clone(),
141 rx,
142 null_response_writer,
143 ));
144
145 let build_result: Result<Option<PartitionRowSet>> = async {
146 let mut record_builder = NetSpanRecordBuilder::with_capacity(nb_events / 2);
147 let stream = spec.blocks[0].stream.clone();
148 for b in &spec.blocks {
149 anyhow::ensure!(
150 b.stream.stream_id == stream.stream_id,
151 "net_spans partition contains multiple streams ({} and {}); expected one per process",
152 stream.stream_id,
153 b.stream.stream_id,
154 );
155 }
156 let mut blocks_to_process: Vec<BlockMetadata> = vec![];
157 let mut last_end: Option<i64> = None;
158 for block in &spec.blocks {
159 let contiguous = last_end
160 .map(|e| block.block.begin_ticks == e)
161 .unwrap_or(true);
162 if !contiguous {
163 append_net_span_tree(
164 &mut record_builder,
165 convert_ticks,
166 &blocks_to_process,
167 lake.blob_storage.clone(),
168 &stream,
169 process_id.clone(),
170 )
171 .await?;
172 blocks_to_process = vec![];
173 }
174 blocks_to_process.push(block.block.clone());
175 last_end = Some(block.block.end_ticks);
176 }
177 if !blocks_to_process.is_empty() {
178 append_net_span_tree(
179 &mut record_builder,
180 convert_ticks,
181 &blocks_to_process,
182 lake.blob_storage.clone(),
183 &stream,
184 process_id.clone(),
185 )
186 .await?;
187 }
188 let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
189 let max_time_row =
190 convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
191 let rows_time_range = record_builder
192 .get_time_range()
193 .unwrap_or(TimeRange::new(min_time_row, max_time_row));
194 let nb_rows = record_builder.len();
195 let rows = record_builder
196 .finish()
197 .with_context(|| "record_builder.finish()")?;
198 info!("writing {nb_rows} rows");
199 if nb_rows > 0 {
200 Ok(Some(PartitionRowSet { rows_time_range, rows }))
201 } else {
202 Ok(None)
203 }
204 }
205 .await;
206
207 match build_result {
208 Ok(Some(row_set)) => {
209 tx.send(Ok(row_set)).await?;
210 drop(tx);
211 join_handle.await??;
212 Ok(())
213 }
214 Ok(None) => {
215 drop(tx);
216 join_handle.await??;
217 Ok(())
218 }
219 Err(e) => {
220 warn!(
221 "aborting net-spans partition write for block {:?}: {e:?}",
222 spec.block_ids_hash
223 );
224 let _ = tx
225 .send(Err(anyhow::anyhow!("net-spans build aborted")))
226 .await;
227 drop(tx);
228 match join_handle.await {
229 Ok(Ok(())) => {}
230 Ok(Err(writer_err)) => {
231 debug!("net-spans writer task error during abort: {writer_err:?}");
232 }
233 Err(join_err) => {
234 warn!("net-spans writer task panicked during abort: {join_err:?}");
235 }
236 }
237 Err(e)
238 }
239 }
240}
241
242#[span_fn]
244async fn update_partition(
245 lake: Arc<DataLakeConnection>,
246 view_meta: ViewMetadata,
247 schema: Arc<Schema>,
248 convert_ticks: &ConvertTicks,
249 spec: &SourceDataBlocksInMemory,
250 process_id: Arc<String>,
251) -> Result<()> {
252 if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
253 return Ok(());
254 }
255 write_partition(lake, view_meta, schema, convert_ticks, spec, process_id)
256 .await
257 .with_context(|| "write_partition")?;
258 Ok(())
259}
260
261#[async_trait]
262impl View for NetSpansView {
263 fn get_view_set_name(&self) -> Arc<String> {
264 self.view_set_name.clone()
265 }
266
267 fn get_view_instance_id(&self) -> Arc<String> {
268 self.view_instance_id.clone()
269 }
270
271 async fn make_batch_partition_spec(
272 &self,
273 _lakehouse: Arc<LakehouseContext>,
274 _existing_partitions: Arc<PartitionCache>,
275 _insert_range: TimeRange,
276 ) -> Result<Arc<dyn PartitionSpec>> {
277 anyhow::bail!("NetSpansView does not support batch partition specs")
278 }
279
280 fn get_file_schema_hash(&self) -> Vec<u8> {
281 vec![SCHEMA_VERSION]
282 }
283
284 fn get_file_schema(&self) -> Arc<Schema> {
285 Arc::new(net_spans_table_schema())
286 }
287
288 #[span_fn]
289 async fn jit_update(
290 &self,
291 lakehouse: Arc<LakehouseContext>,
292 query_range: Option<TimeRange>,
293 ) -> Result<()> {
294 let (process, last_block_end_ticks, last_block_end_time) = find_process_with_latest_timing(
295 lakehouse.clone(),
296 self.view_factory.clone(),
297 &self.process_id,
298 query_range,
299 )
300 .await
301 .with_context(|| "find_process_with_latest_timing")?;
302
303 let process = Arc::new(process);
304 let query_range =
305 query_range.unwrap_or_else(|| TimeRange::new(process.start_time, last_block_end_time));
306
307 let convert_ticks = make_time_converter_from_latest_timing(
308 &process,
309 last_block_end_ticks,
310 last_block_end_time,
311 )
312 .with_context(|| "make_time_converter_from_latest_timing")?;
313
314 let blocks_view = BlocksView::new()?;
315 let all_partitions = generate_process_jit_partitions(
316 &JitPartitionConfig::default(),
317 lakehouse.clone(),
318 &blocks_view,
319 &query_range,
320 process.clone(),
321 NET_STREAM_TAG,
322 )
323 .await
324 .with_context(|| "generate_process_jit_partitions")?;
325
326 let process_id_str = Arc::new(self.process_id.to_string());
327 for part in &all_partitions {
328 update_partition(
329 lakehouse.lake().clone(),
330 ViewMetadata {
331 view_set_name: self.get_view_set_name(),
332 view_instance_id: self.get_view_instance_id(),
333 file_schema_hash: self.get_file_schema_hash(),
334 },
335 self.get_file_schema(),
336 &convert_ticks,
337 part,
338 process_id_str.clone(),
339 )
340 .await
341 .with_context(|| "update_partition")?;
342 }
343 Ok(())
344 }
345
346 fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
347 Ok(vec![
348 Expr::BinaryExpr(BinaryExpr::new(
349 col("begin_time").into(),
350 Operator::LtEq,
351 Expr::Literal(datetime_to_scalar(end), None).into(),
352 )),
353 Expr::BinaryExpr(BinaryExpr::new(
354 col("end_time").into(),
355 Operator::GtEq,
356 Expr::Literal(datetime_to_scalar(begin), None).into(),
357 )),
358 ])
359 }
360
361 fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
362 Arc::new(NamedColumnsTimeBounds::new(
363 BEGIN_TIME_COLUMN.clone(),
364 END_TIME_COLUMN.clone(),
365 ))
366 }
367
368 fn get_update_group(&self) -> Option<i32> {
369 None
370 }
371}