micromegas_analytics/lakehouse/
write_partition.rs

1use crate::{
2    arrow_utils::serialize_parquet_metadata, lakehouse::async_parquet_writer::AsyncParquetWriter,
3    response_writer::Logger, time::TimeRange,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, TimeDelta, Utc};
7use datafusion::{
8    arrow::{array::RecordBatch, datatypes::Schema},
9    parquet::{
10        arrow::AsyncArrowWriter,
11        basic::Compression,
12        file::{
13            metadata::ParquetMetaData,
14            properties::{WriterProperties, WriterVersion},
15        },
16    },
17};
18use micromegas_ingestion::data_lake_connection::DataLakeConnection;
19use micromegas_tracing::prelude::*;
20use object_store::buffered::BufWriter;
21use sqlx::Row;
22use std::collections::hash_map::DefaultHasher;
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, atomic::AtomicI64};
25use tokio::sync::mpsc::Receiver;
26
27use super::{partition::Partition, partition_source_data, view::ViewMetadata};
28
29/// Adds a file to the temporary_files table for cleanup.
30///
31/// Files added to temporary_files will be automatically deleted by the cleanup process
32/// after the expiration time. The default expiration is 1 hour from now.
33pub async fn add_file_for_cleanup(
34    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
35    file_path: &str,
36    file_size: i64,
37) -> Result<()> {
38    let expiration = Utc::now()
39        + TimeDelta::try_hours(1)
40            .with_context(|| "calculating expiration time for temporary file")?;
41
42    sqlx::query("INSERT INTO temporary_files VALUES ($1, $2, $3)")
43        .bind(file_path)
44        .bind(file_size)
45        .bind(expiration)
46        .execute(&mut **transaction)
47        .await
48        .with_context(|| format!("adding file {file_path} to temporary files for cleanup"))?;
49
50    Ok(())
51}
52
53/// A set of rows for a partition, along with their time range.
54pub struct PartitionRowSet {
55    pub rows_time_range: TimeRange,
56    pub rows: RecordBatch,
57}
58
59impl PartitionRowSet {
60    pub fn new(rows_time_range: TimeRange, rows: RecordBatch) -> Self {
61        Self {
62            rows_time_range,
63            rows,
64        }
65    }
66}
67
68/// Retires partitions that have exceeded their expiration time.
69pub async fn retire_expired_partitions(
70    lake: &DataLakeConnection,
71    expiration: DateTime<Utc>,
72) -> Result<()> {
73    let mut transaction = lake.db_pool.begin().await?;
74    let old_partitions = sqlx::query(
75        "SELECT file_path, file_size
76         FROM lakehouse_partitions
77         WHERE end_insert_time < $1
78         ;",
79    )
80    .bind(expiration)
81    .fetch_all(&mut *transaction)
82    .await
83    .with_context(|| "listing expired partitions")?;
84
85    let mut file_paths = Vec::new();
86    for old_part in &old_partitions {
87        let file_path: Option<String> = old_part.try_get("file_path")?;
88        let file_size: i64 = old_part.try_get("file_size")?;
89        if let Some(path) = file_path {
90            info!("adding out of date partition {path} to temporary files to be deleted");
91            add_file_for_cleanup(&mut transaction, &path, file_size).await?;
92            file_paths.push(path);
93        }
94    }
95
96    sqlx::query(
97        "DELETE from lakehouse_partitions
98         WHERE end_insert_time < $1
99         ;",
100    )
101    .bind(expiration)
102    .execute(&mut *transaction)
103    .await
104    .with_context(|| "deleting expired partitions")?;
105    transaction.commit().await.with_context(|| "commit")?;
106    Ok(())
107}
108
109/// Retires partitions from the active set.
110/// Overlap is determined by the insert_time of the telemetry.
111pub async fn retire_partitions(
112    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
113    view_set_name: &str,
114    view_instance_id: &str,
115    begin_insert_time: DateTime<Utc>,
116    end_insert_time: DateTime<Utc>,
117    logger: Arc<dyn Logger>,
118) -> Result<()> {
119    // this is not an overlap test, we need to assume that we are not making a new smaller partition
120    // where a bigger one existed
121    // its gets tricky in the jit case where a partition can have only one block and begin_insert == end_insert
122
123    //todo: use DELETE+RETURNING
124    let old_partitions = if begin_insert_time == end_insert_time {
125        // For identical timestamps, look for exact matches to handle single-block partitions
126        sqlx::query(
127            "SELECT file_path, file_size
128             FROM lakehouse_partitions
129             WHERE view_set_name = $1
130             AND view_instance_id = $2
131             AND begin_insert_time = $3
132             AND end_insert_time = $3
133             ;",
134        )
135        .bind(view_set_name)
136        .bind(view_instance_id)
137        .bind(begin_insert_time)
138        .fetch_all(&mut **transaction)
139        .await
140        .with_context(|| "listing old partitions (exact match)")?
141    } else {
142        // For time ranges, use inclusive inequalities
143        sqlx::query(
144            "SELECT file_path, file_size
145             FROM lakehouse_partitions
146             WHERE view_set_name = $1
147             AND view_instance_id = $2
148             AND begin_insert_time >= $3
149             AND end_insert_time <= $4
150             ;",
151        )
152        .bind(view_set_name)
153        .bind(view_instance_id)
154        .bind(begin_insert_time)
155        .bind(end_insert_time)
156        .fetch_all(&mut **transaction)
157        .await
158        .with_context(|| "listing old partitions (range)")?
159    };
160
161    // LOG: Found partitions for retirement (only if any found)
162    if !old_partitions.is_empty() {
163        logger
164            .write_log_entry(format!(
165                "[RETIRE_FOUND] view={}/{} time_range=[{}, {}] found_partitions={}",
166                view_set_name,
167                view_instance_id,
168                begin_insert_time,
169                end_insert_time,
170                old_partitions.len()
171            ))
172            .await?;
173    }
174
175    let mut file_paths = Vec::new();
176    for old_part in &old_partitions {
177        let file_path: Option<String> = old_part.try_get("file_path")?;
178        let file_size: i64 = old_part.try_get("file_size")?;
179        if let Some(path) = file_path {
180            logger
181                .write_log_entry(format!(
182                    "adding out of date partition {path} to temporary files to be deleted"
183                ))
184                .await?;
185            add_file_for_cleanup(transaction, &path, file_size).await?;
186            file_paths.push(path);
187        }
188    }
189
190    if begin_insert_time == end_insert_time {
191        // For identical timestamps, delete exact matches to handle single-block partitions
192        sqlx::query(
193            "DELETE from lakehouse_partitions
194             WHERE view_set_name = $1
195             AND view_instance_id = $2
196             AND begin_insert_time = $3
197             AND end_insert_time = $3
198             ;",
199        )
200        .bind(view_set_name)
201        .bind(view_instance_id)
202        .bind(begin_insert_time)
203        .execute(&mut **transaction)
204        .await
205        .with_context(|| "deleting out of date partitions (exact match)")?
206    } else {
207        // For time ranges, use inclusive inequalities
208        sqlx::query(
209            "DELETE from lakehouse_partitions
210             WHERE view_set_name = $1
211             AND view_instance_id = $2
212             AND begin_insert_time >= $3
213             AND end_insert_time <= $4
214             ;",
215        )
216        .bind(view_set_name)
217        .bind(view_instance_id)
218        .bind(begin_insert_time)
219        .bind(end_insert_time)
220        .execute(&mut **transaction)
221        .await
222        .with_context(|| "deleting out of date partitions (range)")?
223    };
224    Ok(())
225}
226
227/// Generate a deterministic advisory lock key for a partition
228fn generate_partition_lock_key(
229    view_set_name: &str,
230    view_instance_id: &str,
231    begin_insert_time: DateTime<Utc>,
232    end_insert_time: DateTime<Utc>,
233) -> i64 {
234    let mut hasher = DefaultHasher::new();
235    view_set_name.hash(&mut hasher);
236    view_instance_id.hash(&mut hasher);
237    begin_insert_time.hash(&mut hasher);
238    end_insert_time.hash(&mut hasher);
239    hasher.finish() as i64
240}
241
242async fn insert_partition(
243    lake: &DataLakeConnection,
244    partition: &Partition,
245    file_metadata: Option<&Arc<ParquetMetaData>>,
246    logger: Arc<dyn Logger>,
247) -> Result<()> {
248    // Generate deterministic lock key for this partition
249    let lock_key = generate_partition_lock_key(
250        &partition.view_metadata.view_set_name,
251        &partition.view_metadata.view_instance_id,
252        partition.begin_insert_time(),
253        partition.end_insert_time(),
254    );
255
256    let mut transaction = lake.db_pool.begin().await?;
257
258    debug!(
259        "[PARTITION_LOCK] view={}/{} time_range=[{}, {}] lock_key={} - acquiring advisory lock",
260        &partition.view_metadata.view_set_name,
261        &partition.view_metadata.view_instance_id,
262        partition.begin_insert_time(),
263        partition.end_insert_time(),
264        lock_key
265    );
266
267    // Acquire advisory lock - this will block until we can proceed
268    // pg_advisory_xact_lock automatically releases when transaction ends
269    sqlx::query("SELECT pg_advisory_xact_lock($1);")
270        .bind(lock_key)
271        .execute(&mut *transaction)
272        .await
273        .with_context(|| "acquiring advisory lock")?;
274
275    // Decode source_data_hash back to the row count (it's stored as i64 little-endian bytes)
276    let source_row_count = partition_source_data::hash_to_object_count(&partition.source_data_hash)
277        .with_context(|| "decoding source_data_hash to row count")?;
278
279    // LOG: Lock acquired, starting partition write
280    logger
281        .write_log_entry(format!(
282            "[PARTITION_WRITE_START] view={}/{} time_range=[{}, {}] source_rows={} - lock acquired",
283            &partition.view_metadata.view_set_name,
284            &partition.view_metadata.view_instance_id,
285            partition.begin_insert_time(),
286            partition.end_insert_time(),
287            source_row_count
288        ))
289        .await?;
290
291    // for jit partitions, we assume that the blocks were registered in order
292    // since they are built based on begin_ticks, not insert_time
293    retire_partitions(
294        &mut transaction,
295        &partition.view_metadata.view_set_name,
296        &partition.view_metadata.view_instance_id,
297        partition.begin_insert_time(),
298        partition.end_insert_time(),
299        logger.clone(),
300    )
301    .await
302    .with_context(|| "retire_partitions")?;
303
304    debug!(
305        "[PARTITION_INSERT_ATTEMPT] view={}/{} time_range=[{}, {}] source_rows={} file_path={:?}",
306        &partition.view_metadata.view_set_name,
307        &partition.view_metadata.view_instance_id,
308        partition.begin_insert_time(),
309        partition.end_insert_time(),
310        source_row_count,
311        partition.file_path
312    );
313
314    // Insert the parquet metadata into the dedicated metadata table within the same transaction
315    // Only insert metadata if partition has a file (not empty)
316    if let (Some(file_path), Some(metadata)) = (&partition.file_path, file_metadata) {
317        let metadata_bytes = serialize_parquet_metadata(metadata)
318            .with_context(|| "serializing parquet metadata for dedicated table")?;
319        let insert_time = sqlx::types::chrono::Utc::now();
320
321        sqlx::query(
322            "INSERT INTO partition_metadata (file_path, metadata, insert_time, partition_format_version)
323             VALUES ($1, $2, $3, 2)",
324        )
325        .bind(file_path)
326        .bind(metadata_bytes.as_ref())
327        .bind(insert_time)
328        .execute(&mut *transaction)
329        .await
330        .with_context(|| format!("inserting metadata for file: {}", file_path))?;
331    }
332
333    // Insert the new partition with format version 2 (Arrow 57.0)
334    let insert_result = sqlx::query(
335        "INSERT INTO lakehouse_partitions VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 2);",
336    )
337    .bind(&*partition.view_metadata.view_set_name)
338    .bind(&*partition.view_metadata.view_instance_id)
339    .bind(partition.begin_insert_time())
340    .bind(partition.end_insert_time())
341    .bind(partition.min_event_time())
342    .bind(partition.max_event_time())
343    .bind(partition.updated)
344    .bind(&partition.file_path)
345    .bind(partition.file_size)
346    .bind(&partition.view_metadata.file_schema_hash)
347    .bind(&partition.source_data_hash)
348    .bind(partition.num_rows)
349    .execute(&mut *transaction)
350    .await;
351
352    match insert_result {
353        Ok(_) => {
354            debug!(
355                "[PARTITION_INSERT_SUCCESS] view={}/{} time_range=[{}, {}] source_rows={}",
356                &partition.view_metadata.view_set_name,
357                &partition.view_metadata.view_instance_id,
358                partition.begin_insert_time(),
359                partition.end_insert_time(),
360                source_row_count
361            );
362        }
363        Err(ref e) => {
364            logger
365                .write_log_entry(format!(
366                    "[PARTITION_INSERT_ERROR] view={}/{} time_range=[{}, {}] source_rows={} error={}",
367                    &partition.view_metadata.view_set_name,
368                    &partition.view_metadata.view_instance_id,
369                    partition.begin_insert_time(),
370                    partition.end_insert_time(),
371                    source_row_count,
372                    e
373                ))
374                .await?;
375            return Err(insert_result.unwrap_err().into());
376        }
377    };
378
379    // Commit the transaction (this also releases the advisory lock)
380    transaction.commit().await.with_context(|| "commit")?;
381
382    info!(
383        "[PARTITION_WRITE_COMMIT] view={}/{} time_range=[{}, {}] file_path={:?} - lock released",
384        &partition.view_metadata.view_set_name,
385        &partition.view_metadata.view_instance_id,
386        partition.begin_insert_time(),
387        partition.end_insert_time(),
388        partition.file_path
389    );
390    Ok(())
391}
392
393/// Result of writing rows to a partition file.
394struct PartitionWriteResult {
395    num_rows: i64,
396    file_metadata: Option<Arc<ParquetMetaData>>,
397    file_path: Option<String>,
398    file_size: i64,
399    event_time_range: Option<TimeRange>,
400}
401
402/// Writes rows from the stream and tracks event time ranges.
403async fn write_rows_and_track_times(
404    rb_stream: &mut Receiver<PartitionRowSet>,
405    arrow_writer: &mut AsyncArrowWriter<AsyncParquetWriter>,
406    logger: &Arc<dyn Logger>,
407    desc: &str,
408) -> Result<Option<TimeRange>> {
409    let mut min_event_time: Option<DateTime<Utc>> = None;
410    let mut max_event_time: Option<DateTime<Utc>> = None;
411    let mut write_progression = 0;
412
413    while let Some(row_set) = rb_stream.recv().await {
414        min_event_time = Some(
415            min_event_time
416                .unwrap_or(row_set.rows_time_range.begin)
417                .min(row_set.rows_time_range.begin),
418        );
419        max_event_time = Some(
420            max_event_time
421                .unwrap_or(row_set.rows_time_range.end)
422                .max(row_set.rows_time_range.end),
423        );
424        arrow_writer
425            .write(&row_set.rows)
426            .await
427            .with_context(|| "arrow_writer.write")?;
428        if arrow_writer.in_progress_size() > 100 * 1024 * 1024 {
429            arrow_writer
430                .flush()
431                .await
432                .with_context(|| "arrow_writer.flush")?;
433        }
434
435        // Log progress every 10MB to avoid spamming and prevent idle timeout
436        let progression = arrow_writer.bytes_written() / (10 * 1024 * 1024);
437        if progression != write_progression {
438            write_progression = progression;
439            let written = arrow_writer.bytes_written();
440            logger
441                .write_log_entry(format!("{desc}: written {written} bytes"))
442                .await
443                .with_context(|| "writing log entry")?;
444        }
445    }
446
447    Ok(match (min_event_time, max_event_time) {
448        (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
449        _ => None,
450    })
451}
452
453/// Finalizes the partition write, closing the file and creating metadata.
454async fn finalize_partition_write(
455    event_time_range: Option<TimeRange>,
456    arrow_writer: AsyncArrowWriter<AsyncParquetWriter>,
457    file_path: String,
458    byte_counter: &Arc<AtomicI64>,
459    logger: &Arc<dyn Logger>,
460    desc: &str,
461    object_store: Arc<dyn object_store::ObjectStore>,
462) -> Result<PartitionWriteResult> {
463    if let Some(event_time_range) = event_time_range {
464        // Potentially non-empty partition: close the file and get metadata
465        let close_result = arrow_writer.close().await;
466
467        match close_result {
468            Ok(parquet_metadata) => {
469                let num_rows = parquet_metadata.file_metadata().num_rows();
470
471                // Check if the file actually contains rows
472                // Even if we tracked event times, the file might be empty
473                if num_rows == 0 {
474                    // File contains no rows - treat as empty partition
475                    logger
476                        .write_log_entry(format!(
477                            "created 0-row file, treating as empty partition for {desc}"
478                        ))
479                        .await
480                        .with_context(|| "writing log entry")?;
481
482                    // Delete the empty file
483                    let path = object_store::path::Path::from(file_path.as_str());
484                    if let Err(delete_err) = object_store.delete(&path).await {
485                        warn!("failed to delete empty file {}: {}", file_path, delete_err);
486                    }
487
488                    return Ok(PartitionWriteResult {
489                        num_rows: 0,
490                        file_metadata: None,
491                        file_path: None,
492                        file_size: 0,
493                        event_time_range: None,
494                    });
495                }
496
497                // Non-empty file: keep it and return full metadata
498                debug!(
499                    "wrote nb_rows={} size={} path={file_path}",
500                    num_rows,
501                    byte_counter.load(std::sync::atomic::Ordering::Relaxed)
502                );
503                let file_metadata = Arc::new(parquet_metadata);
504                let file_size = byte_counter.load(std::sync::atomic::Ordering::Relaxed);
505                Ok(PartitionWriteResult {
506                    num_rows,
507                    file_metadata: Some(file_metadata),
508                    file_path: Some(file_path),
509                    file_size,
510                    event_time_range: Some(event_time_range),
511                })
512            }
513            Err(e) => {
514                // Close failed - try to delete any partial file that may have been written
515                warn!(
516                    "arrow_writer.close failed, attempting to delete partial file: {}",
517                    file_path
518                );
519                let path = object_store::path::Path::from(file_path.as_str());
520                if let Err(delete_err) = object_store.delete(&path).await {
521                    warn!(
522                        "failed to delete partial file {}: {}",
523                        file_path, delete_err
524                    );
525                }
526                Err(e).with_context(|| "arrow_writer.close")
527            }
528        }
529    } else {
530        // Empty partition: no data was written, but the arrow writer may have written
531        // a partial file header. Drop the writer and delete any partial file.
532        drop(arrow_writer);
533
534        logger
535            .write_log_entry(format!("creating empty partition record for {desc}"))
536            .await
537            .with_context(|| "writing log entry")?;
538
539        // Try to delete any partial file that may have been created
540        // (ignore errors - file may not exist if no header was written)
541        let path = object_store::path::Path::from(file_path.as_str());
542        let _ = object_store.delete(&path).await;
543
544        Ok(PartitionWriteResult {
545            num_rows: 0,
546            file_metadata: None,
547            file_path: None,
548            file_size: 0,
549            event_time_range: None,
550        })
551    }
552}
553
554/// Writes a partition to a Parquet file from a stream of `PartitionRowSet`s.
555pub async fn write_partition_from_rows(
556    lake: Arc<DataLakeConnection>,
557    view_metadata: ViewMetadata,
558    file_schema: Arc<Schema>,
559    insert_range: TimeRange,
560    source_data_hash: Vec<u8>,
561    mut rb_stream: Receiver<PartitionRowSet>,
562    logger: Arc<dyn Logger>,
563) -> Result<()> {
564    let file_id = uuid::Uuid::new_v4();
565    let file_path = format!(
566        "views/{}/{}/{}/{}_{file_id}.parquet",
567        &view_metadata.view_set_name,
568        &view_metadata.view_instance_id,
569        insert_range.begin.format("%Y-%m-%d"),
570        insert_range.begin.format("%H-%M-%S")
571    );
572    let byte_counter = Arc::new(AtomicI64::new(0));
573    let object_store_writer = AsyncParquetWriter::new(
574        BufWriter::new(
575            lake.blob_storage.inner(),
576            object_store::path::Path::parse(&file_path).with_context(|| "parsing path")?,
577        )
578        .with_max_concurrency(2),
579        byte_counter.clone(),
580    );
581
582    // Configure writer with page-level statistics enabled (default in Arrow 57.0+)
583    // This ensures ColumnIndex with proper null_pages field is written for DataFusion 51+ compatibility
584    let props = WriterProperties::builder()
585        .set_writer_version(WriterVersion::PARQUET_2_0)
586        .set_compression(Compression::LZ4_RAW)
587        // Explicitly enable page-level statistics for clarity (this is the default in Arrow 57.0+)
588        // This generates ColumnIndex structures with proper null_pages field
589        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
590        .build();
591    let mut arrow_writer =
592        AsyncArrowWriter::try_new(object_store_writer, file_schema.clone(), Some(props))
593            .with_context(|| "allocating async arrow writer")?;
594
595    let desc = format!(
596        "[{}, {}] {} {}",
597        view_metadata.view_set_name,
598        view_metadata.view_instance_id,
599        insert_range.begin.to_rfc3339(),
600        insert_range.end.to_rfc3339()
601    );
602
603    // Write rows and track event time ranges
604    let event_time_range =
605        write_rows_and_track_times(&mut rb_stream, &mut arrow_writer, &logger, &desc).await?;
606
607    // Finalize the write (close file or create empty metadata)
608    let result = finalize_partition_write(
609        event_time_range,
610        arrow_writer,
611        file_path,
612        &byte_counter,
613        &logger,
614        &desc,
615        lake.blob_storage.inner(),
616    )
617    .await?;
618
619    insert_partition(
620        &lake,
621        &Partition {
622            view_metadata,
623            insert_time_range: insert_range,
624            event_time_range: result.event_time_range,
625            updated: sqlx::types::chrono::Utc::now(),
626            file_path: result.file_path,
627            file_size: result.file_size,
628            source_data_hash,
629            num_rows: result.num_rows,
630        },
631        result.file_metadata.as_ref(),
632        logger,
633    )
634    .await
635    .with_context(|| "insert_partition")?;
636    Ok(())
637}