// micromegas_analytics/lakehouse/thread_spans_view.rs

use super::{
    blocks_view::BlocksView,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{
        JitPartitionConfig, generate_stream_jit_partitions, is_jit_partition_up_to_date,
    },
    lakehouse_context::LakehouseContext,
    partition_cache::PartitionCache,
    partition_source_data::{SourceDataBlocksInMemory, hash_to_object_count},
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewMaker,
};
use crate::{
    call_tree::make_call_tree,
    lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
    metadata::{find_process, find_stream},
    response_writer::ResponseWriter,
    span_table::{SpanRecordBuilder, get_spans_schema},
    time::{ConvertTicks, TimeRange, datetime_to_scalar, make_time_converter_from_db},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::{arrow::datatypes::Schema, logical_expr::expr_fn::col};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
use micromegas_tracing::prelude::*;
use std::sync::{Arc, LazyLock};
use uuid::Uuid;
31
/// Name of the view set all thread-span view instances belong to.
const VIEW_SET_NAME: &str = "thread_spans";
/// Bumped whenever the on-disk span schema changes; doubles as the schema hash.
const SCHEMA_VERSION: u8 = 0;

/// Column holding each span's start time; used to derive partition time bounds.
static MIN_TIME_COLUMN: LazyLock<Arc<String>> =
    LazyLock::new(|| Arc::new(String::from("begin")));
/// Column holding each span's end time; used to derive partition time bounds.
static MAX_TIME_COLUMN: LazyLock<Arc<String>> =
    LazyLock::new(|| Arc::new(String::from("end")));
38
/// A `ViewMaker` for creating `ThreadSpansView` instances.
///
/// Each instance is keyed by a stream id string (see `make_view`); the maker
/// itself is stateless.
#[derive(Debug)]
pub struct ThreadSpansViewMaker {}
42
43impl ViewMaker for ThreadSpansViewMaker {
44    fn make_view(&self, stream_id: &str) -> Result<Arc<dyn View>> {
45        Ok(Arc::new(ThreadSpansView::new(stream_id)?))
46    }
47
48    fn get_schema_hash(&self) -> Vec<u8> {
49        vec![SCHEMA_VERSION]
50    }
51
52    fn get_schema(&self) -> Arc<Schema> {
53        Arc::new(get_spans_schema())
54    }
55}
56
/// A view of thread spans.
#[derive(Debug)]
pub struct ThreadSpansView {
    // Always VIEW_SET_NAME ("thread_spans").
    view_set_name: Arc<String>,
    // The stream id in string form; also serves as the view instance id.
    view_instance_id: Arc<String>,
    // Same id parsed as a UUID, used for database lookups.
    stream_id: sqlx::types::Uuid,
}
64
65impl ThreadSpansView {
66    pub fn new(view_instance_id: &str) -> Result<Self> {
67        if view_instance_id == "global" {
68            anyhow::bail!("the global view is not implemented for thread spans");
69        }
70
71        Ok(Self {
72            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
73            view_instance_id: Arc::new(String::from(view_instance_id)),
74            stream_id: Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?,
75        })
76    }
77}
78
79#[span_fn]
80async fn append_call_tree(
81    record_builder: &mut SpanRecordBuilder,
82    convert_ticks: &ConvertTicks,
83    blocks: &[BlockMetadata],
84    blob_storage: Arc<BlobStorage>,
85    stream: &crate::metadata::StreamMetadata,
86) -> Result<()> {
87    let call_tree = make_call_tree(
88        blocks,
89        convert_ticks.delta_ticks_to_ns(blocks[0].begin_ticks),
90        convert_ticks.delta_ticks_to_ns(blocks[blocks.len() - 1].end_ticks),
91        None,
92        blob_storage,
93        convert_ticks.clone(),
94        stream,
95    )
96    .await
97    .with_context(|| "make_call_tree")?;
98    record_builder
99        .append_call_tree(&call_tree)
100        .with_context(|| "adding call tree to span record builder")?;
101    Ok(())
102}
103
/// Writes a partition from a set of blocks.
///
/// Converts the blocks into call trees, accumulates the resulting span rows in
/// a `SpanRecordBuilder`, and streams them to a concurrently-spawned
/// `write_partition_from_rows` task through a bounded channel.
#[span_fn]
async fn write_partition(
    lake: Arc<DataLakeConnection>,
    view_meta: ViewMetadata,
    schema: Arc<Schema>,
    convert_ticks: &ConvertTicks,
    spec: &SourceDataBlocksInMemory,
) -> Result<()> {
    // Event count recovered from the block-ids hash (see hash_to_object_count).
    let nb_events = hash_to_object_count(&spec.block_ids_hash)? as usize;
    info!("nb_events: {nb_events}");
    // Capacity heuristic: roughly half the events become span rows
    // -- presumably begin/end event pairs; TODO confirm against SpanRecordBuilder.
    let mut record_builder = SpanRecordBuilder::with_capacity(nb_events / 2);
    let mut blocks_to_process = vec![];
    let mut last_end = None;
    if spec.blocks.is_empty() {
        anyhow::bail!("empty partition spec");
    }
    // for jit partitions, we assume that the blocks were registered in order
    // since they are built based on begin_ticks, not insert_time
    let min_insert_time = spec.blocks[0].block.insert_time;
    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;

    // Spawn the parquet writer first so it consumes row sets while call trees
    // are still being built below; the channel is bounded to 1 for backpressure.
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    let null_response_writer = Arc::new(ResponseWriter::new(None));
    let join_handle = spawn_with_context(write_partition_from_rows(
        lake.clone(),
        view_meta,
        schema,
        TimeRange::new(min_insert_time, max_insert_time),
        spec.block_ids_hash.clone(),
        rx,
        null_response_writer,
    ));

    // Group blocks into contiguous runs: a block extends the current run when
    // its begin_ticks equals the previous block's end_ticks (or the run is
    // empty, via unwrap_or). Each run becomes one call tree.
    for block in &spec.blocks {
        if block.block.begin_ticks == last_end.unwrap_or(block.block.begin_ticks) {
            last_end = Some(block.block.end_ticks);
            blocks_to_process.push(block.block.clone());
        } else {
            // Gap detected: flush the current run, then start a new run with
            // this block.
            append_call_tree(
                &mut record_builder,
                convert_ticks,
                &blocks_to_process,
                lake.blob_storage.clone(),
                &block.stream,
            )
            .await?;
            last_end = Some(block.block.end_ticks);
            blocks_to_process = vec![block.block.clone()];
        }
    }
    // Flush the final run, if any.
    if !blocks_to_process.is_empty() {
        append_call_tree(
            &mut record_builder,
            convert_ticks,
            &blocks_to_process,
            lake.blob_storage.clone(),
            &spec.blocks[0].stream,
        )
        .await?;
    }
    // Row time range: first block's begin tick to last block's end tick,
    // converted to wall-clock time.
    let min_time_row = convert_ticks.delta_ticks_to_time(spec.blocks[0].block.begin_ticks);
    let max_time_row =
        convert_ticks.delta_ticks_to_time(spec.blocks[spec.blocks.len() - 1].block.end_ticks);
    let rows = record_builder
        .finish()
        .with_context(|| "record_builder.finish()")?;
    info!("writing {} rows", rows.num_rows());
    tx.send(PartitionRowSet {
        rows_time_range: TimeRange::new(min_time_row, max_time_row),
        rows,
    })
    .await?;
    // Close the channel so the writer task can complete, then wait for it.
    // Outer `?` propagates a join/panic error, inner `?` the write error.
    drop(tx);
    join_handle.await??;
    Ok(())
}
181/// Rebuilds the partition if it's missing or out of date.
182#[span_fn]
183async fn update_partition(
184    lake: Arc<DataLakeConnection>,
185    view_meta: ViewMetadata,
186    schema: Arc<Schema>,
187    convert_ticks: &ConvertTicks,
188    spec: &SourceDataBlocksInMemory,
189) -> Result<()> {
190    if is_jit_partition_up_to_date(&lake.db_pool, view_meta.clone(), spec).await? {
191        return Ok(());
192    }
193    write_partition(lake, view_meta, schema, convert_ticks, spec)
194        .await
195        .with_context(|| "write_partition")?;
196
197    Ok(())
198}
199
200#[async_trait]
201impl View for ThreadSpansView {
202    fn get_view_set_name(&self) -> Arc<String> {
203        self.view_set_name.clone()
204    }
205
206    fn get_view_instance_id(&self) -> Arc<String> {
207        self.view_instance_id.clone()
208    }
209
210    async fn make_batch_partition_spec(
211        &self,
212        _lakehouse: Arc<LakehouseContext>,
213        _existing_partitions: Arc<PartitionCache>,
214        _insert_range: TimeRange,
215    ) -> Result<Arc<dyn PartitionSpec>> {
216        anyhow::bail!("not implemented")
217    }
218
219    fn get_file_schema_hash(&self) -> Vec<u8> {
220        vec![SCHEMA_VERSION]
221    }
222
223    fn get_file_schema(&self) -> Arc<Schema> {
224        Arc::new(get_spans_schema())
225    }
226
227    #[span_fn]
228    async fn jit_update(
229        &self,
230        lakehouse: Arc<LakehouseContext>,
231        query_range: Option<TimeRange>,
232    ) -> Result<()> {
233        if query_range.is_none() {
234            anyhow::bail!("query range mandatory for thread spans view");
235        }
236        let query_range = query_range.unwrap();
237        let stream = Arc::new(
238            find_stream(&lakehouse.lake().db_pool, self.stream_id)
239                .await
240                .with_context(|| "find_stream")?,
241        );
242        let process = Arc::new(
243            find_process(&lakehouse.lake().db_pool, &stream.process_id)
244                .await
245                .with_context(|| "find_process")?,
246        );
247        let convert_ticks =
248            make_time_converter_from_db(&lakehouse.lake().db_pool, &process).await?;
249        let blocks_view = BlocksView::new()?;
250        let partitions = generate_stream_jit_partitions(
251            &JitPartitionConfig::default(),
252            lakehouse.clone(),
253            &blocks_view,
254            &query_range,
255            stream.clone(),
256            process.clone(),
257        )
258        .await
259        .with_context(|| "generate_stream_jit_partitions")?;
260        for part in &partitions {
261            update_partition(
262                lakehouse.lake().clone(),
263                ViewMetadata {
264                    view_set_name: self.get_view_set_name(),
265                    view_instance_id: self.get_view_instance_id(),
266                    file_schema_hash: self.get_file_schema_hash(),
267                },
268                self.get_file_schema(),
269                &convert_ticks,
270                part,
271            )
272            .await
273            .with_context(|| "update_partition")?;
274        }
275        Ok(())
276    }
277
278    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
279        Ok(vec![
280            Expr::BinaryExpr(BinaryExpr::new(
281                col("begin").into(),
282                Operator::LtEq,
283                Expr::Literal(datetime_to_scalar(end), None).into(),
284            )),
285            Expr::BinaryExpr(BinaryExpr::new(
286                col("end").into(),
287                Operator::GtEq,
288                Expr::Literal(datetime_to_scalar(begin), None).into(),
289            )),
290        ])
291    }
292
293    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
294        Arc::new(NamedColumnsTimeBounds::new(
295            MIN_TIME_COLUMN.clone(),
296            MAX_TIME_COLUMN.clone(),
297        ))
298    }
299
300    fn get_update_group(&self) -> Option<i32> {
301        None
302    }
303}