micromegas_analytics/lakehouse/
merge.rs

1use super::{
2    lakehouse_context::LakehouseContext,
3    partition::Partition,
4    partition_cache::PartitionCache,
5    partition_source_data::hash_to_object_count,
6    partitioned_table_provider::PartitionedTableProvider,
7    query::make_session_context,
8    session_configurator::SessionConfigurator,
9    view::View,
10    view_factory::ViewFactory,
11    write_partition::{PartitionRowSet, write_partition_from_rows},
12};
13use crate::{response_writer::Logger, time::TimeRange};
14use anyhow::{Context, Result};
15use async_trait::async_trait;
16use datafusion::{
17    arrow::datatypes::Schema, execution::SendableRecordBatchStream, prelude::*, sql::TableReference,
18};
19use futures::stream::StreamExt;
20use micromegas_tracing::prelude::*;
21use std::fmt::Debug;
22use std::sync::Arc;
23use xxhash_rust::xxh32::xxh32;
24
25/// A trait for merging partitions.
26#[async_trait]
27pub trait PartitionMerger: Send + Sync + Debug {
28    /// Executes the merge query.
29    async fn execute_merge_query(
30        &self,
31        lakehouse: Arc<LakehouseContext>,
32        partitions_to_merge: Arc<Vec<Partition>>,
33        partitions_all_views: Arc<PartitionCache>,
34        insert_range: TimeRange,
35    ) -> Result<SendableRecordBatchStream>;
36}
37
38/// A `PartitionMerger` that executes a SQL query to merge partitions.
39#[derive(Debug)]
40pub struct QueryMerger {
41    view_factory: Arc<ViewFactory>,
42    session_configurator: Arc<dyn SessionConfigurator>,
43    file_schema: Arc<Schema>,
44    query: Arc<String>,
45}
46
47impl QueryMerger {
48    pub fn new(
49        view_factory: Arc<ViewFactory>,
50        session_configurator: Arc<dyn SessionConfigurator>,
51        file_schema: Arc<Schema>,
52        query: Arc<String>,
53    ) -> Self {
54        Self {
55            view_factory,
56            session_configurator,
57            file_schema,
58            query,
59        }
60    }
61}
62
63#[async_trait]
64impl PartitionMerger for QueryMerger {
65    async fn execute_merge_query(
66        &self,
67        lakehouse: Arc<LakehouseContext>,
68        partitions_to_merge: Arc<Vec<Partition>>,
69        partitions_all_views: Arc<PartitionCache>,
70        insert_range: TimeRange,
71    ) -> Result<SendableRecordBatchStream> {
72        let reader_factory = lakehouse.reader_factory().clone();
73        let ctx = make_session_context(
74            lakehouse.clone(),
75            partitions_all_views,
76            Some(insert_range),
77            self.view_factory.clone(),
78            self.session_configurator.clone(),
79        )
80        .await?;
81        let src_table = PartitionedTableProvider::new(
82            self.file_schema.clone(),
83            reader_factory,
84            partitions_to_merge,
85        );
86        ctx.register_table(
87            TableReference::Bare {
88                table: "source".into(),
89            },
90            Arc::new(src_table),
91        )?;
92
93        ctx.sql(&self.query)
94            .await?
95            .execute_stream()
96            .await
97            .with_context(|| "merged_df.execute_stream")
98    }
99}
100
101fn partition_set_stats(
102    view: Arc<dyn View>,
103    filtered_partitions: &[Partition],
104) -> Result<(i64, i64)> {
105    let mut sum_size: i64 = 0;
106    let mut source_hash: i64 = 0;
107    let latest_file_schema_hash = view.get_file_schema_hash();
108    for p in filtered_partitions {
109        // for some time all the hashes will actually be the number of events in the source data
110        // when views have different hash algos, we should delegate to the view the creation of the merged hash
111        source_hash = if p.source_data_hash.len() == std::mem::size_of::<i64>() {
112            source_hash + hash_to_object_count(&p.source_data_hash)?
113        } else {
114            //previous hash algo
115            xxh32(&p.source_data_hash, source_hash as u32).into()
116        };
117
118        sum_size += p.file_size;
119
120        if p.view_metadata.file_schema_hash != latest_file_schema_hash {
121            anyhow::bail!(
122                "incompatible file schema with [{},{}]",
123                p.begin_insert_time().to_rfc3339(),
124                p.end_insert_time().to_rfc3339()
125            );
126        }
127    }
128    Ok((sum_size, source_hash))
129}
130
131/// Creates a merged partition from a set of existing partitions.
132pub async fn create_merged_partition(
133    partitions_to_merge: Arc<PartitionCache>,
134    partitions_all_views: Arc<PartitionCache>,
135    lakehouse: Arc<LakehouseContext>,
136    view: Arc<dyn View>,
137    insert_range: TimeRange,
138    logger: Arc<dyn Logger>,
139) -> Result<()> {
140    let view_set_name = &view.get_view_set_name();
141    let view_instance_id = &view.get_view_instance_id();
142    let desc = format!(
143        "[{}, {}] {view_set_name} {view_instance_id}",
144        insert_range.begin.to_rfc3339(),
145        insert_range.end.to_rfc3339()
146    );
147    // we are not looking for intersecting partitions, but only those that fit completely in the range
148    // otherwise we'd get duplicated records
149    let mut filtered_partitions = partitions_to_merge
150        .filter_inside_range(view_set_name, view_instance_id, insert_range)
151        .partitions;
152    if filtered_partitions.len() != partitions_to_merge.len() {
153        warn!("partitions_to_merge was not filtered properly");
154    }
155    if filtered_partitions.len() < 2 {
156        logger
157            .write_log_entry(format!("{desc}: not enough partitions to merge"))
158            .await
159            .with_context(|| "writing log")?;
160        return Ok(());
161    }
162    let (sum_size, source_hash) = partition_set_stats(view.clone(), &filtered_partitions)
163        .with_context(|| "partition_set_stats")?;
164    logger
165        .write_log_entry(format!(
166            "{desc}: merging {} partitions sum_size={sum_size}",
167            filtered_partitions.len()
168        ))
169        .await
170        .with_context(|| "write_log_entry")?;
171    filtered_partitions.sort_by_key(|p| p.begin_insert_time());
172    let mut merged_stream = view
173        .merge_partitions(
174            lakehouse.clone(),
175            Arc::new(filtered_partitions),
176            partitions_all_views,
177            insert_range,
178        )
179        .await
180        .with_context(|| "view.merge_partitions")?;
181    let (tx, rx) = tokio::sync::mpsc::channel(1);
182    let view_copy = view.clone();
183    let lake = lakehouse.lake().clone();
184    let join_handle = spawn_with_context(write_partition_from_rows(
185        lake,
186        view_copy.get_meta(),
187        view_copy.get_file_schema(),
188        insert_range,
189        source_hash.to_le_bytes().to_vec(),
190        rx,
191        logger.clone(),
192    ));
193    let compute_time_bounds = view.get_time_bounds();
194    let ctx =
195        SessionContext::new_with_config_rt(SessionConfig::default(), lakehouse.runtime().clone());
196    let stream_result: Result<()> = async {
197        while let Some(rb_res) = merged_stream.next().await {
198            let rb = rb_res.with_context(|| "receiving record_batch from stream")?;
199            let event_time_range = compute_time_bounds
200                .get_time_bounds(ctx.read_batch(rb.clone()).with_context(|| "read_batch")?)
201                .await?;
202            tx.send(Ok(PartitionRowSet::new(event_time_range, rb)))
203                .await
204                .with_context(|| "sending partition row set")?;
205        }
206        Ok(())
207    }
208    .await;
209
210    match stream_result {
211        Ok(()) => {
212            drop(tx);
213            join_handle.await??;
214            Ok(())
215        }
216        Err(e) => {
217            warn!("aborting merge partition write for {desc}: {e:?}");
218            let _ = tx.send(Err(anyhow::anyhow!("merge stream aborted"))).await;
219            drop(tx);
220            match join_handle.await {
221                Ok(Ok(())) => {}
222                Ok(Err(writer_err)) => {
223                    debug!("merge writer task error during abort: {writer_err:?}");
224                }
225                Err(join_err) => {
226                    warn!("merge writer task panicked during abort: {join_err:?}");
227                }
228            }
229            Err(e)
230        }
231    }
232}