micromegas_analytics/lakehouse/batch_update.rs

1use super::{
2    lakehouse_context::LakehouseContext, merge::create_merged_partition,
3    partition_cache::PartitionCache, partition_source_data::hash_to_object_count, view::View,
4};
5use crate::{response_writer::Logger, time::TimeRange};
6use anyhow::{Context, Result};
7use chrono::TimeDelta;
8use micromegas_tracing::prelude::*;
9use std::sync::Arc;
10
/// Defines the strategy for creating a new partition.
pub enum PartitionCreationStrategy {
    /// Create the partition from the source data (no usable partitions exist,
    /// or the existing ones are stale/incompatible).
    CreateFromSource,
    /// Merge the existing, up-to-date partitions (carried in the cache) into
    /// a single larger partition.
    MergeExisting(Arc<PartitionCache>),
    /// Abort the partition creation (already up to date, or a conflicting
    /// partition layout was detected).
    Abort,
}
20
/// Decides how (or whether) to create a new partition for `insert_range`,
/// based on the partitions that already exist for this view.
///
/// Returns:
/// - `CreateFromSource` when no matching partitions exist, when an existing
///   partition carries a legacy hash format, or when the existing partitions'
///   summed event counts disagree with the source's event count;
/// - `MergeExisting` when several up-to-date partitions cover the range;
/// - `Abort` when a partition spills outside `insert_range` (conflicting
///   layout) or when the existing single partition is already up to date.
async fn verify_overlapping_partitions(
    existing_partitions_all_views: &PartitionCache,
    insert_range: TimeRange,
    view_set_name: &str,
    view_instance_id: &str,
    file_schema_hash: &[u8],
    source_data_hash: &[u8],
    logger: Arc<dyn Logger>,
) -> Result<PartitionCreationStrategy> {
    // Human-readable prefix shared by every log entry emitted below.
    let desc = format!(
        "[{}, {}] {view_set_name} {view_instance_id}",
        insert_range.begin.to_rfc3339(),
        insert_range.end.to_rfc3339()
    );
    // The source data hash doubles as an event count: it must be exactly the
    // byte width of an i64 so hash_to_object_count can decode it.
    if source_data_hash.len() != std::mem::size_of::<i64>() {
        anyhow::bail!("Source data hash should be a i64");
    }
    let nb_source_events = hash_to_object_count(source_data_hash)?;
    // Keep only the partitions of this exact view (set, instance, schema)
    // that intersect the insert range.
    let filtered = existing_partitions_all_views.filter(
        view_set_name,
        view_instance_id,
        file_schema_hash,
        insert_range,
    );
    if filtered.partitions.is_empty() {
        logger
            .write_log_entry(format!("{desc}: matching partitions not found"))
            .await?;
        return Ok(PartitionCreationStrategy::CreateFromSource);
    }
    // Sum of the event counts claimed by the existing partitions.
    let mut existing_source_hash: i64 = 0;
    let nb_existing_partitions = filtered.partitions.len();
    for part in &filtered.partitions {
        let begin = part.begin_insert_time();
        let end = part.end_insert_time();
        // A partition not fully contained in the insert range would conflict
        // with the new partition: abort rather than risk duplicating data.
        if begin < insert_range.begin || end > insert_range.end {
            logger
                .write_log_entry(format!(
                    "{desc}: found overlapping partition [{}, {}], aborting the update",
                    begin.to_rfc3339(),
                    end.to_rfc3339()
                ))
                .await?;
            return Ok(PartitionCreationStrategy::Abort);
        }
        if part.source_data_hash.len() == std::mem::size_of::<i64>() {
            existing_source_hash += hash_to_object_count(&part.source_data_hash)?
        } else {
            // old hash that does not represent the number of events:
            // we cannot compare counts, so rebuild from source.
            logger
                .write_log_entry(format!(
                    "{desc}: found partition with incompatible source hash: recreate"
                ))
                .await?;
            return Ok(PartitionCreationStrategy::CreateFromSource);
        }
    }

    // Counts disagree: the source has data the existing partitions do not
    // reflect (or vice versa) — the partitions are stale.
    if nb_source_events != existing_source_hash {
        logger
            .write_log_entry(format!(
                "{desc}: existing partitions do not match source data ({nb_source_events} vs {existing_source_hash}) : creating a new partition"
            ))
            .await?;
        return Ok(PartitionCreationStrategy::CreateFromSource);
    }

    // Several up-to-date partitions cover the range: merge them into one.
    if nb_existing_partitions > 1 {
        return Ok(PartitionCreationStrategy::MergeExisting(Arc::new(filtered)));
    }

    // Exactly one matching partition with the expected event count: no work.
    logger
        .write_log_entry(format!(
            "{desc}: already up to date, nb_source_events={nb_source_events}"
        ))
        .await?;
    Ok(PartitionCreationStrategy::Abort)
}
101
/// Materializes a single partition covering `insert_range` for `view`.
///
/// Builds the batch partition spec, then consults
/// `verify_overlapping_partitions` to decide whether to write from source,
/// merge existing partitions, or do nothing. If the view requests a smaller
/// partition size than `insert_range`, the range is subdivided by recursing
/// into `materialize_partition_range`.
#[span_fn]
async fn materialize_partition(
    existing_partitions_all_views: Arc<PartitionCache>,
    lakehouse: Arc<LakehouseContext>,
    insert_range: TimeRange,
    view: Arc<dyn View>,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let view_set_name = view.get_view_set_name();
    let partition_spec = view
        .make_batch_partition_spec(
            lakehouse.clone(),
            existing_partitions_all_views.clone(),
            insert_range,
        )
        .await
        .with_context(|| "make_batch_partition_spec")?;
    // Allow empty partition specs to be written - write_partition_from_rows
    // will create an empty partition record
    let view_instance_id = view.get_view_instance_id();
    let strategy = verify_overlapping_partitions(
        &existing_partitions_all_views,
        insert_range,
        &view_set_name,
        &view_instance_id,
        &view.get_file_schema_hash(),
        &partition_spec.get_source_data_hash(),
        logger.clone(),
    )
    .await
    .with_context(|| "verify_overlapping_partitions")?;
    if let PartitionCreationStrategy::Abort = &strategy {
        return Ok(());
    }

    // The view may want partitions smaller than the requested range; if so,
    // subdivide instead of writing one oversized partition.
    let new_delta = view.get_max_partition_time_delta(&strategy);
    if new_delta < (insert_range.end - insert_range.begin) {
        // If every existing partition already has exactly the target size,
        // the subdivision has already been done: nothing to materialize.
        if let PartitionCreationStrategy::MergeExisting(partition_cache) = &strategy
            && partition_cache
                .partitions
                .iter()
                .all(|p| (p.end_insert_time() - p.begin_insert_time()) == new_delta)
        {
            let desc = format!(
                "[{}, {}] {view_set_name} {view_instance_id}",
                insert_range.begin.to_rfc3339(),
                insert_range.end.to_rfc3339()
            );
            logger
                .write_log_entry(format!("{desc}: subpartitions already present",))
                .await?;
            return Ok(());
        }

        // Box::pin breaks the async recursion cycle
        // (materialize_partition_range calls back into this function).
        return Box::pin(materialize_partition_range(
            existing_partitions_all_views,
            lakehouse.clone(),
            view,
            insert_range,
            new_delta,
            logger,
        ))
        .await
        .with_context(|| "materialize_partition_range");
    }

    match strategy {
        PartitionCreationStrategy::CreateFromSource => {
            partition_spec
                .write(lakehouse.lake().clone(), logger)
                .await
                .with_context(|| "writing partition")?;
        }
        PartitionCreationStrategy::MergeExisting(partitions_to_merge) => {
            create_merged_partition(
                partitions_to_merge,
                existing_partitions_all_views,
                lakehouse,
                view,
                insert_range,
                logger,
            )
            .await
            .with_context(|| "create_merged_partition")?;
        }
        PartitionCreationStrategy::Abort => {}
    }

    Ok(())
}
192
193/// Materializes partitions within a given time range.
194#[span_fn]
195pub async fn materialize_partition_range(
196    existing_partitions_all_views: Arc<PartitionCache>,
197    lakehouse: Arc<LakehouseContext>,
198    view: Arc<dyn View>,
199    insert_range: TimeRange,
200    partition_time_delta: TimeDelta,
201    logger: Arc<dyn Logger>,
202) -> Result<()> {
203    let mut begin_part = insert_range.begin;
204    let mut end_part = begin_part + partition_time_delta;
205    while end_part <= insert_range.end {
206        let partition_insert_range = TimeRange::new(begin_part, end_part);
207        let insert_time_filtered =
208            Arc::new(existing_partitions_all_views.filter_insert_range(partition_insert_range));
209        materialize_partition(
210            insert_time_filtered,
211            lakehouse.clone(),
212            partition_insert_range,
213            view.clone(),
214            logger.clone(),
215        )
216        .await
217        .with_context(|| "materialize_partition")?;
218        begin_part = end_part;
219        end_part = begin_part + partition_time_delta;
220    }
221    Ok(())
222}