// micromegas_analytics/lakehouse/batch_update.rs
use super::{
    lakehouse_context::LakehouseContext, merge::create_merged_partition,
    partition_cache::PartitionCache, partition_source_data::hash_to_object_count, view::View,
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::{Context, Result};
use chrono::TimeDelta;
use micromegas_tracing::prelude::*;
use std::sync::Arc;
10
/// Outcome of comparing the existing partitions against the source data for
/// an insert range: tells the caller how (or whether) to materialize.
pub enum PartitionCreationStrategy {
    /// Build the partition from the source data (no matching partitions were
    /// found, a partition had an incompatible source hash, or the existing
    /// partitions no longer match the source).
    CreateFromSource,
    /// The existing partitions match the source data but are fragmented:
    /// merge the partitions carried in the cache into a single partition.
    MergeExisting(Arc<PartitionCache>),
    /// Do nothing: either already up to date, or unsafe to proceed (an
    /// existing partition overlaps the boundary of the insert range).
    Abort,
}
20
/// Decides how to materialize a partition for `insert_range` by comparing
/// the already-registered partitions of this view against the source data.
///
/// Returns:
/// * `CreateFromSource` — no matching partitions, a partition with an
///   incompatible source-hash format was found, or the existing partitions'
///   combined object count differs from the source;
/// * `Abort` — a partition crosses the boundary of `insert_range`, or a
///   single matching partition is already up to date;
/// * `MergeExisting` — several up-to-date partitions that can be merged.
///
/// # Errors
/// Fails if `source_data_hash` is not 8 bytes, if a hash cannot be decoded,
/// or if writing a log entry fails.
async fn verify_overlapping_partitions(
    existing_partitions_all_views: &PartitionCache,
    insert_range: TimeRange,
    view_set_name: &str,
    view_instance_id: &str,
    file_schema_hash: &[u8],
    source_data_hash: &[u8],
    logger: Arc<dyn Logger>,
) -> Result<PartitionCreationStrategy> {
    // Common prefix for every log entry emitted by this function.
    let desc = format!(
        "[{}, {}] {view_set_name} {view_instance_id}",
        insert_range.begin.to_rfc3339(),
        insert_range.end.to_rfc3339()
    );
    // The source hash is decoded as an i64 object count by
    // hash_to_object_count; reject any other size up front.
    if source_data_hash.len() != std::mem::size_of::<i64>() {
        anyhow::bail!("Source data hash should be a i64");
    }
    let nb_source_events = hash_to_object_count(source_data_hash)?;
    // Keep only this view's partitions (set, instance, schema) that fall
    // within the insert range.
    let filtered = existing_partitions_all_views.filter(
        view_set_name,
        view_instance_id,
        file_schema_hash,
        insert_range,
    );
    if filtered.partitions.is_empty() {
        logger
            .write_log_entry(format!("{desc}: matching partitions not found"))
            .await?;
        return Ok(PartitionCreationStrategy::CreateFromSource);
    }
    // Sum of the object counts encoded in the existing partitions' hashes;
    // compared below against the source's count to detect staleness.
    let mut existing_source_hash: i64 = 0;
    let nb_existing_partitions = filtered.partitions.len();
    for part in &filtered.partitions {
        let begin = part.begin_insert_time();
        let end = part.end_insert_time();
        // A partition extending beyond the insert range cannot be safely
        // replaced or merged within this update: abort.
        if begin < insert_range.begin || end > insert_range.end {
            logger
                .write_log_entry(format!(
                    "{desc}: found overlapping partition [{}, {}], aborting the update",
                    begin.to_rfc3339(),
                    end.to_rfc3339()
                ))
                .await?;
            return Ok(PartitionCreationStrategy::Abort);
        }
        if part.source_data_hash.len() == std::mem::size_of::<i64>() {
            existing_source_hash += hash_to_object_count(&part.source_data_hash)?
        } else {
            // Legacy/incompatible hash format: we cannot compare counts, so
            // rebuild the partition from source.
            logger
                .write_log_entry(format!(
                    "{desc}: found partition with incompatible source hash: recreate"
                ))
                .await?;
            return Ok(PartitionCreationStrategy::CreateFromSource);
        }
    }

    // Existing partitions are stale relative to the source: recreate.
    if nb_source_events != existing_source_hash {
        logger
            .write_log_entry(format!(
                "{desc}: existing partitions do not match source data ({nb_source_events} vs {existing_source_hash}) : creating a new partition"
            ))
            .await?;
        return Ok(PartitionCreationStrategy::CreateFromSource);
    }

    // Up to date but fragmented across multiple partitions: merge them.
    if nb_existing_partitions > 1 {
        return Ok(PartitionCreationStrategy::MergeExisting(Arc::new(filtered)));
    }

    // Exactly one matching, up-to-date partition: nothing to do.
    logger
        .write_log_entry(format!(
            "{desc}: already up to date, nb_source_events={nb_source_events}"
        ))
        .await?;
    Ok(PartitionCreationStrategy::Abort)
}
101
/// Materializes a single partition for `insert_range`: creates it from
/// source data, merges existing sub-partitions, or does nothing, as decided
/// by `verify_overlapping_partitions`.
///
/// If the view's maximum partition duration is smaller than `insert_range`,
/// the range is split and delegated to `materialize_partition_range`
/// (mutual async recursion, hence the `Box::pin`).
#[span_fn]
async fn materialize_partition(
    existing_partitions_all_views: Arc<PartitionCache>,
    lakehouse: Arc<LakehouseContext>,
    insert_range: TimeRange,
    view: Arc<dyn View>,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let view_set_name = view.get_view_set_name();
    let partition_spec = view
        .make_batch_partition_spec(
            lakehouse.clone(),
            existing_partitions_all_views.clone(),
            insert_range,
        )
        .await
        .with_context(|| "make_batch_partition_spec")?;
    let view_instance_id = view.get_view_instance_id();
    let strategy = verify_overlapping_partitions(
        &existing_partitions_all_views,
        insert_range,
        &view_set_name,
        &view_instance_id,
        &view.get_file_schema_hash(),
        &partition_spec.get_source_data_hash(),
        logger.clone(),
    )
    .await
    .with_context(|| "verify_overlapping_partitions")?;
    if let PartitionCreationStrategy::Abort = &strategy {
        return Ok(());
    }

    // The strategy can influence the target partition duration; when the
    // view wants partitions shorter than the requested range, split it.
    let new_delta = view.get_max_partition_time_delta(&strategy);
    if new_delta < (insert_range.end - insert_range.begin) {
        // If every partition slated for merging already has exactly the
        // target duration, the sub-partitions are already in place.
        if let PartitionCreationStrategy::MergeExisting(partition_cache) = &strategy
            && partition_cache
                .partitions
                .iter()
                .all(|p| (p.end_insert_time() - p.begin_insert_time()) == new_delta)
        {
            let desc = format!(
                "[{}, {}] {view_set_name} {view_instance_id}",
                insert_range.begin.to_rfc3339(),
                insert_range.end.to_rfc3339()
            );
            logger
                .write_log_entry(format!("{desc}: subpartitions already present",))
                .await?;
            return Ok(());
        }

        // Box::pin breaks the infinitely-sized future created by the async
        // mutual recursion with materialize_partition_range.
        return Box::pin(materialize_partition_range(
            existing_partitions_all_views,
            lakehouse.clone(),
            view,
            insert_range,
            new_delta,
            logger,
        ))
        .await
        .with_context(|| "materialize_partition_range");
    }

    match strategy {
        PartitionCreationStrategy::CreateFromSource => {
            // Write a brand-new partition from the source data.
            partition_spec
                .write(lakehouse.lake().clone(), logger)
                .await
                .with_context(|| "writing partition")?;
        }
        PartitionCreationStrategy::MergeExisting(partitions_to_merge) => {
            // Combine the matching existing partitions into a single one.
            create_merged_partition(
                partitions_to_merge,
                existing_partitions_all_views,
                lakehouse,
                view,
                insert_range,
                logger,
            )
            .await
            .with_context(|| "create_merged_partition")?;
        }
        // Abort was already handled by the early return above; kept for
        // exhaustiveness.
        PartitionCreationStrategy::Abort => {}
    }

    Ok(())
}
192
193#[span_fn]
195pub async fn materialize_partition_range(
196 existing_partitions_all_views: Arc<PartitionCache>,
197 lakehouse: Arc<LakehouseContext>,
198 view: Arc<dyn View>,
199 insert_range: TimeRange,
200 partition_time_delta: TimeDelta,
201 logger: Arc<dyn Logger>,
202) -> Result<()> {
203 let mut begin_part = insert_range.begin;
204 let mut end_part = begin_part + partition_time_delta;
205 while end_part <= insert_range.end {
206 let partition_insert_range = TimeRange::new(begin_part, end_part);
207 let insert_time_filtered =
208 Arc::new(existing_partitions_all_views.filter_insert_range(partition_insert_range));
209 materialize_partition(
210 insert_time_filtered,
211 lakehouse.clone(),
212 partition_insert_range,
213 view.clone(),
214 logger.clone(),
215 )
216 .await
217 .with_context(|| "materialize_partition")?;
218 begin_part = end_part;
219 end_part = begin_part + partition_time_delta;
220 }
221 Ok(())
222}