micromegas_analytics/lakehouse/thread_spans_view.rs

1use super::{
2    blocks_view::BlocksView,
3    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4    jit_partitions::{
5        JitPartitionConfig, generate_stream_jit_partitions, is_jit_partition_up_to_date,
6    },
7    lakehouse_context::LakehouseContext,
8    partition_cache::PartitionCache,
9    partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
10    view::{PartitionSpec, View, ViewMetadata},
11    view_factory::ViewMaker,
12};
13use crate::{
14    call_tree::make_call_tree,
15    lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
16    metadata::{find_process, find_stream},
17    response_writer::ResponseWriter,
18    span_table::{SpanRecordBuilder, get_spans_schema},
19    time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_db},
20};
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
25use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
26use micromegas_ingestion::data_lake_connection::DataLakeConnection;
27use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
28use micromegas_tracing::prelude::*;
29use std::sync::Arc;
30use uuid::Uuid;
31
/// Name of the view set implemented by `ThreadSpansView`.
const VIEW_SET_NAME: &str = "thread_spans";
/// File schema version; it is the entire schema hash reported by this view and its maker.
const SCHEMA_VERSION: u8 = 0;
// Names of the columns holding each span's start and end times, shared (via cheap `Arc`
// clones) with the time-bounds helper. Initialized lazily on first access.
static MIN_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("begin")));
static MAX_TIME_COLUMN: std::sync::LazyLock<Arc<String>> =
    std::sync::LazyLock::new(|| Arc::new(String::from("end")));
38
39/// A `ViewMaker` for creating `ThreadSpansView` instances.
40#[derive(Debug)]
41pub struct ThreadSpansViewMaker {}
42
43impl ViewMaker for ThreadSpansViewMaker {
44    fn make_view(&self, stream_id: &str) -> Result<Arc<dyn View>> {
45        Ok(Arc::new(ThreadSpansView::new(stream_id)?))
46    }
47
48    fn get_schema_hash(&self) -> Vec<u8> {
49        vec![SCHEMA_VERSION]
50    }
51
52    fn get_schema(&self) -> Arc<Schema> {
53        Arc::new(get_spans_schema())
54    }
55}
56
57/// A view of thread spans.
58#[derive(Debug)]
59pub struct ThreadSpansView {
60    view_set_name: Arc<String>,
61    view_instance_id: Arc<String>,
62    stream_id: sqlx::types::Uuid,
63}
64
65impl ThreadSpansView {
66    pub fn new(view_instance_id: &str) -> Result<Self> {
67        if view_instance_id == "global" {
68            anyhow::bail!("the global view is not implemented for thread spans");
69        }
70
71        Ok(Self {
72            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
73            view_instance_id: Arc::new(String::from(view_instance_id)),
74            stream_id: Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?,
75        })
76    }
77}
78
79#[span_fn]
80async fn append_call_tree(
81    record_builder: &mut SpanRecordBuilder,
82    convert_ticks: &ConvertTicks,
83    blocks: &[BlockMetadata],
84    blob_storage: Arc<BlobStorage>,
85    stream: &crate::metadata::StreamMetadata,
86) -> Result<()> {
87    let call_tree = make_call_tree(
88        blocks,
89        convert_ticks.delta_ticks_to_ns(blocks[0].begin_ticks),
90        convert_ticks.delta_ticks_to_ns(blocks[blocks.len() - 1].end_ticks),
91        None,
92        blob_storage,
93        convert_ticks.clone(),
94        stream,
95    )
96    .await
97    .with_context(|| "make_call_tree")?;
98    record_builder
99        .append_call_tree(&call_tree)
100        .with_context(|| "adding call tree to span record builder")?;
101    Ok(())
102}
103
104/// Writes a partition from a set of blocks.
105#[span_fn]
106async fn write_partition(
107    lake: Arc<DataLakeConnection>,
108    view_meta: ViewMetadata,
109    schema: Arc<Schema>,
110    convert_ticks: &ConvertTicks,
111    spec: &SourceDataBlocksInMemory,
112) -> Result<()> {
113    let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
114    info!("nb_events: {nb_events}");
115    if spec.blocks.is_empty() {
116        anyhow::bail!("empty partition spec");
117    }
118    // for jit partitions, we assume that the blocks were registered in order
119    // since they are built based on begin_ticks, not insert_time
120    let min_insert_time = spec.blocks[0].block.insert_time;
121    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
122
123    let (tx, rx) = tokio::sync::mpsc::channel(1);
124    let null_response_writer = Arc::new(ResponseWriter::new(None));
125    let join_handle = spawn_with_context(write_partition_from_rows(
126        lake.clone(),
127        view_meta,
128        schema,
129        TimeRange::new(min_insert_time, max_insert_time),
130        spec.block_ids_hash.clone(),
131        rx,
132        null_response_writer,
133    ));
134
135    let build_result: Result<PartitionRowSet> = async {
136        let mut record_builder = SpanRecordBuilder::with_capacity(nb_events / 2);
137        let mut blocks_to_process = vec![];
138        let mut last_end = None;
139        for block in &spec.blocks {
140            if block.block.begin_ticks == last_end.unwrap_or(block.block.begin_ticks) {
141                last_end = Some(block.block.end_ticks);
142                blocks_to_process.push(block.block.clone());
143            } else {
144                append_call_tree(
145                    &mut record_builder,
146                    convert_ticks,
147                    &blocks_to_process,
148                    lake.blob_storage.clone(),
149                    &block.stream,
150                )
151                .await?;
152                last_end = Some(block.block.end_ticks);
153                blocks_to_process = vec![block.block.clone()];
154            }
155        }
156        if !blocks_to_process.is_empty() {
157            append_call_tree(
158                &mut record_builder,
159                convert_ticks,
160                &blocks_to_process,
161                lake.blob_storage.clone(),
162                &spec.blocks[0].stream,
163            )
164            .await?;
165        }
166        let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
167        let max_time_row =
168            convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
169        let rows = record_builder
170            .finish()
171            .with_context(|| "record_builder.finish()")?;
172        info!("writing {} rows", rows.num_rows());
173        Ok(PartitionRowSet {
174            rows_time_range: TimeRange::new(min_time_row, max_time_row),
175            rows,
176        })
177    }
178    .await;
179
180    match build_result {
181        Ok(row_set) => {
182            tx.send(Ok(row_set)).await?;
183            drop(tx);
184            join_handle.await??;
185            Ok(())
186        }
187        Err(e) => {
188            warn!(
189                "aborting thread-spans partition write for block {:?}: {e:?}",
190                spec.block_ids_hash
191            );
192            let _ = tx
193                .send(Err(anyhow::anyhow!("thread-spans build aborted")))
194                .await;
195            drop(tx);
196            match join_handle.await {
197                Ok(Ok(())) => {}
198                Ok(Err(writer_err)) => {
199                    debug!("thread-spans writer task error during abort: {writer_err:?}");
200                }
201                Err(join_err) => {
202                    warn!("thread-spans writer task panicked during abort: {join_err:?}");
203                }
204            }
205            Err(e)
206        }
207    }
208}
209/// Rebuilds the partition if it's missing or out of date.
210#[span_fn]
211async fn update_partition(
212    lake: Arc<DataLakeConnection>,
213    view_meta: ViewMetadata,
214    schema: Arc<Schema>,
215    convert_ticks: &ConvertTicks,
216    spec: &SourceDataBlocksInMemory,
217) -> Result<()> {
218    if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
219        return Ok(());
220    }
221    write_partition(lake, view_meta, schema, convert_ticks, spec)
222        .await
223        .with_context(|| "write_partition")?;
224
225    Ok(())
226}
227
228#[async_trait]
229impl View for ThreadSpansView {
230    fn get_view_set_name(&self) -> Arc<String> {
231        self.view_set_name.clone()
232    }
233
234    fn get_view_instance_id(&self) -> Arc<String> {
235        self.view_instance_id.clone()
236    }
237
238    async fn make_batch_partition_spec(
239        &self,
240        _lakehouse: Arc<LakehouseContext>,
241        _existing_partitions: Arc<PartitionCache>,
242        _insert_range: TimeRange,
243    ) -> Result<Arc<dyn PartitionSpec>> {
244        anyhow::bail!("not implemented")
245    }
246
247    fn get_file_schema_hash(&self) -> Vec<u8> {
248        vec![SCHEMA_VERSION]
249    }
250
251    fn get_file_schema(&self) -> Arc<Schema> {
252        Arc::new(get_spans_schema())
253    }
254
255    #[span_fn]
256    async fn jit_update(
257        &self,
258        lakehouse: Arc<LakehouseContext>,
259        query_range: Option<TimeRange>,
260    ) -> Result<()> {
261        if query_range.is_none() {
262            anyhow::bail!("query range mandatory for thread spans view");
263        }
264        let query_range = query_range.unwrap();
265        let stream = Arc::new(
266            find_stream(&lakehouse.lake().db_pool, self.stream_id)
267                .await
268                .with_context(|| "find_stream")?,
269        );
270        let process = Arc::new(
271            find_process(&lakehouse.lake().db_pool, &stream.process_id)
272                .await
273                .with_context(|| "find_process")?,
274        );
275        let convert_ticks =
276            make_time_converter_from_db(&lakehouse.lake().db_pool, &process).await?;
277        let blocks_view = BlocksView::new()?;
278        let partitions = generate_stream_jit_partitions(
279            &JitPartitionConfig::default(),
280            lakehouse.clone(),
281            &blocks_view,
282            &query_range,
283            stream.clone(),
284            process.clone(),
285        )
286        .await
287        .with_context(|| "generate_stream_jit_partitions")?;
288        for part in &partitions {
289            update_partition(
290                lakehouse.lake().clone(),
291                ViewMetadata {
292                    view_set_name: self.get_view_set_name(),
293                    view_instance_id: self.get_view_instance_id(),
294                    file_schema_hash: self.get_file_schema_hash(),
295                },
296                self.get_file_schema(),
297                &convert_ticks,
298                part,
299            )
300            .await
301            .with_context(|| "update_partition")?;
302        }
303        Ok(())
304    }
305
306    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
307        Ok(vec![
308            Expr::BinaryExpr(BinaryExpr::new(
309                col("begin").into(),
310                Operator::LtEq,
311                Expr::Literal(datetime_to_scalar(end), None).into(),
312            )),
313            Expr::BinaryExpr(BinaryExpr::new(
314                col("end").into(),
315                Operator::GtEq,
316                Expr::Literal(datetime_to_scalar(begin), None).into(),
317            )),
318        ])
319    }
320
321    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
322        Arc::new(NamedColumnsTimeBounds::new(
323            MIN_TIME_COLUMN.clone(),
324            MAX_TIME_COLUMN.clone(),
325        ))
326    }
327
328    fn get_update_group(&self) -> Option<i32> {
329        None
330    }
331}