// micromegas_analytics/lakehouse/sql_batch_view.rs
use super::{
    batch_update::PartitionCreationStrategy,
    dataframe_time_bounds::{DataFrameTimeBounds, NamedColumnsTimeBounds},
    lakehouse_context::LakehouseContext,
    materialized_view::MaterializedView,
    merge::{PartitionMerger, QueryMerger},
    partition::Partition,
    partition_cache::{NullPartitionProvider, PartitionCache},
    query::make_session_context,
    session_configurator::SessionConfigurator,
    sql_partition_spec::fetch_sql_partition_spec,
    view::{PartitionSpec, View, ViewMetadata},
    view_factory::ViewFactory,
};
use crate::{
    record_batch_transformer::TrivialRecordBatchTransformer,
    time::{TimeRange, datetime_to_scalar},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema,
    execution::{SendableRecordBatchStream, runtime_env::RuntimeEnv},
    prelude::*,
    sql::TableReference,
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::error;
use std::hash::Hash;
use std::hash::Hasher;
use std::{hash::DefaultHasher, sync::Arc};

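/// Factory closure that builds the [`PartitionMerger`] for a view, given the
/// DataFusion runtime and the view's file schema.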
pub type MergerMaker =
    dyn Fn(Arc<RuntimeEnv>, Arc<Schema>) -> Arc<dyn PartitionMerger> + Send + Sync;

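/// A view materialized in batch from SQL queries over other views.
///
/// `extract_query` produces the rows of a partition and `count_src_query`
/// counts the matching source rows; in both, the `{begin}` and `{end}`
/// placeholders are replaced with the bounds of the insert range.
/// `merge_partitions_query` combines existing partitions, with `{source}`
/// standing for the table of partitions to merge.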
#[derive(Debug)]
pub struct SqlBatchView {
    view_set_name: Arc<String>,
    view_instance_id: Arc<String>,
    min_event_time_column: Arc<String>,
    max_event_time_column: Arc<String>,
    count_src_query: Arc<String>,
    extract_query: Arc<String>,
    merge_partitions_query: Arc<String>,
    schema: Arc<Schema>,
    merger: Arc<dyn PartitionMerger>,
    view_factory: Arc<ViewFactory>,
    session_configurator: Arc<dyn SessionConfigurator>,
    update_group: Option<i32>,
    max_partition_delta_from_source: TimeDelta,
    max_partition_delta_from_merge: TimeDelta,
}

impl SqlBatchView {
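    /// Creates a `SqlBatchView`, running `extract_query` once (with the
    /// current time substituted for `{begin}`/`{end}`) to infer the file
    /// schema. When `merger_maker` is `None`, partitions are merged by a
    /// default [`QueryMerger`] driven by `merge_partitions_query`.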
    #[expect(clippy::too_many_arguments)]
    pub async fn new(
        runtime: Arc<RuntimeEnv>,
        view_set_name: Arc<String>,
        min_event_time_column: Arc<String>,
        max_event_time_column: Arc<String>,
        count_src_query: Arc<String>,
        extract_query: Arc<String>,
        merge_partitions_query: Arc<String>,
        lake: Arc<DataLakeConnection>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
        update_group: Option<i32>,
        max_partition_delta_from_source: TimeDelta,
        max_partition_delta_from_merge: TimeDelta,
        merger_maker: Option<&MergerMaker>,
    ) -> Result<Self> {
        let null_part_provider = Arc::new(NullPartitionProvider {});
        let lakehouse = Arc::new(LakehouseContext::new(lake.clone(), runtime.clone()));
        let ctx = make_session_context(
            lakehouse,
            null_part_provider,
            None,
            view_factory.clone(),
            session_configurator.clone(),
        )
        .await
        .with_context(|| "make_session_context")?;
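        // Probe the extract query with the current time standing in for the
        // {begin}/{end} placeholders; the schema of the resulting dataframe
        // becomes the view's file schema.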
        let now_str = Utc::now().to_rfc3339();
        let sql = extract_query
            .replace("{begin}", &now_str)
            .replace("{end}", &now_str);
        let extracted_df = ctx.sql(&sql).await?;
        let schema = extracted_df.schema().inner().clone();
        let session_configurator_for_merger = session_configurator.clone();
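        // Default merger: evaluate merge_partitions_query over the partitions
        // to be merged, which are registered under the table name "source".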
        let merger = merger_maker.unwrap_or(&|_runtime, schema| {
            let merge_query = Arc::new(merge_partitions_query.replace("{source}", "source"));
            Arc::new(QueryMerger::new(
                view_factory.clone(),
                session_configurator_for_merger.clone(),
                schema,
                merge_query,
            ))
        })(runtime.clone(), schema.clone());

        Ok(Self {
            view_set_name,
            view_instance_id: Arc::new(String::from("global")),
            min_event_time_column,
            max_event_time_column,
            count_src_query,
            extract_query,
            merge_partitions_query,
            schema,
            merger,
            view_factory,
            session_configurator,
            update_group,
            max_partition_delta_from_source,
            max_partition_delta_from_merge,
        })
    }
}
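
// Example (sketch): building a hypothetical per-process log-count view. The
// names, queries, and deltas below are illustrative only; they show how the
// `{begin}`/`{end}` placeholders (replaced with the insert range) and the
// `{source}` placeholder (replaced with the partitions table) are meant to be
// used.
//
// let count_src = "SELECT COUNT(*) AS count FROM log_entries \
//                  WHERE insert_time >= '{begin}' AND insert_time < '{end}';";
// let extract = "SELECT process_id, MIN(time) AS min_time, MAX(time) AS max_time, \
//                COUNT(*) AS nb_entries FROM log_entries \
//                WHERE insert_time >= '{begin}' AND insert_time < '{end}' \
//                GROUP BY process_id;";
// let merge = "SELECT process_id, MIN(min_time) AS min_time, MAX(max_time) AS max_time, \
//              SUM(nb_entries) AS nb_entries FROM {source} GROUP BY process_id;";
// let view = SqlBatchView::new(
//     runtime,
//     Arc::new(String::from("log_stats")),
//     Arc::new(String::from("min_time")), // min event time column
//     Arc::new(String::from("max_time")), // max event time column
//     Arc::new(String::from(count_src)),
//     Arc::new(String::from(extract)),
//     Arc::new(String::from(merge)),
//     lake,
//     view_factory,
//     session_configurator,
//     Some(2000),          // update group
//     TimeDelta::hours(1), // max partition delta when creating from source
//     TimeDelta::days(1),  // max partition delta when merging
//     None,                // use the default QueryMerger
// )
// .await?;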

#[async_trait]
impl View for SqlBatchView {
    fn get_view_set_name(&self) -> Arc<String> {
        self.view_set_name.clone()
    }

    fn get_view_instance_id(&self) -> Arc<String> {
        self.view_instance_id.clone()
    }

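    /// Builds the partition spec for `insert_range`: the session context only
    /// sees existing partitions intersecting the range, and the count and
    /// extract queries are instantiated with the range's bounds.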
    async fn make_batch_partition_spec(
        &self,
        lakehouse: Arc<LakehouseContext>,
        existing_partitions: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<Arc<dyn PartitionSpec>> {
        let view_meta = ViewMetadata {
            view_set_name: self.get_view_set_name(),
            view_instance_id: self.get_view_instance_id(),
            file_schema_hash: self.get_file_schema_hash(),
        };
        let partitions_in_range = Arc::new(existing_partitions.filter_insert_range(insert_range));
        let ctx = make_session_context(
            lakehouse,
            partitions_in_range.clone(),
            None,
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await
        .with_context(|| "make_session_context")?;

        let count_src_sql = self
            .count_src_query
            .replace("{begin}", &insert_range.begin.to_rfc3339())
            .replace("{end}", &insert_range.end.to_rfc3339());

        let extract_sql = self
            .extract_query
            .replace("{begin}", &insert_range.begin.to_rfc3339())
            .replace("{end}", &insert_range.end.to_rfc3339());

        Ok(Arc::new(
            fetch_sql_partition_spec(
                ctx,
                Arc::new(TrivialRecordBatchTransformer {}),
                self.get_time_bounds(),
                self.schema.clone(),
                count_src_sql,
                extract_sql,
                view_meta,
                insert_range,
            )
            .await
            .with_context(|| "fetch_sql_partition_spec")?,
        ))
    }

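    /// The file schema hash is the 64-bit `DefaultHasher` digest of the Arrow
    /// schema, serialized little-endian; it lets partition metadata reflect
    /// schema changes.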
    fn get_file_schema_hash(&self) -> Vec<u8> {
        let mut hasher = DefaultHasher::new();
        self.schema.hash(&mut hasher);
        hasher.finish().to_le_bytes().to_vec()
    }

    fn get_file_schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

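    /// SQL batch views are materialized by the batch pipeline; just-in-time
    /// updates are a no-op.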
    async fn jit_update(
        &self,
        _lakehouse: Arc<LakehouseContext>,
        _query_range: Option<TimeRange>,
    ) -> Result<()> {
        Ok(())
    }

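    /// Standard interval-overlap test: a row qualifies when
    /// `min_event_time <= end && max_event_time >= begin`, i.e. its event-time
    /// span intersects `[begin, end]`.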
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
        Ok(vec![
            col(&*self.min_event_time_column).lt_eq(lit(datetime_to_scalar(end))),
            col(&*self.max_event_time_column).gt_eq(lit(datetime_to_scalar(begin))),
        ])
    }

    fn get_time_bounds(&self) -> Arc<dyn DataFrameTimeBounds> {
        Arc::new(NamedColumnsTimeBounds::new(
            self.min_event_time_column.clone(),
            self.max_event_time_column.clone(),
        ))
    }

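    /// Registers the raw partitions under `__{view}__partitions`, then
    /// registers the view's own name as the merge query evaluated over that
    /// table, so queries against the view always see merged rows.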
    async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> {
        let view_name = self.get_view_set_name().to_string();
        let partitions_table_name = format!("__{view_name}__partitions");
        ctx.register_table(
            TableReference::Bare {
                table: partitions_table_name.clone().into(),
            },
            Arc::new(table),
        )?;
        let df = ctx
            .sql(
                &self
                    .merge_partitions_query
                    .replace("{source}", &partitions_table_name),
            )
            .await?;
        ctx.register_table(
            TableReference::Bare {
                table: view_name.into(),
            },
            df.into_view(),
        )?;
        Ok(())
    }

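    /// Delegates to the configured [`PartitionMerger`], logging any error
    /// before returning the result.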
    async fn merge_partitions(
        &self,
        lakehouse: Arc<LakehouseContext>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<SendableRecordBatchStream> {
        let res = self
            .merger
            .execute_merge_query(
                lakehouse,
                partitions_to_merge,
                partitions_all_views,
                insert_range,
            )
            .await;
        if let Err(e) = &res {
            error!("{e:?}");
        }
        res
    }

    fn get_update_group(&self) -> Option<i32> {
        self.update_group
    }

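    /// Maximum time span a partition may cover, which depends on whether it is
    /// created from source data or by merging existing partitions.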
    fn get_max_partition_time_delta(&self, strategy: &PartitionCreationStrategy) -> TimeDelta {
        match strategy {
            PartitionCreationStrategy::Abort | PartitionCreationStrategy::CreateFromSource => {
                self.max_partition_delta_from_source
            }
            PartitionCreationStrategy::MergeExisting(_partitions) => {
                self.max_partition_delta_from_merge
            }
        }
    }
}