micromegas_analytics/lakehouse/batch_partition_merger.rs

use super::{
    lakehouse_context::LakehouseContext, merge::PartitionMerger, partition::Partition,
    partition_cache::PartitionCache, session_configurator::SessionConfigurator,
    view_factory::ViewFactory,
};
use crate::{
    lakehouse::{
        partitioned_table_provider::PartitionedTableProvider, query::make_session_context,
    },
    time::{TimeRange, datetime_to_scalar},
};
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::datatypes::Schema, error::DataFusionError, execution::SendableRecordBatchStream,
    physical_plan::stream::RecordBatchReceiverStreamBuilder, sql::TableReference,
};
use futures::{StreamExt, TryStreamExt, stream};
use micromegas_tracing::prelude::*;
use std::sync::Arc;

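/// Row count and event time bounds accumulated over a set of partitions.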
struct PartitionStats {
    pub num_rows: i64,
    pub min_event_time: DateTime<Utc>,
    pub max_event_time: DateTime<Utc>,
}

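/// Sums the row counts and unions the event time ranges of the non-empty
/// partitions; fails if every partition is empty or if a non-empty partition
/// is missing its event time range.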
fn compute_partition_stats(partitions: &[Partition]) -> Result<PartitionStats> {
    let non_empty: Vec<_> = partitions.iter().filter(|p| !p.is_empty()).collect();

    if non_empty.is_empty() {
        anyhow::bail!(
            "compute_partition_stats given only empty partitions (should be filtered at caller)"
        );
    }

    let first = non_empty.first().unwrap();
    let first_event_range = first
        .event_time_range
        .ok_or_else(|| anyhow::anyhow!("non-empty partition has no event_time_range"))?;

    let state = PartitionStats {
        num_rows: first.num_rows,
        min_event_time: first_event_range.begin,
        max_event_time: first_event_range.end,
    };

    non_empty
        .iter()
        .skip(1)
        .try_fold(state, |state, part| -> Result<PartitionStats> {
            let event_range = part
                .event_time_range
                .ok_or_else(|| anyhow::anyhow!("non-empty partition has no event_time_range"))?;
            Ok(PartitionStats {
                num_rows: state.num_rows + part.num_rows,
                min_event_time: state.min_event_time.min(event_range.begin),
                max_event_time: state.max_event_time.max(event_range.end),
            })
        })
}

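/// `PartitionMerger` that merges partitions in several time-sliced batches,
/// bounding the amount of data processed by any single query.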
#[derive(Debug)]
pub struct BatchPartitionMerger {
    /// Schema of the partitions to merge and of the merged record batches.
    file_schema: Arc<Schema>,
    /// Views available to the merge query.
    view_factory: Arc<ViewFactory>,
    /// Hook to customize the DataFusion session.
    session_configurator: Arc<dyn SessionConfigurator>,
    /// Query executed for each batch; `begin` and `end` are bound per slice.
    merge_batch_query: String,
    /// Target number of rows per batch, used to choose the number of slices.
    approx_nb_rows_per_batch: i64,
}

impl BatchPartitionMerger {
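    /// Creates a new `BatchPartitionMerger`.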
    pub fn new(
        file_schema: Arc<Schema>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
        merge_batch_query: String,
        approx_nb_rows_per_batch: i64,
    ) -> Self {
        Self {
            file_schema,
            view_factory,
            session_configurator,
            merge_batch_query,
            approx_nb_rows_per_batch,
        }
    }
}

#[async_trait]
impl PartitionMerger for BatchPartitionMerger {
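    /// Streams the merged rows, executing the merge query once per event time
    /// slice instead of over the whole set of partitions at once.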
    #[span_fn]
    async fn execute_merge_query(
        &self,
        lakehouse: Arc<LakehouseContext>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<SendableRecordBatchStream> {
        info!("execute_merge_query");

        if partitions_to_merge.iter().all(|p| p.is_empty()) {
            debug!("all partitions are empty, returning empty stream");
            let builder = RecordBatchReceiverStreamBuilder::new(self.file_schema.clone(), 1);
            return Ok(builder.build());
        }

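        // Slice the overall event time range so that each slice holds roughly
        // `approx_nb_rows_per_batch` rows; the extra nanosecond ensures the
        // slices cover the full range, including max_event_time.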
        let stats = compute_partition_stats(partitions_to_merge.as_ref())?;
        let nb_batches = ((stats.num_rows / self.approx_nb_rows_per_batch) + 1) as i32;
        let batch_time_delta = ((stats.max_event_time - stats.min_event_time) / nb_batches)
            + TimeDelta::nanoseconds(1);

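        // Register the partitions to merge as a `source` table so the merge
        // query can read them through SQL.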
        let file_schema = self.file_schema.clone();
        let reader_factory = lakehouse.reader_factory().clone();
        let ctx = make_session_context(
            lakehouse.clone(),
            partitions_all_views,
            Some(insert_range),
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await?;
        let src_table =
            PartitionedTableProvider::new(file_schema, reader_factory, partitions_to_merge);
        ctx.register_table(
            TableReference::Bare {
                table: "source".into(),
            },
            Arc::new(src_table),
        )?;
        let df_template = ctx.sql(&self.merge_batch_query).await.map_err(|e| {
            DataFusionError::Execution(format!("building template for merge query: {e:?}"))
        })?;

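        // Execute the per-slice queries with bounded concurrency, forwarding
        // their record batches into a single output stream.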
        let mut builder = RecordBatchReceiverStreamBuilder::new(self.file_schema.clone(), 10);
        let sender = builder.tx();
        builder.spawn(async move {
            let mut streams_stream = stream::iter(0..nb_batches)
                .map(|i| {
                    let begin = stats.min_event_time + (batch_time_delta * i);
                    let end = begin + batch_time_delta;
                    debug!("merging batch {begin} {end}");
                    df_template
                        .clone()
                        .with_param_values(vec![
                            ("begin", datetime_to_scalar(begin)),
                            ("end", datetime_to_scalar(end)),
                        ])
                        .map(|df| async {
                            spawn_with_context(df.execute_stream())
                                .await
                                .map_err(|e| DataFusionError::External(e.into()))
                        })
                })
                // keep two slice queries in flight at a time
                .try_buffered(2);

            // drain each slice stream in order, forwarding its batches
            while let Some(stream_res) = streams_stream.next().await {
                let mut merge_stream = stream_res??;
                let sender = sender.clone();
                while let Some(rb_res) = merge_stream.next().await {
                    if let Err(e) = sender.send(rb_res).await {
                        error!("sending record batch: {e:?}");
                    }
                }
            }
            Ok(())
        });
        debug!("building merge stream");
        Ok(builder.build())
    }
}