// micromegas_analytics/lakehouse/batch_partition_merger.rs

1use super::{
2    lakehouse_context::LakehouseContext, merge::PartitionMerger, partition::Partition,
3    partition_cache::PartitionCache, session_configurator::SessionConfigurator,
4    view_factory::ViewFactory,
5};
6use crate::{
7    lakehouse::{
8        partitioned_table_provider::PartitionedTableProvider, query::make_session_context,
9    },
10    time::{TimeRange, datetime_to_scalar},
11};
12use anyhow::Result;
13use async_trait::async_trait;
14use chrono::{DateTime, TimeDelta, Utc};
15use datafusion::{
16    arrow::datatypes::Schema, error::DataFusionError, execution::SendableRecordBatchStream,
17    physical_plan::stream::RecordBatchReceiverStreamBuilder, sql::TableReference,
18};
19use futures::TryStreamExt;
20use futures::{StreamExt, stream};
21use micromegas_tracing::prelude::*;
22use std::sync::Arc;
23
/// Statistics about a set of partitions.
///
/// Aggregated across non-empty partitions by `compute_partition_stats`.
struct PartitionStats {
    /// Total number of rows across the partitions.
    pub num_rows: i64,
    /// Earliest event time observed in any partition's event_time_range.
    pub min_event_time: DateTime<Utc>,
    /// Latest event time observed in any partition's event_time_range.
    pub max_event_time: DateTime<Utc>,
}
30
31fn compute_partition_stats(partitions: &[Partition]) -> Result<PartitionStats> {
32    // Filter out empty partitions before computing stats
33    let non_empty: Vec<_> = partitions.iter().filter(|p| !p.is_empty()).collect();
34
35    if non_empty.is_empty() {
36        anyhow::bail!(
37            "compute_partition_stats given only empty partitions (should be filtered at caller)"
38        );
39    }
40
41    let first = non_empty.first().unwrap();
42    let first_event_range = first
43        .event_time_range
44        .ok_or_else(|| anyhow::anyhow!("non-empty partition has no event_time_range"))?;
45
46    let state = PartitionStats {
47        num_rows: first.num_rows,
48        min_event_time: first_event_range.begin,
49        max_event_time: first_event_range.end,
50    };
51
52    non_empty
53        .iter()
54        .skip(1)
55        .try_fold(state, |state, part| -> Result<PartitionStats> {
56            let event_range = part
57                .event_time_range
58                .ok_or_else(|| anyhow::anyhow!("non-empty partition has no event_time_range"))?;
59            Ok(PartitionStats {
60                num_rows: state.num_rows + part.num_rows,
61                min_event_time: state.min_event_time.min(event_range.begin),
62                max_event_time: state.max_event_time.max(event_range.end),
63            })
64        })
65}
66
/// Merges multiple partitions by splitting the work in batches to use less memory.
/// The batches are based on event times.
#[derive(Debug)]
pub struct BatchPartitionMerger {
    /// Arrow schema of the parquet files being merged.
    file_schema: Arc<Schema>,
    /// Allows joins against other views in the merge query.
    view_factory: Arc<ViewFactory>,
    /// Allows custom tables to be registered for the merge query session.
    session_configurator: Arc<dyn SessionConfigurator>,
    /// Merge query template with `begin` & `end` placeholders bound per batch.
    merge_batch_query: String,
    /// Approximate number of rows to aim for in each batch.
    approx_nb_rows_per_batch: i64,
}
82
impl BatchPartitionMerger {
    /// Creates a new `BatchPartitionMerger`.
    ///
    /// * `file_schema` - arrow schema of the parquet files to merge
    /// * `view_factory` - provides views for joins in the merge query
    /// * `session_configurator` - registers custom tables in the query session
    /// * `merge_batch_query` - SQL template with `begin` & `end` parameters
    /// * `approx_nb_rows_per_batch` - target number of rows per merge batch
    pub fn new(
        file_schema: Arc<Schema>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
        merge_batch_query: String,
        approx_nb_rows_per_batch: i64,
    ) -> Self {
        Self {
            file_schema,
            view_factory,
            session_configurator,
            merge_batch_query,
            approx_nb_rows_per_batch,
        }
    }
}
100
#[async_trait]
impl PartitionMerger for BatchPartitionMerger {
    /// Merges `partitions_to_merge` by running the merge query over a series of
    /// event-time slices, streaming out the resulting record batches.
    ///
    /// The source partitions are exposed to the query as a table named `source`;
    /// each slice binds the `begin`/`end` parameters of `merge_batch_query`.
    #[span_fn]
    async fn execute_merge_query(
        &self,
        lakehouse: Arc<LakehouseContext>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<SendableRecordBatchStream> {
        info!("execute_merge_query");

        // If all partitions are empty, return empty stream immediately
        if partitions_to_merge.iter().all(|p| p.is_empty()) {
            debug!("all partitions are empty, returning empty stream");
            let builder = RecordBatchReceiverStreamBuilder::new(self.file_schema.clone(), 1);
            return Ok(builder.build());
        }

        // Split the event-time span into nb_batches slices sized so each slice
        // holds roughly approx_nb_rows_per_batch rows (assuming rows are evenly
        // distributed over time).
        // NOTE(review): assumes approx_nb_rows_per_batch > 0 — a zero value
        // would panic on division here; confirm this is enforced by callers.
        let stats = compute_partition_stats(partitions_to_merge.as_ref())?;
        let nb_batches = ((stats.num_rows / self.approx_nb_rows_per_batch) + 1) as i32;
        // The extra nanosecond per slice guarantees nb_batches * batch_time_delta
        // strictly exceeds the span, so the last slice covers max_event_time.
        let batch_time_delta = ((stats.max_event_time - stats.min_event_time) / nb_batches)
            + TimeDelta::nanoseconds(1);

        let file_schema = self.file_schema.clone();
        let reader_factory = lakehouse.reader_factory().clone();
        // Build a session that can see all views (for joins) plus any custom
        // tables added by the session configurator.
        let ctx = make_session_context(
            lakehouse.clone(),
            partitions_all_views,
            Some(insert_range),
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await?;
        // Register the partitions being merged as the `source` table referenced
        // by the merge query.
        let src_table =
            PartitionedTableProvider::new(file_schema, reader_factory, partitions_to_merge);
        ctx.register_table(
            TableReference::Bare {
                table: "source".into(),
            },
            Arc::new(src_table),
        )?;
        // Plan the query once; each batch re-binds the begin/end parameters on a
        // clone of this template.
        let df_template = ctx.sql(&self.merge_batch_query).await.map_err(|e| {
            DataFusionError::Execution(format!("building template for merge query: {e:?}"))
        })?;

        // Channel-backed output stream; the spawned task below feeds it.
        let mut builder = RecordBatchReceiverStreamBuilder::new(self.file_schema.clone(), 10);
        let sender = builder.tx();
        builder.spawn(async move {
            // Execute the per-slice queries with up to 2 in flight at a time,
            // preserving slice order in the output.
            let mut streams_stream = stream::iter(0..nb_batches)
                .map(|i| {
                    let begin = stats.min_event_time + (batch_time_delta * i);
                    let end = begin + batch_time_delta;
                    debug!("merging batch {begin} {end}");
                    df_template
                        .clone()
                        .with_param_values(vec![
                            ("begin", datetime_to_scalar(begin)),
                            ("end", datetime_to_scalar(end)),
                        ])
                        .map(|df| async {
                            spawn_with_context(df.execute_stream())
                                .await
                                .map_err(|e| DataFusionError::External(e.into()))
                        })
                })
                .try_buffered(2);

            // Drain each slice's stream in order, forwarding batches to the
            // output channel. Send errors (receiver dropped) are logged and the
            // remaining batches of the slice are still consumed.
            while let Some(stream_res) = streams_stream.next().await {
                // First `?` unwraps the buffered-stream item, second the
                // spawned task's result.
                let mut merge_stream = stream_res??;
                let sender = sender.clone();
                while let Some(rb_res) = merge_stream.next().await {
                    if let Err(e) = sender.send(rb_res).await {
                        error!("sending record batch: {e:?}");
                    }
                }
            }
            Ok(())
        });
        debug!("building merge stream");
        Ok(builder.build())
    }
}
183}