use super::{
2 batch_update::PartitionCreationStrategy,
3 blocks_view::BlocksView,
4 dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
5 jit_partitions::{JitPartitionConfig, write_partition_from_blocks},
6 lakehouse_context::LakehouseContext,
7 partition_cache::PartitionCache,
8 view::{PartitionSpec, View, ViewMetadata},
9 view_factory::{ViewFactory, ViewMaker},
10};
11use crate::{
12 async_events_table::async_events_table_schema,
13 lakehouse::jit_partitions::{generate_process_jit_partitions, is_jit_partition_up_to_date},
14 metadata::find_process_with_latest_timing,
15 time::{TimeRange, datetime_to_scalar, make_time_converter_from_latest_timing},
16};
17use anyhow::{Context, Result};
18use async_trait::async_trait;
19use chrono::{DateTime, TimeDelta, Utc};
20use datafusion::{
21 arrow::datatypes::Schema,
22 logical_expr::{Between, Expr, col},
23};
24use micromegas_tracing::prelude::*;
25use std::sync::Arc;
26use uuid::Uuid;
27
28use super::async_events_block_processor::AsyncEventsBlockProcessor;
29
// Name of the view set this module implements.
const VIEW_SET_NAME: &str = "async_events";
// Version of the on-disk file schema; returned as the schema hash so that
// partitions written with an older version are detected as stale.
const SCHEMA_VERSION: u8 = 2;

lazy_static::lazy_static! {
    // Column name used for both the min and max time bounds (see get_time_bounds).
    static ref TIME_COLUMN: Arc<String> = Arc::new(String::from("time"));
}
35
/// Factory producing [`AsyncEventsView`] instances, one per process id.
#[derive(Debug)]
pub struct AsyncEventsViewMaker {
    // Handed to each created view; used during jit_update to resolve other views.
    view_factory: Arc<ViewFactory>,
}
41
42impl AsyncEventsViewMaker {
43 pub fn new(view_factory: Arc<ViewFactory>) -> Self {
44 Self { view_factory }
45 }
46}
47
48impl ViewMaker for AsyncEventsViewMaker {
49 fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
50 Ok(Arc::new(AsyncEventsView::new(
51 view_instance_id,
52 self.view_factory.clone(),
53 )?))
54 }
55
56 fn get_schema_hash(&self) -> Vec<u8> {
57 vec![SCHEMA_VERSION]
58 }
59
60 fn get_schema(&self) -> Arc<Schema> {
61 Arc::new(async_events_table_schema())
62 }
63}
64
/// Process-scoped view over async events; materialized just-in-time
/// (no global/batch variant — see `make_batch_partition_spec`).
#[derive(Debug)]
pub struct AsyncEventsView {
    // Always VIEW_SET_NAME; stored as Arc<String> to match the View trait accessors.
    view_set_name: Arc<String>,
    // The process id in string form (never "global"; enforced in new()).
    view_instance_id: Arc<String>,
    // Parsed form of view_instance_id; Some for every successfully constructed view.
    process_id: Option<sqlx::types::Uuid>,
    view_factory: Arc<ViewFactory>,
}
73
74impl AsyncEventsView {
75 pub fn new(view_instance_id: &str, view_factory: Arc<ViewFactory>) -> Result<Self> {
76 if view_instance_id == "global" {
77 anyhow::bail!("AsyncEventsView does not support global view access");
78 }
79
80 let process_id =
81 Some(Uuid::parse_str(view_instance_id).with_context(|| "Uuid::parse_str")?);
82
83 Ok(Self {
84 view_set_name: Arc::new(String::from(VIEW_SET_NAME)),
85 view_instance_id: Arc::new(view_instance_id.into()),
86 process_id,
87 view_factory,
88 })
89 }
90}
91
#[async_trait]
impl View for AsyncEventsView {
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

    /// Batch partition creation is unsupported: this view is only
    /// materialized just-in-time per process (see `jit_update`).
    async fn make_batch_partition_spec(
        &self,
        _lakehouse: Arc<LakehouseContext>,
        _existing_partitions: Arc<PartitionCache>,
        _insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        anyhow::bail!("AsyncEventsView does not support batch partition specs");
    }

    fn get_file_schema_hash(&self) -> Vec<u8> {
        vec![SCHEMA_VERSION]
    }

    fn get_file_schema(&self) -> Arc<Schema> {
        Arc::new(async_events_table_schema())
    }

    /// Brings this process's async-event partitions up to date for
    /// `query_range` (or the whole process lifetime when `None`):
    /// look up the process and its latest block timing, derive a
    /// tick-to-time converter, enumerate candidate JIT partitions from
    /// the blocks view, and (re)write any partition that is stale.
    #[span_fn]
    async fn jit_update(
        &self,
        lakehouse: Arc<LakehouseContext>,
        query_range: Option<TimeRange>,
    ) -> Result<()> {
        // Latest timing info is needed both to default the query range and
        // to build an accurate tick converter below.
        let (process, last_block_end_ticks, last_block_end_time) = find_process_with_latest_timing(
            lakehouse.clone(),
            self.view_factory.clone(),
            &self
                .process_id
                .with_context(|| "getting a view's process_id")?,
            query_range,
        )
        .await
        .with_context(|| "find_process_with_latest_timing")?;

        let process = Arc::new(process);
        // Default to the full observed lifetime of the process.
        let query_range =
            query_range.unwrap_or_else(|| TimeRange::new(process.start_time, last_block_end_time));

        let convert_ticks = Arc::new(
            make_time_converter_from_latest_timing(
                &process,
                last_block_end_ticks,
                last_block_end_time,
            )
            .with_context(|| "make_time_converter_from_latest_timing")?,
        );

        let blocks_view = BlocksView::new()?;
        // "cpu" selects the stream tag the partitions are generated from —
        // presumably async events are recorded in cpu streams (NOTE(review):
        // confirm against the instrumentation side).
        let all_partitions = generate_process_jit_partitions(
            &JitPartitionConfig::default(),
            lakehouse.clone(),
            &blocks_view,
            &query_range,
            process.clone(),
            "cpu",
        )
        .await
        .with_context(|| "generate_process_jit_partitions")?;
        let view_meta = ViewMetadata {
            view_set_name: self.get_view_set_name(),
            view_instance_id: self.get_view_instance_id(),
            file_schema_hash: self.get_file_schema_hash(),
        };

        // Rewrite only the partitions that are missing or out of date.
        for part in all_partitions {
            if !is_jit_partition_up_to_date(&lakehouse.lake().db_pool, view_meta.clone(), &part)
                .await?
            {
                write_partition_from_blocks(
                    lakehouse.lake().clone(),
                    view_meta.clone(),
                    self.get_file_schema(),
                    part,
                    Arc::new(AsyncEventsBlockProcessor::new(convert_ticks.clone())),
                )
                .await
                .with_context(|| "write_partition_from_blocks")?;
            }
        }
        Ok(())
    }

    /// Builds the pushdown predicate `time BETWEEN begin AND end`
    /// (inclusive bounds — `Between` with negated == false).
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![Expr::Between(Between::new(
            col("time").into(),
            false,
            Expr::Literal(datetime_to_scalar(begin), None).into(),
            Expr::Literal(datetime_to_scalar(end), None).into(),
        ))])
    }

    /// The "time" column serves as both the lower and upper bound column.
    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        Arc::new(NamedColumnsTimeBounds::new(
            TIME_COLUMN.clone(),
            TIME_COLUMN.clone(),
        ))
    }

    // No scheduled/batch update group: this view only updates via jit_update.
    fn get_update_group(&self) -> Option<i32> {
        None
    }

    fn get_max_partition_time_delta(&self, _strategy: &PartitionCreationStrategy) -> TimeDelta {
        TimeDelta::hours(1)
    }
}