micromegas_analytics/lakehouse/
async_events_view.rs

1use super::{
2    batch_update::PartitionCreationStrategy,
3    block_partition_spec::{BlockProcessor, BlockProcessorMap},
4    blocks_view::BlocksView,
5    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
6    jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
7    lakehouse_context::LakehouseContext,
8    partition_cache::PartitionCache,
9    view::{PartitionSpec, View, ViewMetadata},
10    view_factory::{ViewFactory, ViewMaker},
11};
12use crate::{
13    async_events_table::async_events_table_schema,
14    lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
15    metadata::find_process_with_latest_timing,
16    time::{TimeRange, datetime_to_scalar, make_time_converter_from_latest_timing},
17};
18use anyhow::{Context, Result};
19use async_trait::async_trait;
20use chrono::{DateTime, TimeDelta, Utc};
21use datafusion::{
22    arrow::datatypes::Schema,
23    logical_expr::{Between, Expr, col},
24};
25use micromegas_ingestion::web_ingestion_service::FORMAT_TRANSIT;
26use micromegas_tracing::prelude::*;
27use std::collections::HashMap;
28use std::sync::Arc;
29use uuid::Uuid;
30
31use super::async_events_block_processor::AsyncEventsBlockProcessor;
32
33const VIEW_SET_NAME: &str = "async_events";
34const SCHEMA_VERSION: u8 = 2; // Version 2: added hash field
35lazy_static::lazy_static! {
36    static ref TIME_COLUMN: Arc<String> = Arc::new(String::from("time"));
37}
38
39/// A `ViewMaker` for creating `AsyncEventsView` instances.
40#[derive(Debug)]
41pub struct AsyncEventsViewMaker {
42    view_factory: Arc<ViewFactory>,
43}
44
45impl AsyncEventsViewMaker {
46    pub fn new(view_factory: Arc<ViewFactory>) -> Self {
47        Self { view_factory }
48    }
49}
50
51impl ViewMaker for AsyncEventsViewMaker {
52    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
53        Ok(Arc::new(AsyncEventsView::new(
54            view_instance_id,
55            self.view_factory.clone(),
56        )?))
57    }
58
59    fn get_schema_hash(&self) -> Vec<u8> {
60        vec![SCHEMA_VERSION]
61    }
62
63    fn get_schema(&self) -> Arc<Schema> {
64        Arc::new(async_events_table_schema())
65    }
66}
67
68/// A view of async span events.
69#[derive(Debug)]
70pub struct AsyncEventsView {
71    view_set_name: Arc<String>,
72    view_instance_id: Arc<String>,
73    process_id: Option<sqlx::types::Uuid>,
74    view_factory: Arc<ViewFactory>,
75}
76
77impl AsyncEventsView {
78    pub fn new(view_instance_id: &str, view_factory: Arc<ViewFactory>) -> Result<Self> {
79        if view_instance_id == "global" {
80            anyhow::bail!("AsyncEventsView does not support global view access");
81        }
82
83        let process_id =
84            Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?);
85
86        Ok(Self {
87            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
88            view_instance_id: Arc::new(view_instance_id.into()),
89            process_id,
90            view_factory,
91        })
92    }
93}
94
95#[async_trait]
96impl View for AsyncEventsView {
97    fn get_view_set_name(&self) -> Arc<String> {
98        self.view_set_name.clone()
99    }
100
101    fn get_view_instance_id(&self) -> Arc<String> {
102        self.view_instance_id.clone()
103    }
104
105    async fn make_batch_partition_spec(
106        &self,
107        _lakehouse: Arc<LakehouseContext>,
108        _existing_partitions: Arc<PartitionCache>,
109        _insert_range: TimeRange,
110    ) -> Result<Arc<dyn PartitionSpec>> {
111        anyhow::bail!("AsyncEventsView does not support batch partition specs");
112    }
113
114    fn get_file_schema_hash(&self) -> Vec<u8> {
115        vec![SCHEMA_VERSION]
116    }
117
118    fn get_file_schema(&self) -> Arc<Schema> {
119        Arc::new(async_events_table_schema())
120    }
121
122    #[span_fn]
123    async fn jit_update(
124        &self,
125        lakehouse: Arc<LakehouseContext>,
126        query_range: Option<TimeRange>,
127    ) -> Result<()> {
128        let (process, last_block_end_ticks, last_block_end_time) = find_process_with_latest_timing(
129            lakehouse.clone(),
130            self.view_factory.clone(),
131            &self
132                .process_id
133                .with_context(|| "getting a view's process_id")?,
134            query_range,
135        )
136        .await
137        .with_context(|| "find_process_with_latest_timing")?;
138
139        let process = Arc::new(process);
140        let query_range =
141            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, last_block_end_time));
142
143        // Create a consistent ConvertTicks using the latest timing information
144        let convert_ticks = Arc::new(
145            make_time_converter_from_latest_timing(
146                &process,
147                last_block_end_ticks,
148                last_block_end_time,
149            )
150            .with_context(|| "make_time_converter_from_latest_timing")?,
151        );
152
153        // Use process-based partition generation to get all streams for this process
154        let blocks_view = BlocksView::new()?;
155        let all_partitions = generate_process_jit_partitions(
156            &JitPartitionConfig::default(),
157            lakehouse.clone(),
158            &blocks_view,
159            &query_range,
160            process.clone(),
161            "cpu",
162        )
163        .await
164        .with_context(|| "generate_process_jit_partitions")?;
165        let view_meta = ViewMetadata {
166            view_set_name: self.get_view_set_name(),
167            view_instance_id: self.get_view_instance_id(),
168            file_schema_hash: self.get_file_schema_hash(),
169        };
170
171        // async_events only sources native transit streams (tagged "cpu");
172        // OTel spans go to the dedicated otel_spans view.
173        let mut block_processors: BlockProcessorMap = HashMap::new();
174        block_processors.insert(
175            FORMAT_TRANSIT,
176            Arc::new(AsyncEventsBlockProcessor::new(convert_ticks.clone()))
177                as Arc<dyn BlockProcessor>,
178        );
179        let block_processors = Arc::new(block_processors);
180
181        for part in all_partitions {
182            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
183                .await?
184            {
185                write_partition_from_blocks(
186                    lakehouse.lake().clone(),
187                    view_meta.clone(),
188                    self.get_file_schema(),
189                    part,
190                    block_processors.clone(),
191                )
192                .await
193                .with_context(|| "write_partition_from_blocks")?;
194            }
195        }
196        Ok(())
197    }
198
199    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
200        Ok(vec![Expr::Between(Between::new(
201            col("time").into(),
202            false,
203            Expr::Literal(datetime_to_scalar(begin), None).into(),
204            Expr::Literal(datetime_to_scalar(end), None).into(),
205        ))])
206    }
207
208    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
209        Arc::new(NamedColumnsTimeBounds::new(
210            TIME_COLUMN.clone(),
211            TIME_COLUMN.clone(),
212        ))
213    }
214
215    fn get_update_group(&self) -> Option<i32> {
216        None // Process-specific views don't use update groups
217    }
218
219    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
220        TimeDelta::hours(1)
221    }
222}