//! micromegas_analytics/lakehouse/jit_partitions.rs
//! Just-in-time (JIT) partition generation for the lakehouse.
1use super::{
2    block_partition_spec::{BlockPartitionSpec, BlockProcessor},
3    blocks_view::BlocksView,
4    lakehouse_context::LakehouseContext,
5    partition_cache::{LivePartitionProvider, QueryPartitionProvider},
6    partition_source_data::{PartitionSourceBlock, SourceDataBlocksInMemory},
7    view::{View, ViewMetadata},
8};
9use crate::{
10    dfext::typed_column::get_single_row_primitive_value,
11    lakehouse::{partition_cache::PartitionCache, view::PartitionSpec},
12    metadata::{ProcessMetadata, StreamMetadata, block_from_batch_row},
13    time::TimeRange,
14};
15use crate::{
16    lakehouse::{partition_source_data::hash_to_object_count, query::query_partitions},
17    response_writer::ResponseWriter,
18};
19use anyhow::{Context, Result};
20use chrono::DurationRound;
21use chrono::{DateTime, TimeDelta, Utc};
22use datafusion::arrow::datatypes::{Schema, TimestampNanosecondType};
23use micromegas_ingestion::data_lake_connection::DataLakeConnection;
24use micromegas_tracing::prelude::*;
25use sqlx::Row;
26use std::sync::Arc;
27
/// Configuration for Just-In-Time (JIT) partition generation.
pub struct JitPartitionConfig {
    // Maximum number of source objects accumulated in a single generated partition.
    pub max_nb_objects: i64,
    // Maximum insert-time span covered by one partition segment; ranges are
    // aligned/truncated to this slice so segment boundaries are stable across queries.
    pub max_insert_time_slice: TimeDelta,
}
33
34impl Default for JitPartitionConfig {
35    fn default() -> Self {
36        JitPartitionConfig {
37            max_nb_objects: 20 * 1024 * 1024,
38            max_insert_time_slice: TimeDelta::hours(1),
39        }
40    }
41}
42
43async fn get_insert_time_range(
44    lakehouse: Arc<LakehouseContext>,
45    blocks_view: &BlocksView,
46    query_time_range: &TimeRange,
47    stream: Arc<StreamMetadata>,
48) -> Result<Option<TimeRange>> {
49    // we would need a PartitionCache built from event time range and then filtered for insert time range
50    let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
51    let partitions = part_provider
52        .fetch(
53            &blocks_view.get_view_set_name(),
54            &blocks_view.get_view_instance_id(),
55            Some(*query_time_range),
56            blocks_view.get_file_schema_hash(),
57        )
58        .await?;
59    let stream_id = &stream.stream_id;
60    let begin_range_iso = query_time_range.begin.to_rfc3339();
61    let end_range_iso = query_time_range.end.to_rfc3339();
62    let sql = format!(
63        r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
64        FROM source
65        WHERE stream_id = '{stream_id}'
66        AND begin_time <= '{end_range_iso}'
67        AND end_time >= '{begin_range_iso}';"#
68    );
69    let reader_factory = lakehouse.reader_factory().clone();
70    let rbs = query_partitions(
71        lakehouse.runtime().clone(),
72        reader_factory,
73        lakehouse.lake().blob_storage.inner(),
74        blocks_view.get_file_schema(),
75        Arc::new(partitions),
76        &sql,
77    )
78    .await?
79    .collect()
80    .await?;
81    if rbs.is_empty() {
82        return Ok(None);
83    }
84    if rbs[0].num_rows() == 0 {
85        return Ok(None);
86    }
87    let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
88    let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
89    Ok(Some(TimeRange::new(
90        DateTime::from_timestamp_nanos(min_insert_time),
91        DateTime::from_timestamp_nanos(max_insert_time),
92    )))
93}
94
/// Generates a segment of JIT partitions.
///
/// Lists the blocks of `stream` whose insert time falls inside
/// `insert_time_range` (queried through the existing blocks-view partitions),
/// then greedily groups them — in (insert_time, block_id) order — into
/// partitions holding at most `config.max_nb_objects` objects each.
///
/// Note: `block_ids_hash` is filled with the partition's total object count as
/// little-endian bytes; `hash_to_object_count` decodes the same convention.
pub async fn generate_stream_jit_partitions_segment(
    config: &JitPartitionConfig,
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    insert_time_range: &TimeRange,
    stream: Arc<StreamMetadata>,
    process: Arc<ProcessMetadata>,
) -> Result<Vec<SourceDataBlocksInMemory>> {
    // Existing blocks-view partitions whose insert range overlaps the segment.
    let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
        &lakehouse.lake().db_pool,
        blocks_view.get_view_set_name(),
        blocks_view.get_view_instance_id(),
        *insert_time_range,
    )
    .await?;
    let partitions = cache.partitions;

    let stream_id = &stream.stream_id;
    let begin_range_iso = insert_time_range.begin.to_rfc3339();
    let end_range_iso = insert_time_range.end.to_rfc3339();
    // Half-open insert-time filter [begin, end) so adjacent segments never
    // select the same block twice.
    let sql = format!(
        r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time
             FROM source
             WHERE stream_id = '{stream_id}'
             AND insert_time >= '{begin_range_iso}'
             AND insert_time < '{end_range_iso}'
             ORDER BY insert_time, block_id;"#
    );

    let reader_factory = lakehouse.reader_factory().clone();
    let rbs = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        blocks_view.get_file_schema(),
        Arc::new(partitions),
        &sql,
    )
    .await?
    .collect()
    .await?;

    // Greedy accumulation: pack blocks into the current partition until adding
    // one more would exceed max_nb_objects, then start a new partition.
    let mut partitions = vec![];
    let mut partition_blocks = vec![];
    let mut partition_nb_objects: i64 = 0;
    for rb in rbs {
        for ir in 0..rb.num_rows() {
            let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
            let block_nb_objects = block.nb_objects as i64;

            // Check if adding this block would exceed the limit
            if partition_nb_objects + block_nb_objects > config.max_nb_objects
                && !partition_blocks.is_empty()
            {
                // Push current partition without this block
                partitions.push(SourceDataBlocksInMemory {
                    blocks: partition_blocks,
                    block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
                });
                // Start new partition with this block
                partition_blocks = vec![Arc::new(PartitionSourceBlock {
                    block,
                    stream: stream.clone(),
                    process: process.clone(),
                })];
                partition_nb_objects = block_nb_objects;
            } else {
                // Add block to current partition
                partition_nb_objects += block_nb_objects;
                partition_blocks.push(Arc::new(PartitionSourceBlock {
                    block,
                    stream: stream.clone(),
                    process: process.clone(),
                }));
            }
        }
    }
    // Flush the trailing partition.
    // NOTE(review): if every pending block reported nb_objects == 0, the trailing
    // blocks are silently dropped here (count is 0 but partition_blocks is
    // non-empty) — confirm this is intended.
    if partition_nb_objects != 0 {
        partitions.push(SourceDataBlocksInMemory {
            blocks: partition_blocks,
            block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
        });
    }

    Ok(partitions)
}
182
183/// generate_stream_jit_partitions lists the partitiions that are needed to cover a time span
184/// these partitions may not exist or they could be out of date
185/// Generates JIT partitions for a given time range.
186pub async fn generate_stream_jit_partitions(
187    config: &JitPartitionConfig,
188    lakehouse: Arc<LakehouseContext>,
189    blocks_view: &BlocksView,
190    query_time_range: &TimeRange,
191    stream: Arc<StreamMetadata>,
192    process: Arc<ProcessMetadata>,
193) -> Result<Vec<SourceDataBlocksInMemory>> {
194    let insert_time_range = get_insert_time_range(
195        lakehouse.clone(),
196        blocks_view,
197        query_time_range,
198        stream.clone(),
199    )
200    .await?;
201    if insert_time_range.is_none() {
202        return Ok(vec![]);
203    }
204    let insert_time_range = insert_time_range.with_context(|| "missing insert_time_range")?;
205    let insert_time_range = TimeRange::new(
206        insert_time_range
207            .begin
208            .duration_trunc(config.max_insert_time_slice)?,
209        insert_time_range
210            .end
211            .duration_trunc(config.max_insert_time_slice)?
212            + config.max_insert_time_slice,
213    );
214    let mut begin_segment = insert_time_range.begin;
215    let mut end_segment = begin_segment + config.max_insert_time_slice;
216    let mut partitions = vec![];
217    while end_segment <= insert_time_range.end {
218        let insert_time_range = TimeRange::new(begin_segment, end_segment);
219        let mut segment_partitions = generate_stream_jit_partitions_segment(
220            config,
221            lakehouse.clone(),
222            blocks_view,
223            &insert_time_range,
224            stream.clone(),
225            process.clone(),
226        )
227        .await?;
228        partitions.append(&mut segment_partitions);
229        begin_segment = end_segment;
230        end_segment = begin_segment + config.max_insert_time_slice;
231    }
232    Ok(partitions)
233}
234
235/// Generates a segment of JIT partitions filtered by process.
236pub async fn generate_process_jit_partitions_segment(
237    config: &JitPartitionConfig,
238    lakehouse: Arc<LakehouseContext>,
239    blocks_view: &BlocksView,
240    insert_time_range: &TimeRange,
241    process: Arc<ProcessMetadata>,
242    stream_tag: &str,
243) -> Result<Vec<SourceDataBlocksInMemory>> {
244    let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
245        &lakehouse.lake().db_pool,
246        blocks_view.get_view_set_name(),
247        blocks_view.get_view_instance_id(),
248        *insert_time_range,
249    )
250    .await?;
251    let partitions = cache.partitions;
252
253    let process_id = &process.process_id;
254    let begin_range_iso = insert_time_range.begin.to_rfc3339();
255    let end_range_iso = insert_time_range.end.to_rfc3339();
256    let sql = format!(
257        r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time,
258             "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties"
259             FROM source
260             WHERE process_id = '{process_id}'
261             AND array_has( "streams.tags", '{stream_tag}' )
262             AND insert_time >= '{begin_range_iso}'
263             AND insert_time < '{end_range_iso}'
264             ORDER BY insert_time, block_id;"#
265    );
266
267    let reader_factory = lakehouse.reader_factory().clone();
268    let rbs = query_partitions(
269        lakehouse.runtime().clone(),
270        reader_factory,
271        lakehouse.lake().blob_storage.inner(),
272        blocks_view.get_file_schema(),
273        Arc::new(partitions),
274        &sql,
275    )
276    .await?
277    .collect()
278    .await?;
279
280    let mut partitions = vec![];
281    let mut partition_blocks = vec![];
282    let mut partition_nb_objects: i64 = 0;
283
284    for rb in rbs {
285        for ir in 0..rb.num_rows() {
286            let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
287            let block_nb_objects = block.nb_objects as i64;
288
289            // Build StreamMetadata from the query results
290            use crate::dfext::{
291                string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
292            };
293            use crate::properties::properties_column_accessor::properties_column_by_name;
294            use datafusion::arrow::array::{BinaryArray, GenericListArray, StringArray};
295            use uuid::Uuid;
296
297            let stream_id_column = string_column_by_name(&rb, "stream_id")?;
298            let stream_process_id_column = string_column_by_name(&rb, "process_id")?;
299            let dependencies_metadata_column: &BinaryArray =
300                typed_column_by_name(&rb, "streams.dependencies_metadata")?;
301            let objects_metadata_column: &BinaryArray =
302                typed_column_by_name(&rb, "streams.objects_metadata")?;
303            let stream_tags_column: &GenericListArray<i32> =
304                typed_column_by_name(&rb, "streams.tags")?;
305            let stream_properties_accessor = properties_column_by_name(&rb, "streams.properties")?;
306
307            let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)
308                .with_context(|| "parsing stream_id")?;
309            let stream_process_id = Uuid::parse_str(stream_process_id_column.value(ir)?)
310                .with_context(|| "parsing stream process_id")?;
311
312            let dependencies_metadata = dependencies_metadata_column.value(ir);
313            let objects_metadata = objects_metadata_column.value(ir);
314            let stream_tags = stream_tags_column
315                .value(ir)
316                .as_any()
317                .downcast_ref::<StringArray>()
318                .with_context(|| "casting stream_tags")?
319                .iter()
320                .map(|item| String::from(item.unwrap_or_default()))
321                .collect();
322
323            // Get pre-serialized JSONB properties directly from accessor
324            let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
325
326            let stream = Arc::new(StreamMetadata {
327                stream_id,
328                process_id: stream_process_id,
329                dependencies_metadata: ciborium::from_reader(dependencies_metadata)
330                    .with_context(|| "decoding dependencies_metadata")?,
331                objects_metadata: ciborium::from_reader(objects_metadata)
332                    .with_context(|| "decoding objects_metadata")?,
333                tags: stream_tags,
334                properties: Arc::new(stream_properties_jsonb),
335            });
336
337            // Check if adding this block would exceed the limit
338            if partition_nb_objects + block_nb_objects > config.max_nb_objects
339                && !partition_blocks.is_empty()
340            {
341                // Push current partition without this block
342                partitions.push(SourceDataBlocksInMemory {
343                    blocks: partition_blocks,
344                    block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
345                });
346                // Start new partition with this block
347                partition_blocks = vec![Arc::new(PartitionSourceBlock {
348                    block,
349                    stream: stream.clone(),
350                    process: process.clone(),
351                })];
352                partition_nb_objects = block_nb_objects;
353            } else {
354                // Add block to current partition
355                partition_nb_objects += block_nb_objects;
356                partition_blocks.push(Arc::new(PartitionSourceBlock {
357                    block,
358                    stream: stream.clone(),
359                    process: process.clone(),
360                }));
361            }
362        }
363    }
364    if partition_nb_objects != 0 {
365        partitions.push(SourceDataBlocksInMemory {
366            blocks: partition_blocks,
367            block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
368        });
369    }
370    Ok(partitions)
371}
372
/// generate_process_jit_partitions lists the partitions that are needed to cover a time span for a specific process
/// these partitions may not exist or they could be out of date
/// Generates JIT partitions for a given time range filtered by process.
///
/// First determines the insert-time range of matching blocks via a MIN/MAX
/// aggregation, aligns it on `config.max_insert_time_slice` boundaries, then
/// processes the range one slice at a time through
/// `generate_process_jit_partitions_segment`.
pub async fn generate_process_jit_partitions(
    config: &JitPartitionConfig,
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    query_time_range: &TimeRange,
    process: Arc<ProcessMetadata>,
    stream_tag: &str,
) -> Result<Vec<SourceDataBlocksInMemory>> {
    // Get insert time range for all blocks in this process
    let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
    let partitions = part_provider
        .fetch(
            &blocks_view.get_view_set_name(),
            &blocks_view.get_view_instance_id(),
            Some(*query_time_range),
            blocks_view.get_file_schema_hash(),
        )
        .await?;

    let process_id = &process.process_id;
    let begin_range_iso = query_time_range.begin.to_rfc3339();
    let end_range_iso = query_time_range.end.to_rfc3339();
    // Interval-overlap test on event time, restricted to streams with the tag.
    let sql = format!(
        r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
        FROM source
        WHERE process_id = '{process_id}'
        AND array_has( "streams.tags", '{stream_tag}' )
        AND begin_time <= '{end_range_iso}'
        AND end_time >= '{begin_range_iso}';"#
    );

    let reader_factory = lakehouse.reader_factory().clone();
    let rbs = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        blocks_view.get_file_schema(),
        Arc::new(partitions),
        &sql,
    )
    .await?
    .collect()
    .await?;

    if rbs.is_empty() || rbs[0].num_rows() == 0 {
        return Ok(vec![]);
    }

    let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
    let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;

    // Zero timestamps mean the MIN/MAX aggregates found no matching rows.
    if min_insert_time == 0 || max_insert_time == 0 {
        return Ok(vec![]);
    }

    // Align on slice boundaries: truncate begin down, round end up by one slice.
    let insert_time_range = TimeRange::new(
        DateTime::from_timestamp_nanos(min_insert_time)
            .duration_trunc(config.max_insert_time_slice)?,
        DateTime::from_timestamp_nanos(max_insert_time)
            .duration_trunc(config.max_insert_time_slice)?
            + config.max_insert_time_slice,
    );

    // Walk the aligned range one slice at a time, accumulating partitions.
    let mut begin_segment = insert_time_range.begin;
    let mut end_segment = begin_segment + config.max_insert_time_slice;
    let mut partitions = vec![];

    while end_segment <= insert_time_range.end {
        let insert_time_range = TimeRange::new(begin_segment, end_segment);
        let mut segment_partitions = generate_process_jit_partitions_segment(
            config,
            lakehouse.clone(),
            blocks_view,
            &insert_time_range,
            process.clone(),
            stream_tag,
        )
        .await?;
        partitions.append(&mut segment_partitions);
        begin_segment = end_segment;
        end_segment = begin_segment + config.max_insert_time_slice;
    }
    Ok(partitions)
}
460
/// is_jit_partition_up_to_date compares a partition spec with the partitions that exist to know if it should be recreated.
///
/// Returns `Ok(true)` only when exactly one existing partition in
/// `lakehouse_partitions` covers the spec's insert-time range with the same
/// file schema hash and at least as many source objects; any other situation
/// (zero or multiple matches, schema mismatch, fewer objects) means the
/// partition must be (re)created.
#[span_fn]
pub async fn is_jit_partition_up_to_date(
    pool: &sqlx::PgPool,
    view_meta: ViewMetadata,
    spec: &SourceDataBlocksInMemory,
) -> Result<bool> {
    let (min_insert_time, max_insert_time) =
        get_part_insert_time_range(spec).with_context(|| "get_event_time_range")?;
    // Human-readable label used in all log lines below.
    let desc = format!(
        "[{}, {}] {} {}",
        min_insert_time.to_rfc3339(),
        max_insert_time.to_rfc3339(),
        &*view_meta.view_set_name,
        &*view_meta.view_instance_id,
    );

    // CRITICAL: Use inclusive inequalities (<=, >=) to prevent race conditions.
    // With exclusive inequalities (<, >), identical time ranges never match, causing
    // partitions to be unnecessarily recreated on every query, leading to non-deterministic
    // results. See: https://github.com/madesroches/micromegas/issues/488
    //
    // ADDITIONAL FIX: For identical timestamps (min_insert_time == max_insert_time),
    // we need exact equality matching to handle single-timestamp partitions correctly.
    let rows = if min_insert_time == max_insert_time {
        // For identical timestamps, look for exact matches
        sqlx::query(
            "SELECT file_schema_hash, source_data_hash
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time = $3
             AND end_insert_time = $3
             ;",
        )
        .bind(&*view_meta.view_set_name)
        .bind(&*view_meta.view_instance_id)
        .bind(min_insert_time)
    } else {
        // For time ranges, use inclusive inequalities
        sqlx::query(
            "SELECT file_schema_hash, source_data_hash
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time <= $3
             AND end_insert_time >= $4
             ;",
        )
        .bind(&*view_meta.view_set_name)
        .bind(&*view_meta.view_instance_id)
        .bind(max_insert_time)
        .bind(min_insert_time)
    }
    .fetch_all(pool)
    .await
    .with_context(|| "fetching matching partitions")?;
    if rows.len() != 1 {
        // Zero or multiple covering partitions: dump them for diagnosis and
        // report "not up to date".
        debug!("{desc}: found {} partitions (expected 1)", rows.len());
        for (i, row) in rows.iter().enumerate() {
            let part_file_schema: Vec<u8> = row.try_get("file_schema_hash")?;
            let part_source_data: Vec<u8> = row.try_get("source_data_hash")?;
            let source_row_count = hash_to_object_count(&part_source_data)?;
            debug!(
                "{desc}: partition {}: file_schema_hash={:?}, source_rows={}",
                i, part_file_schema, source_row_count
            );
        }
        info!("{desc}: found {} partitions", rows.len());
        return Ok(false);
    }
    let r = &rows[0];
    let part_file_schema: Vec<u8> = r.try_get("file_schema_hash")?;
    if part_file_schema != view_meta.file_schema_hash {
        // this is dangerous because we could be creating a new partition smaller than the old one, which is not supported.
        // let's make sure there is no old data loitering
        warn!("{desc}: found matching partition with different file schema");
        return Ok(false);
    }
    // source_data_hash encodes an object count (see hash_to_object_count):
    // the existing partition is stale if it covers fewer objects than the spec.
    let part_source_data: Vec<u8> = r.try_get("source_data_hash")?;
    let existing_count = hash_to_object_count(&part_source_data)?;
    let required_count = hash_to_object_count(&spec.block_ids_hash)?;
    if existing_count < required_count {
        info!("{desc}: existing partition lacks source data: creating a new partition");
        return Ok(false);
    }
    info!("{desc}: partition up to date");
    Ok(true)
}
551
552/// get_event_time_range returns the time range covered by a partition spec
553/// Returns the event time range covered by a partition spec.
554fn get_part_insert_time_range(
555    spec: &SourceDataBlocksInMemory,
556) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
557    if spec.blocks.is_empty() {
558        anyhow::bail!("empty partition should not exist");
559    }
560    // blocks need to be sorted by (event & insert) time
561    let min_insert_time = spec.blocks[0].block.insert_time;
562    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
563    Ok((min_insert_time, max_insert_time))
564}
565
566/// Writes a partition from a set of blocks.
567#[span_fn]
568pub async fn write_partition_from_blocks(
569    lake: Arc<DataLakeConnection>,
570    view_metadata: ViewMetadata,
571    schema: Arc<Schema>,
572    source_data: SourceDataBlocksInMemory,
573    block_processor: Arc<dyn BlockProcessor>,
574) -> Result<()> {
575    if source_data.blocks.is_empty() {
576        anyhow::bail!("empty partition spec");
577    }
578    // blocks need to be sorted by (event & insert) time
579    let min_insert_time = source_data.blocks[0].block.insert_time;
580    let max_insert_time = source_data.blocks[source_data.blocks.len() - 1]
581        .block
582        .insert_time;
583    let block_spec = BlockPartitionSpec {
584        view_metadata,
585        schema,
586        insert_range: TimeRange::new(min_insert_time, max_insert_time),
587        source_data: Arc::new(source_data),
588        block_processor,
589    };
590    let null_response_writer = Arc::new(ResponseWriter::new(None));
591    block_spec
592        .write(lake, null_response_writer)
593        .await
594        .with_context(|| "block_spec.write")?;
595    Ok(())
596}