micromegas_analytics/lakehouse/
jit_partitions.rs

1use super::{
2    block_partition_spec::{BlockPartitionSpec, BlockProcessorMap},
3    blocks_view::BlocksView,
4    lakehouse_context::LakehouseContext,
5    partition_cache::{LivePartitionProvider, QueryPartitionProvider},
6    partition_source_data::{PartitionSourceBlock, SourceDataBlocksInMemory},
7    view::{View, ViewMetadata},
8};
9use crate::{
10    dfext::{
11        string_column_accessor::string_column_by_name,
12        typed_column::{get_single_row_primitive_value, typed_column_by_name},
13    },
14    lakehouse::{
15        partition_cache::PartitionCache, partition_source_data::hash_to_object_count,
16        query::query_partitions, view::PartitionSpec,
17    },
18    metadata::{ProcessMetadata, StreamMetadata, block_from_batch_row},
19    properties::properties_column_accessor::properties_column_by_name,
20    response_writer::ResponseWriter,
21    time::TimeRange,
22};
23use anyhow::{Context, Result};
24use chrono::DurationRound;
25use chrono::{DateTime, TimeDelta, Utc};
26use datafusion::arrow::array::{BinaryArray, GenericListArray, StringArray};
27use datafusion::arrow::datatypes::{Schema, TimestampNanosecondType};
28use micromegas_ingestion::data_lake_connection::DataLakeConnection;
29use micromegas_tracing::prelude::*;
30use sqlx::Row;
31use std::sync::Arc;
32use uuid::Uuid;
33
34/// Configuration for Just-In-Time (JIT) partition generation.
35pub struct JitPartitionConfig {
36    pub max_nb_objects: i64,
37    pub max_insert_time_slice: TimeDelta,
38}
39
40impl Default for JitPartitionConfig {
41    fn default() -> Self {
42        JitPartitionConfig {
43            max_nb_objects: 20 * 1024 * 1024,
44            max_insert_time_slice: TimeDelta::hours(1),
45        }
46    }
47}
48
49async fn get_insert_time_range(
50    lakehouse: Arc<LakehouseContext>,
51    blocks_view: &BlocksView,
52    query_time_range: &TimeRange,
53    stream: Arc<StreamMetadata>,
54) -> Result<Option<TimeRange>> {
55    // we would need a PartitionCache built from event time range and then filtered for insert time range
56    let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
57    let partitions = part_provider
58        .fetch(
59            &blocks_view.get_view_set_name(),
60            &blocks_view.get_view_instance_id(),
61            Some(*query_time_range),
62            blocks_view.get_file_schema_hash(),
63        )
64        .await?;
65    let stream_id = &stream.stream_id;
66    let begin_range_iso = query_time_range.begin.to_rfc3339();
67    let end_range_iso = query_time_range.end.to_rfc3339();
68    let sql = format!(
69        r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
70        FROM source
71        WHERE stream_id = '{stream_id}'
72        AND begin_time <= '{end_range_iso}'
73        AND end_time >= '{begin_range_iso}';"#
74    );
75    let reader_factory = lakehouse.reader_factory().clone();
76    let rbs = query_partitions(
77        lakehouse.runtime().clone(),
78        reader_factory,
79        lakehouse.lake().blob_storage.inner(),
80        blocks_view.get_file_schema(),
81        Arc::new(partitions),
82        &sql,
83    )
84    .await?
85    .collect()
86    .await?;
87    if rbs.is_empty() {
88        return Ok(None);
89    }
90    if rbs[0].num_rows() == 0 {
91        return Ok(None);
92    }
93    let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
94    let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
95    Ok(Some(TimeRange::new(
96        DateTime::from_timestamp_nanos(min_insert_time),
97        DateTime::from_timestamp_nanos(max_insert_time),
98    )))
99}
100
101/// Generates a segment of JIT partitions.
102pub async fn generate_stream_jit_partitions_segment(
103    config: &JitPartitionConfig,
104    lakehouse: Arc<LakehouseContext>,
105    blocks_view: &BlocksView,
106    insert_time_range: &TimeRange,
107    stream: Arc<StreamMetadata>,
108    process: Arc<ProcessMetadata>,
109) -> Result<Vec<SourceDataBlocksInMemory>> {
110    let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
111        &lakehouse.lake().db_pool,
112        blocks_view.get_view_set_name(),
113        blocks_view.get_view_instance_id(),
114        *insert_time_range,
115    )
116    .await?;
117    let partitions = cache.partitions;
118
119    let stream_id = &stream.stream_id;
120    let begin_range_iso = insert_time_range.begin.to_rfc3339();
121    let end_range_iso = insert_time_range.end.to_rfc3339();
122    let sql = format!(
123        r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time, "streams.format"
124             FROM source
125             WHERE stream_id = '{stream_id}'
126             AND insert_time >= '{begin_range_iso}'
127             AND insert_time < '{end_range_iso}'
128             ORDER BY insert_time, block_id;"#
129    );
130
131    let reader_factory = lakehouse.reader_factory().clone();
132    let rbs = query_partitions(
133        lakehouse.runtime().clone(),
134        reader_factory,
135        lakehouse.lake().blob_storage.inner(),
136        blocks_view.get_file_schema(),
137        Arc::new(partitions),
138        &sql,
139    )
140    .await?
141    .collect()
142    .await?;
143
144    let mut partitions = vec![];
145    let mut partition_blocks = vec![];
146    let mut partition_nb_objects: i64 = 0;
147    for rb in rbs {
148        let format_column = string_column_by_name(&rb, "streams.format")?;
149        for ir in 0..rb.num_rows() {
150            let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
151            let block_nb_objects = block.nb_objects as i64;
152            let format = format_column.value(ir)?.to_string();
153
154            // Check if adding this block would exceed the limit
155            if partition_nb_objects + block_nb_objects > config.max_nb_objects
156                && !partition_blocks.is_empty()
157            {
158                // Push current partition without this block
159                partitions.push(SourceDataBlocksInMemory {
160                    blocks: partition_blocks,
161                    block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
162                });
163                // Start new partition with this block
164                partition_blocks = vec![Arc::new(PartitionSourceBlock {
165                    block,
166                    stream: stream.clone(),
167                    process: process.clone(),
168                    format,
169                })];
170                partition_nb_objects = block_nb_objects;
171            } else {
172                // Add block to current partition
173                partition_nb_objects += block_nb_objects;
174                partition_blocks.push(Arc::new(PartitionSourceBlock {
175                    block,
176                    stream: stream.clone(),
177                    process: process.clone(),
178                    format,
179                }));
180            }
181        }
182    }
183    if partition_nb_objects != 0 {
184        partitions.push(SourceDataBlocksInMemory {
185            blocks: partition_blocks,
186            block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
187        });
188    }
189
190    Ok(partitions)
191}
192
193/// generate_stream_jit_partitions lists the partitiions that are needed to cover a time span
194/// these partitions may not exist or they could be out of date
195/// Generates JIT partitions for a given time range.
196pub async fn generate_stream_jit_partitions(
197    config: &JitPartitionConfig,
198    lakehouse: Arc<LakehouseContext>,
199    blocks_view: &BlocksView,
200    query_time_range: &TimeRange,
201    stream: Arc<StreamMetadata>,
202    process: Arc<ProcessMetadata>,
203) -> Result<Vec<SourceDataBlocksInMemory>> {
204    let insert_time_range = get_insert_time_range(
205        lakehouse.clone(),
206        blocks_view,
207        query_time_range,
208        stream.clone(),
209    )
210    .await?;
211    if insert_time_range.is_none() {
212        return Ok(vec![]);
213    }
214    let insert_time_range = insert_time_range.with_context(|| "missing insert_time_range")?;
215    let insert_time_range = TimeRange::new(
216        insert_time_range
217            .begin
218            .duration_trunc(config.max_insert_time_slice)?,
219        insert_time_range
220            .end
221            .duration_trunc(config.max_insert_time_slice)?
222            + config.max_insert_time_slice,
223    );
224    let mut begin_segment = insert_time_range.begin;
225    let mut end_segment = begin_segment + config.max_insert_time_slice;
226    let mut partitions = vec![];
227    while end_segment <= insert_time_range.end {
228        let insert_time_range = TimeRange::new(begin_segment, end_segment);
229        let mut segment_partitions = generate_stream_jit_partitions_segment(
230            config,
231            lakehouse.clone(),
232            blocks_view,
233            &insert_time_range,
234            stream.clone(),
235            process.clone(),
236        )
237        .await?;
238        partitions.append(&mut segment_partitions);
239        begin_segment = end_segment;
240        end_segment = begin_segment + config.max_insert_time_slice;
241    }
242    Ok(partitions)
243}
244
245/// Generates a segment of JIT partitions filtered by process.
246pub async fn generate_process_jit_partitions_segment(
247    config: &JitPartitionConfig,
248    lakehouse: Arc<LakehouseContext>,
249    blocks_view: &BlocksView,
250    insert_time_range: &TimeRange,
251    process: Arc<ProcessMetadata>,
252    stream_tag: &str,
253) -> Result<Vec<SourceDataBlocksInMemory>> {
254    let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
255        &lakehouse.lake().db_pool,
256        blocks_view.get_view_set_name(),
257        blocks_view.get_view_instance_id(),
258        *insert_time_range,
259    )
260    .await?;
261    let partitions = cache.partitions;
262
263    let process_id = &process.process_id;
264    let begin_range_iso = insert_time_range.begin.to_rfc3339();
265    let end_range_iso = insert_time_range.end.to_rfc3339();
266    let sql = format!(
267        r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time,
268             "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties", "streams.format"
269             FROM source
270             WHERE process_id = '{process_id}'
271             AND array_has( "streams.tags", '{stream_tag}' )
272             AND insert_time >= '{begin_range_iso}'
273             AND insert_time < '{end_range_iso}'
274             ORDER BY insert_time, block_id;"#
275    );
276
277    let reader_factory = lakehouse.reader_factory().clone();
278    let rbs = query_partitions(
279        lakehouse.runtime().clone(),
280        reader_factory,
281        lakehouse.lake().blob_storage.inner(),
282        blocks_view.get_file_schema(),
283        Arc::new(partitions),
284        &sql,
285    )
286    .await?
287    .collect()
288    .await?;
289
290    let mut partitions = vec![];
291    let mut partition_blocks = vec![];
292    let mut partition_nb_objects: i64 = 0;
293
294    for rb in rbs {
295        for ir in 0..rb.num_rows() {
296            let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
297            let block_nb_objects = block.nb_objects as i64;
298
299            // Build StreamMetadata from the query results
300            let stream_id_column = string_column_by_name(&rb, "stream_id")?;
301            let stream_process_id_column = string_column_by_name(&rb, "process_id")?;
302            let dependencies_metadata_column: &BinaryArray =
303                typed_column_by_name(&rb, "streams.dependencies_metadata")?;
304            let objects_metadata_column: &BinaryArray =
305                typed_column_by_name(&rb, "streams.objects_metadata")?;
306            let stream_tags_column: &GenericListArray<i32> =
307                typed_column_by_name(&rb, "streams.tags")?;
308            let stream_properties_accessor = properties_column_by_name(&rb, "streams.properties")?;
309            let stream_format_column = string_column_by_name(&rb, "streams.format")?;
310
311            let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)
312                .with_context(|| "parsing stream_id")?;
313            let stream_process_id = Uuid::parse_str(stream_process_id_column.value(ir)?)
314                .with_context(|| "parsing stream process_id")?;
315
316            let dependencies_metadata = dependencies_metadata_column.value(ir);
317            let objects_metadata = objects_metadata_column.value(ir);
318            let stream_tags = stream_tags_column
319                .value(ir)
320                .as_any()
321                .downcast_ref::<StringArray>()
322                .with_context(|| "casting stream_tags")?
323                .iter()
324                .map(|item| String::from(item.unwrap_or_default()))
325                .collect();
326
327            // Get pre-serialized JSONB properties directly from accessor
328            let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
329
330            let stream = Arc::new(StreamMetadata {
331                stream_id,
332                process_id: stream_process_id,
333                dependencies_metadata: ciborium::from_reader(dependencies_metadata)
334                    .with_context(|| "decoding dependencies_metadata")?,
335                objects_metadata: ciborium::from_reader(objects_metadata)
336                    .with_context(|| "decoding objects_metadata")?,
337                tags: stream_tags,
338                properties: Arc::new(stream_properties_jsonb),
339            });
340
341            let format = stream_format_column.value(ir)?.to_string();
342
343            // Check if adding this block would exceed the limit
344            if partition_nb_objects + block_nb_objects > config.max_nb_objects
345                && !partition_blocks.is_empty()
346            {
347                // Push current partition without this block
348                partitions.push(SourceDataBlocksInMemory {
349                    blocks: partition_blocks,
350                    block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
351                });
352                // Start new partition with this block
353                partition_blocks = vec![Arc::new(PartitionSourceBlock {
354                    block,
355                    stream: stream.clone(),
356                    process: process.clone(),
357                    format,
358                })];
359                partition_nb_objects = block_nb_objects;
360            } else {
361                // Add block to current partition
362                partition_nb_objects += block_nb_objects;
363                partition_blocks.push(Arc::new(PartitionSourceBlock {
364                    block,
365                    stream: stream.clone(),
366                    process: process.clone(),
367                    format,
368                }));
369            }
370        }
371    }
372    if partition_nb_objects != 0 {
373        partitions.push(SourceDataBlocksInMemory {
374            blocks: partition_blocks,
375            block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
376        });
377    }
378    Ok(partitions)
379}
380
381/// generate_process_jit_partitions lists the partitions that are needed to cover a time span for a specific process
382/// these partitions may not exist or they could be out of date
383/// Generates JIT partitions for a given time range filtered by process.
384pub async fn generate_process_jit_partitions(
385    config: &JitPartitionConfig,
386    lakehouse: Arc<LakehouseContext>,
387    blocks_view: &BlocksView,
388    query_time_range: &TimeRange,
389    process: Arc<ProcessMetadata>,
390    stream_tag: &str,
391) -> Result<Vec<SourceDataBlocksInMemory>> {
392    // Get insert time range for all blocks in this process
393    let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
394    let partitions = part_provider
395        .fetch(
396            &blocks_view.get_view_set_name(),
397            &blocks_view.get_view_instance_id(),
398            Some(*query_time_range),
399            blocks_view.get_file_schema_hash(),
400        )
401        .await?;
402
403    let process_id = &process.process_id;
404    let begin_range_iso = query_time_range.begin.to_rfc3339();
405    let end_range_iso = query_time_range.end.to_rfc3339();
406    let sql = format!(
407        r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
408        FROM source
409        WHERE process_id = '{process_id}'
410        AND array_has( "streams.tags", '{stream_tag}' )
411        AND begin_time <= '{end_range_iso}'
412        AND end_time >= '{begin_range_iso}';"#
413    );
414
415    let reader_factory = lakehouse.reader_factory().clone();
416    let rbs = query_partitions(
417        lakehouse.runtime().clone(),
418        reader_factory,
419        lakehouse.lake().blob_storage.inner(),
420        blocks_view.get_file_schema(),
421        Arc::new(partitions),
422        &sql,
423    )
424    .await?
425    .collect()
426    .await?;
427
428    if rbs.is_empty() || rbs[0].num_rows() == 0 {
429        return Ok(vec![]);
430    }
431
432    let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
433    let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
434
435    if min_insert_time == 0 || max_insert_time == 0 {
436        return Ok(vec![]);
437    }
438
439    let insert_time_range = TimeRange::new(
440        DateTime::from_timestamp_nanos(min_insert_time)
441            .duration_trunc(config.max_insert_time_slice)?,
442        DateTime::from_timestamp_nanos(max_insert_time)
443            .duration_trunc(config.max_insert_time_slice)?
444            + config.max_insert_time_slice,
445    );
446
447    let mut begin_segment = insert_time_range.begin;
448    let mut end_segment = begin_segment + config.max_insert_time_slice;
449    let mut partitions = vec![];
450
451    while end_segment <= insert_time_range.end {
452        let insert_time_range = TimeRange::new(begin_segment, end_segment);
453        let mut segment_partitions = generate_process_jit_partitions_segment(
454            config,
455            lakehouse.clone(),
456            blocks_view,
457            &insert_time_range,
458            process.clone(),
459            stream_tag,
460        )
461        .await?;
462        partitions.append(&mut segment_partitions);
463        begin_segment = end_segment;
464        end_segment = begin_segment + config.max_insert_time_slice;
465    }
466    Ok(partitions)
467}
468
469/// is_jit_partition_up_to_date compares a partition spec with the partitions that exist to know if it should be recreated
470/// Checks if a JIT partition is up to date.
471#[span_fn]
472pub async fn is_jit_partition_up_to_date(
473    pool: &sqlx::PgPool,
474    view_meta: ViewMetadata,
475    spec: &SourceDataBlocksInMemory,
476) -> Result<bool> {
477    let (min_insert_time, max_insert_time) =
478        get_part_insert_time_range(spec).with_context(|| "get_event_time_range")?;
479    let desc = format!(
480        "[{}, {}] {} {}",
481        min_insert_time.to_rfc3339(),
482        max_insert_time.to_rfc3339(),
483        &*view_meta.view_set_name,
484        &*view_meta.view_instance_id,
485    );
486
487    // CRITICAL: Use inclusive inequalities (<=, >=) to prevent race conditions.
488    // With exclusive inequalities (<, >), identical time ranges never match, causing
489    // partitions to be unnecessarily recreated on every query, leading to non-deterministic
490    // results. See: https://github.com/madesroches/micromegas/issues/488
491    //
492    // ADDITIONAL FIX: For identical timestamps (min_insert_time == max_insert_time),
493    // we need exact equality matching to handle single-timestamp partitions correctly.
494    let rows = if min_insert_time == max_insert_time {
495        // For identical timestamps, look for exact matches
496        sqlx::query(
497            "SELECT file_schema_hash, source_data_hash
498             FROM lakehouse_partitions
499             WHERE view_set_name = $1
500             AND view_instance_id = $2
501             AND begin_insert_time = $3
502             AND end_insert_time = $3
503             ;",
504        )
505        .bind(&*view_meta.view_set_name)
506        .bind(&*view_meta.view_instance_id)
507        .bind(min_insert_time)
508    } else {
509        // For time ranges, use inclusive inequalities
510        sqlx::query(
511            "SELECT file_schema_hash, source_data_hash
512             FROM lakehouse_partitions
513             WHERE view_set_name = $1
514             AND view_instance_id = $2
515             AND begin_insert_time <= $3
516             AND end_insert_time >= $4
517             ;",
518        )
519        .bind(&*view_meta.view_set_name)
520        .bind(&*view_meta.view_instance_id)
521        .bind(max_insert_time)
522        .bind(min_insert_time)
523    }
524    .fetch_all(pool)
525    .await
526    .with_context(|| "fetching matching partitions")?;
527    if rows.len() != 1 {
528        debug!("{desc}: found {} partitions (expected 1)", rows.len());
529        for (i, row) in rows.iter().enumerate() {
530            let part_file_schema: Vec<u8> = row.try_get("file_schema_hash")?;
531            let part_source_data: Vec<u8> = row.try_get("source_data_hash")?;
532            let source_row_count = hash_to_object_count(&part_source_data)?;
533            debug!(
534                "{desc}: partition {}: file_schema_hash={:?}, source_rows={}",
535                i, part_file_schema, source_row_count
536            );
537        }
538        info!("{desc}: found {} partitions", rows.len());
539        return Ok(false);
540    }
541    let r = &rows[0];
542    let part_file_schema: Vec<u8> = r.try_get("file_schema_hash")?;
543    if part_file_schema != view_meta.file_schema_hash {
544        // this is dangerous because we could be creating a new partition smaller than the old one, which is not supported.
545        // let's make sure there is no old data loitering
546        warn!("{desc}: found matching partition with different file schema");
547        return Ok(false);
548    }
549    let part_source_data: Vec<u8> = r.try_get("source_data_hash")?;
550    let existing_count = hash_to_object_count(&part_source_data)?;
551    let required_count = hash_to_object_count(&spec.block_ids_hash)?;
552    if existing_count < required_count {
553        info!("{desc}: existing partition lacks source data: creating a new partition");
554        return Ok(false);
555    }
556    info!("{desc}: partition up to date");
557    Ok(true)
558}
559
560/// get_event_time_range returns the time range covered by a partition spec
561/// Returns the event time range covered by a partition spec.
562fn get_part_insert_time_range(
563    spec: &SourceDataBlocksInMemory,
564) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
565    if spec.blocks.is_empty() {
566        anyhow::bail!("empty partition should not exist");
567    }
568    // blocks need to be sorted by (event & insert) time
569    let min_insert_time = spec.blocks[0].block.insert_time;
570    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
571    Ok((min_insert_time, max_insert_time))
572}
573
574/// Writes a partition from a set of blocks.
575///
576/// `block_processors` keys per-block dispatch on the source block's `format`;
577/// see `BlockPartitionSpec::write` for the unknown-format behavior.
578#[span_fn]
579pub async fn write_partition_from_blocks(
580    lake: Arc<DataLakeConnection>,
581    view_metadata: ViewMetadata,
582    schema: Arc<Schema>,
583    source_data: SourceDataBlocksInMemory,
584    block_processors: Arc<BlockProcessorMap>,
585) -> Result<()> {
586    if source_data.blocks.is_empty() {
587        anyhow::bail!("empty partition spec");
588    }
589    // blocks need to be sorted by (event & insert) time
590    let min_insert_time = source_data.blocks[0].block.insert_time;
591    let max_insert_time = source_data.blocks[source_data.blocks.len() - 1]
592        .block
593        .insert_time;
594    let block_spec = BlockPartitionSpec {
595        view_metadata,
596        schema,
597        insert_range: TimeRange::new(min_insert_time, max_insert_time),
598        source_data: Arc::new(source_data),
599        block_processors,
600    };
601    let null_response_writer = Arc::new(ResponseWriter::new(None));
602    block_spec
603        .write(lake, null_response_writer)
604        .await
605        .with_context(|| "block_spec.write")?;
606    Ok(())
607}