// micromegas_analytics/lakehouse/net_spans_view.rs

1use super::{
2    blocks_view::BlocksView,
3    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4    jit_partitions::{
5        JitPartitionConfig, generate_process_jit_partitions, is_jit_partition_up_to_date,
6    },
7    lakehouse_context::LakehouseContext,
8    partition_cache::PartitionCache,
9    partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
10    view::{PartitionSpec, View, ViewMetadata},
11    view_factory::{ViewFactory, ViewMaker},
12};
13use crate::{
14    lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
15    metadata::{StreamMetadata, find_process_with_latest_timing},
16    net_span_tree::make_net_span_tree,
17    net_spans_table::{NetSpanRecordBuilder, net_spans_table_schema},
18    response_writer::ResponseWriter,
19    time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_latest_timing},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
25use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
26use micromegas_ingestion::data_lake_connection::DataLakeConnection;
27use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
28use micromegas_tracing::prelude::*;
29use std::sync::Arc;
30use uuid::Uuid;
31
/// Stream tag selecting which of a process's streams feed this view.
const NET_STREAM_TAG: &str = "net";
/// Name of the view set implemented in this module.
const VIEW_SET_NAME: &str = "net_spans";
/// Sole byte of the schema hash reported for this view; changing it changes that hash.
const SCHEMA_VERSION: u8 = 0;
35
36lazy_static::lazy_static! {
37    static ref BEGIN_TIME_COLUMN: Arc<String> = Arc::new(String::from("begin_time"));
38    static ref END_TIME_COLUMN: Arc<String> = Arc::new(String::from("end_time"));
39}
40
41/// A `ViewMaker` for creating `NetSpansView` instances.
42#[derive(Debug)]
43pub struct NetSpansViewMaker {
44    view_factory: Arc<ViewFactory>,
45}
46
47impl NetSpansViewMaker {
48    pub fn new(view_factory: Arc<ViewFactory>) -> Self {
49        Self { view_factory }
50    }
51}
52
53impl ViewMaker for NetSpansViewMaker {
54    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
55        Ok(Arc::new(NetSpansView::new(
56            view_instance_id,
57            self.view_factory.clone(),
58        )?))
59    }
60
61    fn get_schema_hash(&self) -> Vec<u8> {
62        vec![SCHEMA_VERSION]
63    }
64
65    fn get_schema(&self) -> Arc<Schema> {
66        Arc::new(net_spans_table_schema())
67    }
68}
69
70/// A view of network bandwidth spans (Connection / Object / Property / RPC).
71#[derive(Debug)]
72pub struct NetSpansView {
73    view_set_name: Arc<String>,
74    view_instance_id: Arc<String>,
75    process_id: sqlx::types::Uuid,
76    view_factory: Arc<ViewFactory>,
77}
78
79impl NetSpansView {
80    pub fn new(view_instance_id: &str, view_factory: Arc<ViewFactory>) -> Result<Self> {
81        if view_instance_id == "global" {
82            anyhow::bail!("NetSpansView does not support global view access");
83        }
84        let process_id = Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?;
85        Ok(Self {
86            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
87            view_instance_id: Arc::new(view_instance_id.into()),
88            process_id,
89            view_factory,
90        })
91    }
92}
93
94#[span_fn]
95async fn append_net_span_tree(
96    record_builder: &mut NetSpanRecordBuilder,
97    convert_ticks: &ConvertTicks,
98    blocks: &[BlockMetadata],
99    blob_storage: Arc<BlobStorage>,
100    stream: &StreamMetadata,
101    process_id: Arc<String>,
102) -> Result<()> {
103    make_net_span_tree(
104        blocks,
105        record_builder,
106        blob_storage,
107        stream,
108        process_id,
109        convert_ticks.clone(),
110    )
111    .await
112    .with_context(|| "make_net_span_tree")
113}
114
115/// Writes a partition from a set of blocks.
116#[span_fn]
117async fn write_partition(
118    lake: Arc<DataLakeConnection>,
119    view_meta: ViewMetadata,
120    schema: Arc<Schema>,
121    convert_ticks: &ConvertTicks,
122    spec: &SourceDataBlocksInMemory,
123    process_id: Arc<String>,
124) -> Result<()> {
125    let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
126    info!("nb_events: {nb_events}");
127    if spec.blocks.is_empty() {
128        anyhow::bail!("empty partition spec");
129    }
130    let min_insert_time = spec.blocks[0].block.insert_time;
131    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
132
133    let (tx, rx) = tokio::sync::mpsc::channel(1);
134    let null_response_writer = Arc::new(ResponseWriter::new(None));
135    let join_handle = spawn_with_context(write_partition_from_rows(
136        lake.clone(),
137        view_meta,
138        schema,
139        TimeRange::new(min_insert_time, max_insert_time),
140        spec.block_ids_hash.clone(),
141        rx,
142        null_response_writer,
143    ));
144
145    let build_result: Result<Option<PartitionRowSet>> = async {
146        let mut record_builder = NetSpanRecordBuilder::with_capacity(nb_events / 2);
147        let stream = spec.blocks[0].stream.clone();
148        for b in &spec.blocks {
149            anyhow::ensure!(
150                b.stream.stream_id == stream.stream_id,
151                "net_spans partition contains multiple streams ({} and {}); expected one per process",
152                stream.stream_id,
153                b.stream.stream_id,
154            );
155        }
156        let mut blocks_to_process: Vec<BlockMetadata> = vec![];
157        let mut last_end: Option<i64> = None;
158        for block in &spec.blocks {
159            let contiguous = last_end
160                .map(|e| block.block.begin_ticks == e)
161                .unwrap_or(true);
162            if !contiguous {
163                append_net_span_tree(
164                    &mut record_builder,
165                    convert_ticks,
166                    &blocks_to_process,
167                    lake.blob_storage.clone(),
168                    &stream,
169                    process_id.clone(),
170                )
171                .await?;
172                blocks_to_process = vec![];
173            }
174            blocks_to_process.push(block.block.clone());
175            last_end = Some(block.block.end_ticks);
176        }
177        if !blocks_to_process.is_empty() {
178            append_net_span_tree(
179                &mut record_builder,
180                convert_ticks,
181                &blocks_to_process,
182                lake.blob_storage.clone(),
183                &stream,
184                process_id.clone(),
185            )
186            .await?;
187        }
188        let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
189        let max_time_row =
190            convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
191        let rows_time_range = record_builder
192            .get_time_range()
193            .unwrap_or(TimeRange::new(min_time_row, max_time_row));
194        let nb_rows = record_builder.len();
195        let rows = record_builder
196            .finish()
197            .with_context(|| "record_builder.finish()")?;
198        info!("writing {nb_rows} rows");
199        if nb_rows > 0 {
200            Ok(Some(PartitionRowSet { rows_time_range, rows }))
201        } else {
202            Ok(None)
203        }
204    }
205    .await;
206
207    match build_result {
208        Ok(Some(row_set)) => {
209            tx.send(Ok(row_set)).await?;
210            drop(tx);
211            join_handle.await??;
212            Ok(())
213        }
214        Ok(None) => {
215            drop(tx);
216            join_handle.await??;
217            Ok(())
218        }
219        Err(e) => {
220            warn!(
221                "aborting net-spans partition write for block {:?}: {e:?}",
222                spec.block_ids_hash
223            );
224            let _ = tx
225                .send(Err(anyhow::anyhow!("net-spans build aborted")))
226                .await;
227            drop(tx);
228            match join_handle.await {
229                Ok(Ok(())) => {}
230                Ok(Err(writer_err)) => {
231                    debug!("net-spans writer task error during abort: {writer_err:?}");
232                }
233                Err(join_err) => {
234                    warn!("net-spans writer task panicked during abort: {join_err:?}");
235                }
236            }
237            Err(e)
238        }
239    }
240}
241
242/// Rebuilds the partition if it's missing or out of date.
243#[span_fn]
244async fn update_partition(
245    lake: Arc<DataLakeConnection>,
246    view_meta: ViewMetadata,
247    schema: Arc<Schema>,
248    convert_ticks: &ConvertTicks,
249    spec: &SourceDataBlocksInMemory,
250    process_id: Arc<String>,
251) -> Result<()> {
252    if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
253        return Ok(());
254    }
255    write_partition(lake, view_meta, schema, convert_ticks, spec, process_id)
256        .await
257        .with_context(|| "write_partition")?;
258    Ok(())
259}
260
261#[async_trait]
262impl View for NetSpansView {
263    fn get_view_set_name(&self) -> Arc<String> {
264        self.view_set_name.clone()
265    }
266
267    fn get_view_instance_id(&self) -> Arc<String> {
268        self.view_instance_id.clone()
269    }
270
271    async fn make_batch_partition_spec(
272        &self,
273        _lakehouse: Arc<LakehouseContext>,
274        _existing_partitions: Arc<PartitionCache>,
275        _insert_range: TimeRange,
276    ) -> Result<Arc<dyn PartitionSpec>> {
277        anyhow::bail!("NetSpansView does not support batch partition specs")
278    }
279
280    fn get_file_schema_hash(&self) -> Vec<u8> {
281        vec![SCHEMA_VERSION]
282    }
283
284    fn get_file_schema(&self) -> Arc<Schema> {
285        Arc::new(net_spans_table_schema())
286    }
287
288    #[span_fn]
289    async fn jit_update(
290        &self,
291        lakehouse: Arc<LakehouseContext>,
292        query_range: Option<TimeRange>,
293    ) -> Result<()> {
294        let (process, last_block_end_ticks, last_block_end_time) = find_process_with_latest_timing(
295            lakehouse.clone(),
296            self.view_factory.clone(),
297            &self.process_id,
298            query_range,
299        )
300        .await
301        .with_context(|| "find_process_with_latest_timing")?;
302
303        let process = Arc::new(process);
304        let query_range =
305            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, last_block_end_time));
306
307        let convert_ticks = make_time_converter_from_latest_timing(
308            &process,
309            last_block_end_ticks,
310            last_block_end_time,
311        )
312        .with_context(|| "make_time_converter_from_latest_timing")?;
313
314        let blocks_view = BlocksView::new()?;
315        let all_partitions = generate_process_jit_partitions(
316            &JitPartitionConfig::default(),
317            lakehouse.clone(),
318            &blocks_view,
319            &query_range,
320            process.clone(),
321            NET_STREAM_TAG,
322        )
323        .await
324        .with_context(|| "generate_process_jit_partitions")?;
325
326        let process_id_str = Arc::new(self.process_id.to_string());
327        for part in &all_partitions {
328            update_partition(
329                lakehouse.lake().clone(),
330                ViewMetadata {
331                    view_set_name: self.get_view_set_name(),
332                    view_instance_id: self.get_view_instance_id(),
333                    file_schema_hash: self.get_file_schema_hash(),
334                },
335                self.get_file_schema(),
336                &convert_ticks,
337                part,
338                process_id_str.clone(),
339            )
340            .await
341            .with_context(|| "update_partition")?;
342        }
343        Ok(())
344    }
345
346    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
347        Ok(vec![
348            Expr::BinaryExpr(BinaryExpr::new(
349                col("begin_time").into(),
350                Operator::LtEq,
351                Expr::Literal(datetime_to_scalar(end), None).into(),
352            )),
353            Expr::BinaryExpr(BinaryExpr::new(
354                col("end_time").into(),
355                Operator::GtEq,
356                Expr::Literal(datetime_to_scalar(begin), None).into(),
357            )),
358        ])
359    }
360
361    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
362        Arc::new(NamedColumnsTimeBounds::new(
363            BEGIN_TIME_COLUMN.clone(),
364            END_TIME_COLUMN.clone(),
365        ))
366    }
367
368    fn get_update_group(&self) -> Option<i32> {
369        None
370    }
371}