//! micromegas_analytics/lakehouse/sql_batch_view.rs

1use super::{
2    batch_update::PartitionCreationStrategy,
3    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
4    lakehouse_context::LakehouseContext,
5    materialized_view::MaterializedView,
6    merge::{PartitionMerger, QueryMerger},
7    partition::Partition,
8    partition_cache::{NullPartitionProvider, PartitionCache},
9    query::make_session_context,
10    session_configurator::SessionConfigurator,
11    sql_partition_spec::fetch_sql_partition_spec,
12    view::{PartitionSpec, View, ViewMetadata},
13    view_factory::ViewFactory,
14};
15use crate::{
16    record_batch_transformer::TrivialRecordBatchTransformer,
17    time::{TimeRange, datetime_to_scalar},
18};
19use anyhow::{Context, Result};
20use async_trait::async_trait;
21use chrono::{DateTime, TimeDelta, Utc};
22use datafusion::{
23    arrow::datatypes::Schema,
24    execution::{SendableRecordBatchStream, runtime_env::RuntimeEnv},
25    prelude::*,
26    sql::TableReference,
27};
28use micromegas_ingestion::data_lake_connection::DataLakeConnection;
29use micromegas_tracing::error;
30use std::hash::Hash;
31use std::hash::Hasher;
32use std::{hash::DefaultHasher, sync::Arc};
33
/// A type alias for a function that creates a `PartitionMerger`.
///
/// Receives the datafusion runtime and the view's Arrow schema, and returns
/// the merger used to combine existing partitions into larger ones.
pub type MergerMaker =
    dyn Fn(Arc<RuntimeEnv>, Arc<Schema>) -> Arc<dyn PartitionMerger> + Send + Sync;
37
38/// SQL-defined view updated in batch
/// SQL-defined view updated in batch
#[derive(Debug)]
pub struct SqlBatchView {
    // Name of the table exposed to queries.
    view_set_name: Arc<String>,
    // Always "global" for batch views (set in `new`).
    view_instance_id: Arc<String>,
    // Column whose min() yields the first timestamp of a dataframe.
    min_event_time_column: Arc<String>,
    // Column whose max() yields the last timestamp of a dataframe.
    max_event_time_column: Arc<String>,
    // Counts source rows to decide whether a cached partition is stale.
    // NOTE(review): queries use `{begin}`/`{end}` placeholders — confirm all
    // three query strings are expected to contain them.
    count_src_query: Arc<String>,
    // Extracts source data into a cached partition.
    extract_query: Arc<String>,
    // Merges multiple partitions into one; `{source}` is replaced with the
    // partitions table name.
    merge_partitions_query: Arc<String>,
    // Arrow schema inferred from the extract query at construction time.
    schema: Arc<Schema>,
    // Strategy used to merge existing partitions.
    merger: Arc<dyn PartitionMerger>,
    // All views accessible to the queries above.
    view_factory: Arc<ViewFactory>,
    // Registers custom tables (e.g. JSON files) in new sessions.
    session_configurator: Arc<dyn SessionConfigurator>,
    // Tells the daemon whether/in what order to materialize this view.
    update_group: Option<i32>,
    // Max partition time span when creating partitions from source data.
    max_partition_delta_from_source: TimeDelta,
    // Max partition time span when merging existing partitions.
    max_partition_delta_from_merge: TimeDelta,
}
56
impl SqlBatchView {
    #[expect(clippy::too_many_arguments)]
    /// Builds a `SqlBatchView`, inferring its Arrow schema by planning the
    /// extract query against a degenerate (now..now) time range.
    ///
    /// # Arguments
    ///
    /// * `runtime` - datafusion runtime
    /// * `view_set_name` - name of the table
    /// * `min_event_time_column` - min(column) should result in the first timestamp in a dataframe
    /// * `max_event_time_column` - max(column) should result in the last timestamp in a dataframe
    /// * `count_src_query` - used to count the rows of the underlying data to know if a cached partition is up to date
    /// * `extract_query` - used to extract the source data into a cached partition
    /// * `merge_partitions_query` - used to merge multiple partitions into a single one (and user queries which are one multiple partitions by default)
    /// * `lake` - data lake
    /// * `view_factory` - all views accessible to the `count_src_query`
    /// * `session_configurator` - configurator for registering custom tables (e.g., JSON files)
    /// * `update_group` - tells the daemon which view should be materialized and in what order
    /// * `max_partition_delta_from_source` - max partition span when creating from source
    /// * `max_partition_delta_from_merge` - max partition span when merging partitions
    /// * `merger_maker` - optional factory for a custom merger; defaults to a
    ///   `QueryMerger` driven by `merge_partitions_query`
    pub async fn new(
        runtime: Arc<RuntimeEnv>,
        view_set_name: Arc<String>,
        min_event_time_column: Arc<String>,
        max_event_time_column: Arc<String>,
        count_src_query: Arc<String>,
        extract_query: Arc<String>,
        merge_partitions_query: Arc<String>,
        lake: Arc<DataLakeConnection>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
        update_group: Option<i32>,
        max_partition_delta_from_source: TimeDelta,
        max_partition_delta_from_merge: TimeDelta,
        merger_maker: Option<&MergerMaker>,
    ) -> Result<Self> {
        // No partitions are needed to plan the extract query, only its schema.
        let null_part_provider = Arc::new(NullPartitionProvider {});
        let lakehouse = Arc::new(LakehouseContext::new(lake.clone(), runtime.clone()));
        let ctx = make_session_context(
            lakehouse,
            null_part_provider,
            None,
            view_factory.clone(),
            session_configurator.clone(),
        )
        .await
        .with_context(|| "make_session_context")?;
        // Bind {begin} == {end} == now so the query planner can derive the
        // output schema without materializing any data.
        let now_str = Utc::now().to_rfc3339();
        let sql = extract_query
            .replace("{begin}", &now_str)
            .replace("{end}", &now_str);
        let extracted_df = ctx.sql(&sql).await?;
        let schema = extracted_df.schema().inner().clone();
        let session_configurator_for_merger = session_configurator.clone();
        // Default merger: a QueryMerger running `merge_partitions_query` over a
        // table named "source". The closure is invoked immediately, before the
        // captured Arcs are moved into `Self` below.
        let merger = merger_maker.unwrap_or(&|_runtime, schema| {
            let merge_query = Arc::new(merge_partitions_query.replace("{source}", "source"));
            Arc::new(QueryMerger::new(
                view_factory.clone(),
                session_configurator_for_merger.clone(),
                schema,
                merge_query,
            ))
        })(runtime.clone(), schema.clone());

        Ok(Self {
            view_set_name,
            view_instance_id: Arc::new(String::from("global")),
            min_event_time_column,
            max_event_time_column,
            count_src_query,
            extract_query,
            merge_partitions_query,
            schema,
            merger,
            view_factory,
            session_configurator,
            update_group,
            max_partition_delta_from_source,
            max_partition_delta_from_merge,
        })
    }
}
134
135#[async_trait]
136impl View for SqlBatchView {
137    fn get_view_set_name(&self) -> Arc<String> {
138        self.view_set_name.clone()
139    }
140
141    fn get_view_instance_id(&self) -> Arc<String> {
142        self.view_instance_id.clone()
143    }
144
145    async fn make_batch_partition_spec(
146        &self,
147        lakehouse: Arc<LakehouseContext>,
148        existing_partitions: Arc<PartitionCache>,
149        insert_range: TimeRange,
150    ) -> Result<Arc<dyn PartitionSpec>> {
151        let view_meta = ViewMetadata {
152            view_set_name: self.get_view_set_name(),
153            view_instance_id: self.get_view_instance_id(),
154            file_schema_hash: self.get_file_schema_hash(),
155        };
156        let partitions_in_range = Arc::new(existing_partitions.filter_insert_range(insert_range));
157        let ctx = make_session_context(
158            lakehouse,
159            partitions_in_range.clone(),
160            None,
161            self.view_factory.clone(),
162            self.session_configurator.clone(),
163        )
164        .await
165        .with_context(|| "make_session_context")?;
166
167        let count_src_sql = self
168            .count_src_query
169            .replace("{begin}", &insert_range.begin.to_rfc3339())
170            .replace("{end}", &insert_range.end.to_rfc3339());
171
172        let extract_sql = self
173            .extract_query
174            .replace("{begin}", &insert_range.begin.to_rfc3339())
175            .replace("{end}", &insert_range.end.to_rfc3339());
176
177        Ok(Arc::new(
178            fetch_sql_partition_spec(
179                ctx,
180                Arc::new(TrivialRecordBatchTransformer {}),
181                self.get_time_bounds(),
182                self.schema.clone(),
183                count_src_sql,
184                extract_sql,
185                view_meta,
186                insert_range,
187            )
188            .await
189            .with_context(|| "fetch_sql_partition_spec")?,
190        ))
191    }
192
193    fn get_file_schema_hash(&self) -> Vec<u8> {
194        let mut hasher = DefaultHasher::new();
195        self.schema.hash(&mut hasher);
196        hasher.finish().to_le_bytes().to_vec()
197    }
198
199    fn get_file_schema(&self) -> Arc<Schema> {
200        self.schema.clone()
201    }
202
203    async fn jit_update(
204        &self,
205        _lakehouse: Arc<LakehouseContext>,
206        _query_range: Option<TimeRange>,
207    ) -> Result<()> {
208        Ok(())
209    }
210
211    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
212        Ok(vec![
213            col(&*self.min_event_time_column).lt_eq(lit(datetime_to_scalar(end))),
214            col(&*self.max_event_time_column).gt_eq(lit(datetime_to_scalar(begin))),
215        ])
216    }
217
218    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
219        Arc::new(NamedColumnsTimeBounds::new(
220            self.min_event_time_column.clone(),
221            self.max_event_time_column.clone(),
222        ))
223    }
224
225    async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> {
226        let view_name = self.get_view_set_name().to_string();
227        let partitions_table_name = format!("__{view_name}__partitions");
228        ctx.register_table(
229            TableReference::Bare {
230                table: partitions_table_name.clone().into(),
231            },
232            Arc::new(table),
233        )?;
234        let df = ctx
235            .sql(
236                &self
237                    .merge_partitions_query
238                    .replace("{source}", &partitions_table_name),
239            )
240            .await?;
241        ctx.register_table(
242            TableReference::Bare {
243                table: view_name.into(),
244            },
245            df.into_view(),
246        )?;
247        Ok(())
248    }
249
250    async fn merge_partitions(
251        &self,
252        lakehouse: Arc<LakehouseContext>,
253        partitions_to_merge: Arc<Vec<Partition>>,
254        partitions_all_views: Arc<PartitionCache>,
255        insert_range: TimeRange,
256    ) -> Result<SendableRecordBatchStream> {
257        let res = self
258            .merger
259            .execute_merge_query(
260                lakehouse,
261                partitions_to_merge,
262                partitions_all_views,
263                insert_range,
264            )
265            .await;
266        if let Err(e) = &res {
267            error!("{e:?}");
268        }
269        res
270    }
271
272    fn get_update_group(&self) -> Option<i32> {
273        self.update_group
274    }
275
276    fn get_max_partition_time_delta(&self, strategy: &PartitionCreationStrategy) -> TimeDelta {
277        match strategy {
278            PartitionCreationStrategy::Abort | PartitionCreationStrategy::CreateFromSource => {
279                self.max_partition_delta_from_source
280            }
281            PartitionCreationStrategy::MergeExisting(_partitions) => {
282                self.max_partition_delta_from_merge
283            }
284        }
285    }
286}