// micromegas_analytics/lakehouse/merge.rs
use super::{
    lakehouse_context::LakehouseContext,
    partition::Partition,
    partition_cache::PartitionCache,
    partition_source_data::hash_to_object_count,
    partitioned_table_provider::PartitionedTableProvider,
    query::make_session_context,
    session_configurator::SessionConfigurator,
    view::View,
    view_factory::ViewFactory,
    write_partition::{PartitionRowSet, write_partition_from_rows},
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::{Context, Result};
use async_trait::async_trait;
use datafusion::{
    arrow::datatypes::Schema, execution::SendableRecordBatchStream, prelude::*, sql::TableReference,
};
use futures::stream::StreamExt;
use micromegas_tracing::prelude::*;
use std::fmt::Debug;
use std::sync::Arc;
use xxhash_rust::xxh32::xxh32;
/// Strategy for merging a set of partitions into a single stream of record batches.
#[async_trait]
pub trait PartitionMerger: Send + Sync + Debug {
    /// Runs the merge over `partitions_to_merge` and returns the merged rows
    /// as a record batch stream.
    ///
    /// * `lakehouse` - lakehouse context the merge executes against.
    /// * `partitions_to_merge` - the partitions whose rows are merged.
    /// * `partitions_all_views` - partition cache made available to queries
    ///   that reference other views.
    /// * `insert_range` - insert-time range covered by the merge.
    async fn execute_merge_query(
        &self,
        lakehouse: Arc<LakehouseContext>,
        partitions_to_merge: Arc<Vec<Partition>>,
        partitions_all_views: Arc<PartitionCache>,
        insert_range: TimeRange,
    ) -> Result<SendableRecordBatchStream>;
}
37
/// A [`PartitionMerger`] that merges partitions by running a SQL query over a
/// temporary table named `source` backed by the partitions to merge.
#[derive(Debug)]
pub struct QueryMerger {
    // Provides the views that the merge query may reference.
    view_factory: Arc<ViewFactory>,
    // Customizes the DataFusion session used to run the query.
    session_configurator: Arc<dyn SessionConfigurator>,
    // Schema of the files backing the partitions exposed as `source`.
    file_schema: Arc<Schema>,
    // SQL text executed against the registered `source` table.
    query: Arc<String>,
}
46
47impl QueryMerger {
48 pub fn new(
49 view_factory: Arc<ViewFactory>,
50 session_configurator: Arc<dyn SessionConfigurator>,
51 file_schema: Arc<Schema>,
52 query: Arc<String>,
53 ) -> Self {
54 Self {
55 view_factory,
56 session_configurator,
57 file_schema,
58 query,
59 }
60 }
61}
62
63#[async_trait]
64impl PartitionMerger for QueryMerger {
65 async fn execute_merge_query(
66 &self,
67 lakehouse: Arc<LakehouseContext>,
68 partitions_to_merge: Arc<Vec<Partition>>,
69 partitions_all_views: Arc<PartitionCache>,
70 insert_range: TimeRange,
71 ) -> Result<SendableRecordBatchStream> {
72 let reader_factory = lakehouse.reader_factory().clone();
73 let ctx = make_session_context(
74 lakehouse.clone(),
75 partitions_all_views,
76 Some(insert_range),
77 self.view_factory.clone(),
78 self.session_configurator.clone(),
79 )
80 .await?;
81 let src_table = PartitionedTableProvider::new(
82 self.file_schema.clone(),
83 reader_factory,
84 partitions_to_merge,
85 );
86 ctx.register_table(
87 TableReference::Bare {
88 table: "source".into(),
89 },
90 Arc::new(src_table),
91 )?;
92
93 ctx.sql(&self.query)
94 .await?
95 .execute_stream()
96 .await
97 .with_context(|| "merged_df.execute_stream")
98 }
99}
100
101fn partition_set_stats(
102 view: Arc<dyn View>,
103 filtered_partitions: &[Partition],
104) -> Result<(i64, i64)> {
105 let mut sum_size: i64 = 0;
106 let mut source_hash: i64 = 0;
107 let latest_file_schema_hash = view.get_file_schema_hash();
108 for p in filtered_partitions {
109 source_hash = if p.source_data_hash.len() == std::mem::size_of::<i64>() {
112 source_hash + hash_to_object_count(&p.source_data_hash)?
113 } else {
114 xxh32(&p.source_data_hash, source_hash as u32).into()
116 };
117
118 sum_size += p.file_size;
119
120 if p.view_metadata.file_schema_hash != latest_file_schema_hash {
121 anyhow::bail!(
122 "incompatible file schema with [{},{}]",
123 p.begin_insert_time().to_rfc3339(),
124 p.end_insert_time().to_rfc3339()
125 );
126 }
127 }
128 Ok((sum_size, source_hash))
129}
130
/// Merges the partitions of `view` that fall inside `insert_range` into a
/// single new partition.
///
/// Validates schema compatibility, runs the view's merge over the matching
/// partitions, and streams the merged rows to a background task that writes
/// the new partition. Logs and returns `Ok(())` without merging when fewer
/// than two partitions match the range.
pub async fn create_merged_partition(
    partitions_to_merge: Arc<PartitionCache>,
    partitions_all_views: Arc<PartitionCache>,
    lakehouse: Arc<LakehouseContext>,
    view: Arc<dyn View>,
    insert_range: TimeRange,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let view_set_name = &view.get_view_set_name();
    let view_instance_id = &view.get_view_instance_id();
    // Label prefixed to every log entry produced by this merge.
    let desc = format!(
        "[{}, {}] {view_set_name} {view_instance_id}",
        insert_range.begin.to_rfc3339(),
        insert_range.end.to_rfc3339()
    );
    // Keep only this view's partitions fully inside the insert range.
    let mut filtered_partitions = partitions_to_merge
        .filter_inside_range(view_set_name, view_instance_id, insert_range)
        .partitions;
    // NOTE(review): the incoming cache appears to be expected pre-filtered to
    // this view/range, making the filter above a no-op — hence the warning.
    if filtered_partitions.len() != partitions_to_merge.len() {
        warn!("partitions_to_merge was not filtered properly");
    }
    // With 0 or 1 partitions there is nothing to merge.
    if filtered_partitions.len() < 2 {
        logger
            .write_log_entry(format!("{desc}: not enough partitions to merge"))
            .await
            .with_context(|| "writing log")?;
        return Ok(());
    }
    // Checks file-schema compatibility and computes the merged partition's
    // combined source hash plus total input size (for logging).
    let (sum_size, source_hash) = partition_set_stats(view.clone(), &filtered_partitions)
        .with_context(|| "partition_set_stats")?;
    logger
        .write_log_entry(format!(
            "{desc}: merging {} partitions sum_size={sum_size}",
            filtered_partitions.len()
        ))
        .await
        .with_context(|| "write_log_entry")?;
    // Merge inputs in insert-time order.
    filtered_partitions.sort_by_key(|p| p.begin_insert_time());
    let mut merged_stream = view
        .merge_partitions(
            lakehouse.clone(),
            Arc::new(filtered_partitions),
            partitions_all_views,
            insert_range,
        )
        .await
        .with_context(|| "view.merge_partitions")?;
    // Bounded channel (capacity 1): the producer can run at most one row set
    // ahead of the writer task.
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    let view_copy = view.clone();
    let lake = lakehouse.lake().clone();
    // Background task that writes the new partition from the row sets it
    // receives on `rx`; the combined source hash is stored little-endian.
    let join_handle = spawn_with_context(async move {
        let res = write_partition_from_rows(
            lake,
            view_copy.get_meta(),
            view_copy.get_file_schema(),
            insert_range,
            source_hash.to_le_bytes().to_vec(),
            rx,
            logger.clone(),
        )
        .await;
        // Log the failure eagerly; it would otherwise only surface when the
        // task is joined at the end of the function.
        if let Err(e) = &res {
            error!("{e:?}");
        }
        res
    });
    let compute_time_bounds = view.get_time_bounds();
    // Throwaway session used only to evaluate time bounds on each batch.
    let ctx =
        SessionContext::new_with_config_rt(SessionConfig::default(), lakehouse.runtime().clone());
    while let Some(rb_res) = merged_stream.next().await {
        let rb = rb_res.with_context(|| "receiving record_batch from stream")?;
        // Compute the batch's event-time range before handing it off.
        let event_time_range = compute_time_bounds
            .get_time_bounds(ctx.read_batch(rb.clone()).with_context(|| "read_batch")?)
            .await?;
        tx.send(PartitionRowSet::new(event_time_range, rb))
            .await
            .with_context(|| "sending partition row set")?;
    }
    // Closing the channel signals end-of-input to the writer task.
    drop(tx);
    // First `?` propagates join failures, second the writer's own result.
    join_handle.await??;
    Ok(())
}