micromegas_analytics/lakehouse/async_events_view.rs

use super::{
    async_events_block_processor::AsyncEventsBlockProcessor,
    batch_update::PartitionCreationStrategy,
    blocks_view::BlocksView,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
    lakehouse_context::LakehouseContext,
    partition_cache::PartitionCache,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::{ViewFactory, ViewMaker},
};
use crate::{
    async_events_table::async_events_table_schema,
    lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
    metadata::find_process_with_latest_timing,
    time::{TimeRange, datetime_to_scalar, make_time_converter_from_latest_timing},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    logical_expr::{Between, Expr, col},
};
use micromegas_tracing::prelude::*;
use std::sync::{Arc, LazyLock};
use uuid::Uuid;
29
/// Name of the view set exposing async span events.
const VIEW_SET_NAME: &str = "async_events";
/// File-schema version, also used as the single-byte schema hash. Version 2: added hash field.
const SCHEMA_VERSION: u8 = 2;
/// Shared name of the event-time column, used for time filters and time bounds.
// std's LazyLock replaces the lazy_static! macro with the same
// initialize-on-first-access semantics and no extra dependency.
static TIME_COLUMN: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new(String::from("time")));
35
/// A `ViewMaker` for creating `AsyncEventsView` instances.
#[derive(Debug)]
pub struct AsyncEventsViewMaker {
    // Shared with every created view; jit_update uses it to resolve process metadata.
    view_factory: Arc<ViewFactory>,
}
41
42impl AsyncEventsViewMaker {
43    pub fn new(view_factory: Arc<ViewFactory>) -> Self {
44        Self { view_factory }
45    }
46}
47
48impl ViewMaker for AsyncEventsViewMaker {
49    fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
50        Ok(Arc::new(AsyncEventsView::new(
51            view_instance_id,
52            self.view_factory.clone(),
53        )?))
54    }
55
56    fn get_schema_hash(&self) -> Vec<u8> {
57        vec![SCHEMA_VERSION]
58    }
59
60    fn get_schema(&self) -> Arc<Schema> {
61        Arc::new(async_events_table_schema())
62    }
63}
64
/// A view of async span events.
#[derive(Debug)]
pub struct AsyncEventsView {
    // Always VIEW_SET_NAME ("async_events"); stored as Arc<String> for cheap sharing.
    view_set_name: Arc<String>,
    // The process id string this view instance is bound to.
    view_instance_id: Arc<String>,
    // Parsed form of view_instance_id; always Some after `new` succeeds
    // (the Option matches the field shape other views use — TODO confirm).
    process_id: Option<sqlx::types::Uuid>,
    // Used by jit_update to look up process metadata.
    view_factory: Arc<ViewFactory>,
}
73
74impl AsyncEventsView {
75    pub fn new(view_instance_id: &str, view_factory: Arc<ViewFactory>) -> Result<Self> {
76        if view_instance_id == "global" {
77            anyhow::bail!("AsyncEventsView does not support global view access");
78        }
79
80        let process_id =
81            Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?);
82
83        Ok(Self {
84            view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
85            view_instance_id: Arc::new(view_instance_id.into()),
86            process_id,
87            view_factory,
88        })
89    }
90}
91
#[async_trait]
impl View for AsyncEventsView {
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

    /// Batch (global) partition creation is unsupported: this view is
    /// process-scoped and materialized just-in-time by `jit_update`.
    async fn make_batch_partition_spec(
        &self,
        _lakehouse: Arc<LakehouseContext>,
        _existing_partitions: Arc<PartitionCache>,
        _insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        anyhow::bail!("AsyncEventsView does not support batch partition specs");
    }

    fn get_file_schema_hash(&self) -> Vec<u8> {
        vec![SCHEMA_VERSION]
    }

    fn get_file_schema(&self) -> Arc<Schema> {
        Arc::new(async_events_table_schema())
    }

    /// Materializes (or refreshes) the JIT partitions needed to answer a
    /// query over `query_range` for this view's process.
    ///
    /// Steps: fetch the process and its latest block timing, build a tick
    /// converter from that timing, enumerate candidate partitions from the
    /// process's blocks, then write every partition that is not up to date.
    #[span_fn]
    async fn jit_update(
        &self,
        lakehouse: Arc<LakehouseContext>,
        query_range: Option<TimeRange>,
    ) -> Result<()> {
        let (process, last_block_end_ticks, last_block_end_time) = find_process_with_latest_timing(
            lakehouse.clone(),
            self.view_factory.clone(),
            &self
                .process_id
                .with_context(|| "getting a view's process_id")?,
            query_range,
        )
        .await
        .with_context(|| "find_process_with_latest_timing")?;

        let process = Arc::new(process);
        // No explicit range: cover the process from its start to the end of
        // its latest known block.
        let query_range =
            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, last_block_end_time));

        // Create a consistent ConvertTicks using the latest timing information
        let convert_ticks = Arc::new(
            make_time_converter_from_latest_timing(
                &process,
                last_block_end_ticks,
                last_block_end_time,
            )
            .with_context(|| "make_time_converter_from_latest_timing")?,
        );

        // Use process-based partition generation to get all streams for this process
        // NOTE(review): streams are selected by the "cpu" tag — presumably async
        // span events are recorded in cpu streams; confirm against instrumentation.
        let blocks_view = BlocksView::new()?;
        let all_partitions = generate_process_jit_partitions(
            &JitPartitionConfig::default(),
            lakehouse.clone(),
            &blocks_view,
            &query_range,
            process.clone(),
            "cpu",
        )
        .await
        .with_context(|| "generate_process_jit_partitions")?;
        let view_meta = ViewMetadata {
            view_set_name: self.get_view_set_name(),
            view_instance_id: self.get_view_instance_id(),
            file_schema_hash: self.get_file_schema_hash(),
        };

        // Skip partitions already reported up to date; write the rest.
        for part in all_partitions {
            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
                .await?
            {
                write_partition_from_blocks(
                    lakehouse.lake().clone(),
                    view_meta.clone(),
                    self.get_file_schema(),
                    part,
                    // Each partition gets a processor sharing the same tick converter.
                    Arc::new(AsyncEventsBlockProcessor::new(convert_ticks.clone())),
                )
                .await
                .with_context(|| "write_partition_from_blocks")?;
            }
        }
        Ok(())
    }

    /// Builds a `time BETWEEN begin AND end` DataFusion filter (inclusive,
    /// `negated` = false).
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![Expr::Between(Between::new(
            col("time").into(),
            false,
            Expr::Literal(datetime_to_scalar(begin), None).into(),
            Expr::Literal(datetime_to_scalar(end), None).into(),
        ))])
    }

    /// Both the min and max time bounds come from the "time" column.
    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        Arc::new(NamedColumnsTimeBounds::new(
            TIME_COLUMN.clone(),
            TIME_COLUMN.clone(),
        ))
    }

    fn get_update_group(&self) -> Option<i32> {
        None // Process-specific views don't use update groups
    }

    /// Caps the time span of a single partition at one hour, regardless of strategy.
    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
        TimeDelta::hours(1)
    }
}
209}