micromegas_analytics/lakehouse/
write_partition.rs

1use crate::{
2    arrow_utils::serialize_parquet_metadata, lakehouse::async_parquet_writer::AsyncParquetWriter,
3    response_writer::Logger, time::TimeRange,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, TimeDelta, Utc};
7use datafusion::{
8    arrow::{array::RecordBatch, datatypes::Schema},
9    parquet::{
10        arrow::AsyncArrowWriter,
11        basic::Compression,
12        file::{
13            metadata::ParquetMetaData,
14            properties::{WriterProperties, WriterVersion},
15        },
16    },
17};
18use micromegas_ingestion::data_lake_connection::DataLakeConnection;
19use micromegas_tracing::prelude::*;
20use object_store::ObjectStoreExt;
21use object_store::buffered::BufWriter;
22use sqlx::Row;
23use std::collections::hash_map::DefaultHasher;
24use std::hash::{Hash, Hasher};
25use std::sync::{Arc, atomic::AtomicI64};
26use tokio::sync::mpsc::Receiver;
27
28use super::{partition::Partition, partition_source_data, view::ViewMetadata};
29
30/// Adds a file to the temporary_files table for cleanup.
31///
32/// Files added to temporary_files will be automatically deleted by the cleanup process
33/// after the expiration time. The default expiration is 1 hour from now.
34pub async fn add_file_for_cleanup(
35    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
36    file_path: &str,
37    file_size: i64,
38) -> Result<()> {
39    let expiration = Utc::now()
40        + TimeDelta::try_hours(1)
41            .with_context(|| "calculating expiration time for temporary file")?;
42
43    sqlx::query("INSERT INTO temporary_files VALUES ($1, $2, $3)")
44        .bind(file_path)
45        .bind(file_size)
46        .bind(expiration)
47        .execute(&mut **transaction)
48        .await
49        .with_context(|| format!("adding file {file_path} to temporary files for cleanup"))?;
50
51    Ok(())
52}
53
54/// A set of rows for a partition, along with their time range.
55pub struct PartitionRowSet {
56    pub rows_time_range: TimeRange,
57    pub rows: RecordBatch,
58}
59
60impl PartitionRowSet {
61    pub fn new(rows_time_range: TimeRange, rows: RecordBatch) -> Self {
62        Self {
63            rows_time_range,
64            rows,
65        }
66    }
67}
68
69#[span_fn]
70async fn retire_expired_partitions_batch(
71    lake: &DataLakeConnection,
72    expiration: DateTime<Utc>,
73) -> Result<bool> {
74    let batch_size: i32 = 1000;
75    let mut transaction = lake.db_pool.begin().await?;
76    let rows = sqlx::query(
77        "DELETE FROM lakehouse_partitions
78         WHERE (view_set_name, view_instance_id, begin_insert_time, end_insert_time) IN (
79             SELECT view_set_name, view_instance_id, begin_insert_time, end_insert_time
80             FROM lakehouse_partitions
81             WHERE end_insert_time < $1
82             LIMIT $2
83         )
84         RETURNING file_path, file_size;",
85    )
86    .bind(expiration)
87    .bind(batch_size)
88    .fetch_all(&mut *transaction)
89    .await
90    .with_context(|| "deleting expired partitions batch")?;
91
92    if rows.is_empty() {
93        return Ok(false);
94    }
95    let count = rows.len();
96    for row in &rows {
97        let file_path: Option<String> = row.try_get("file_path")?;
98        let file_size: i64 = row.try_get("file_size")?;
99        if let Some(path) = file_path {
100            debug!("retiring expired partition file {path} ({file_size} bytes)");
101            add_file_for_cleanup(&mut transaction, &path, file_size).await?;
102        }
103    }
104    transaction.commit().await.with_context(|| "commit")?;
105    info!("retired {count} expired partitions");
106    Ok(count == batch_size as usize)
107}
108
109#[span_fn]
110pub async fn retire_expired_partitions(
111    lake: &DataLakeConnection,
112    expiration: DateTime<Utc>,
113) -> Result<()> {
114    while retire_expired_partitions_batch(lake, expiration).await? {}
115    Ok(())
116}
117
118/// Retires partitions from the active set.
119/// Overlap is determined by the insert_time of the telemetry.
120pub async fn retire_partitions(
121    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
122    view_set_name: &str,
123    view_instance_id: &str,
124    begin_insert_time: DateTime<Utc>,
125    end_insert_time: DateTime<Utc>,
126    logger: Arc<dyn Logger>,
127) -> Result<()> {
128    // this is not an overlap test, we need to assume that we are not making a new smaller partition
129    // where a bigger one existed
130    // its gets tricky in the jit case where a partition can have only one block and begin_insert == end_insert
131
132    //todo: use DELETE+RETURNING
133    let old_partitions = if begin_insert_time == end_insert_time {
134        // For identical timestamps, look for exact matches to handle single-block partitions
135        sqlx::query(
136            "SELECT file_path, file_size
137             FROM lakehouse_partitions
138             WHERE view_set_name = $1
139             AND view_instance_id = $2
140             AND begin_insert_time = $3
141             AND end_insert_time = $3
142             ;",
143        )
144        .bind(view_set_name)
145        .bind(view_instance_id)
146        .bind(begin_insert_time)
147        .fetch_all(&mut **transaction)
148        .await
149        .with_context(|| "listing old partitions (exact match)")?
150    } else {
151        // For time ranges, use inclusive inequalities
152        sqlx::query(
153            "SELECT file_path, file_size
154             FROM lakehouse_partitions
155             WHERE view_set_name = $1
156             AND view_instance_id = $2
157             AND begin_insert_time >= $3
158             AND end_insert_time <= $4
159             ;",
160        )
161        .bind(view_set_name)
162        .bind(view_instance_id)
163        .bind(begin_insert_time)
164        .bind(end_insert_time)
165        .fetch_all(&mut **transaction)
166        .await
167        .with_context(|| "listing old partitions (range)")?
168    };
169
170    // LOG: Found partitions for retirement (only if any found)
171    if !old_partitions.is_empty() {
172        logger
173            .write_log_entry(format!(
174                "[RETIRE_FOUND] view={}/{} time_range=[{}, {}] found_partitions={}",
175                view_set_name,
176                view_instance_id,
177                begin_insert_time,
178                end_insert_time,
179                old_partitions.len()
180            ))
181            .await?;
182    }
183
184    let mut file_paths = Vec::new();
185    for old_part in &old_partitions {
186        let file_path: Option<String> = old_part.try_get("file_path")?;
187        let file_size: i64 = old_part.try_get("file_size")?;
188        if let Some(path) = file_path {
189            logger
190                .write_log_entry(format!(
191                    "adding out of date partition {path} to temporary files to be deleted"
192                ))
193                .await?;
194            add_file_for_cleanup(transaction, &path, file_size).await?;
195            file_paths.push(path);
196        }
197    }
198
199    if begin_insert_time == end_insert_time {
200        // For identical timestamps, delete exact matches to handle single-block partitions
201        sqlx::query(
202            "DELETE from lakehouse_partitions
203             WHERE view_set_name = $1
204             AND view_instance_id = $2
205             AND begin_insert_time = $3
206             AND end_insert_time = $3
207             ;",
208        )
209        .bind(view_set_name)
210        .bind(view_instance_id)
211        .bind(begin_insert_time)
212        .execute(&mut **transaction)
213        .await
214        .with_context(|| "deleting out of date partitions (exact match)")?
215    } else {
216        // For time ranges, use inclusive inequalities
217        sqlx::query(
218            "DELETE from lakehouse_partitions
219             WHERE view_set_name = $1
220             AND view_instance_id = $2
221             AND begin_insert_time >= $3
222             AND end_insert_time <= $4
223             ;",
224        )
225        .bind(view_set_name)
226        .bind(view_instance_id)
227        .bind(begin_insert_time)
228        .bind(end_insert_time)
229        .execute(&mut **transaction)
230        .await
231        .with_context(|| "deleting out of date partitions (range)")?
232    };
233    Ok(())
234}
235
236/// Generate a deterministic advisory lock key for a partition
237fn generate_partition_lock_key(
238    view_set_name: &str,
239    view_instance_id: &str,
240    begin_insert_time: DateTime<Utc>,
241    end_insert_time: DateTime<Utc>,
242) -> i64 {
243    let mut hasher = DefaultHasher::new();
244    view_set_name.hash(&mut hasher);
245    view_instance_id.hash(&mut hasher);
246    begin_insert_time.hash(&mut hasher);
247    end_insert_time.hash(&mut hasher);
248    hasher.finish() as i64
249}
250
251async fn insert_partition(
252    lake: &DataLakeConnection,
253    partition: &Partition,
254    file_metadata: Option<&Arc<ParquetMetaData>>,
255    logger: Arc<dyn Logger>,
256) -> Result<()> {
257    // Generate deterministic lock key for this partition
258    let lock_key = generate_partition_lock_key(
259        &partition.view_metadata.view_set_name,
260        &partition.view_metadata.view_instance_id,
261        partition.begin_insert_time(),
262        partition.end_insert_time(),
263    );
264
265    let mut transaction = lake.db_pool.begin().await?;
266
267    debug!(
268        "[PARTITION_LOCK] view={}/{} time_range=[{}, {}] lock_key={} - acquiring advisory lock",
269        &partition.view_metadata.view_set_name,
270        &partition.view_metadata.view_instance_id,
271        partition.begin_insert_time(),
272        partition.end_insert_time(),
273        lock_key
274    );
275
276    // Acquire advisory lock - this will block until we can proceed
277    // pg_advisory_xact_lock automatically releases when transaction ends
278    sqlx::query("SELECT pg_advisory_xact_lock($1);")
279        .bind(lock_key)
280        .execute(&mut *transaction)
281        .await
282        .with_context(|| "acquiring advisory lock")?;
283
284    // Decode source_data_hash back to the row count (it's stored as i64 little-endian bytes)
285    let source_row_count = partition_source_data::hash_to_object_count(&partition.source_data_hash)
286        .with_context(|| "decoding source_data_hash to row count")?;
287
288    // LOG: Lock acquired, starting partition write
289    logger
290        .write_log_entry(format!(
291            "[PARTITION_WRITE_START] view={}/{} time_range=[{}, {}] source_rows={} - lock acquired",
292            &partition.view_metadata.view_set_name,
293            &partition.view_metadata.view_instance_id,
294            partition.begin_insert_time(),
295            partition.end_insert_time(),
296            source_row_count
297        ))
298        .await?;
299
300    // for jit partitions, we assume that the blocks were registered in order
301    // since they are built based on begin_ticks, not insert_time
302    retire_partitions(
303        &mut transaction,
304        &partition.view_metadata.view_set_name,
305        &partition.view_metadata.view_instance_id,
306        partition.begin_insert_time(),
307        partition.end_insert_time(),
308        logger.clone(),
309    )
310    .await
311    .with_context(|| "retire_partitions")?;
312
313    debug!(
314        "[PARTITION_INSERT_ATTEMPT] view={}/{} time_range=[{}, {}] source_rows={} file_path={:?}",
315        &partition.view_metadata.view_set_name,
316        &partition.view_metadata.view_instance_id,
317        partition.begin_insert_time(),
318        partition.end_insert_time(),
319        source_row_count,
320        partition.file_path
321    );
322
323    // Insert the parquet metadata into the dedicated metadata table within the same transaction
324    // Only insert metadata if partition has a file (not empty)
325    if let (Some(file_path), Some(metadata)) = (&partition.file_path, file_metadata) {
326        let metadata_bytes = serialize_parquet_metadata(metadata)
327            .with_context(|| "serializing parquet metadata for dedicated table")?;
328        let insert_time = sqlx::types::chrono::Utc::now();
329
330        sqlx::query(
331            "INSERT INTO partition_metadata (file_path, metadata, insert_time, partition_format_version)
332             VALUES ($1, $2, $3, 2)",
333        )
334        .bind(file_path)
335        .bind(metadata_bytes.as_ref())
336        .bind(insert_time)
337        .execute(&mut *transaction)
338        .await
339        .with_context(|| format!("inserting metadata for file: {}", file_path))?;
340    }
341
342    // Insert the new partition with format version 2 (Arrow 57.0)
343    let insert_result = sqlx::query(
344        "INSERT INTO lakehouse_partitions VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 2);",
345    )
346    .bind(&*partition.view_metadata.view_set_name)
347    .bind(&*partition.view_metadata.view_instance_id)
348    .bind(partition.begin_insert_time())
349    .bind(partition.end_insert_time())
350    .bind(partition.min_event_time())
351    .bind(partition.max_event_time())
352    .bind(partition.updated)
353    .bind(&partition.file_path)
354    .bind(partition.file_size)
355    .bind(&partition.view_metadata.file_schema_hash)
356    .bind(&partition.source_data_hash)
357    .bind(partition.num_rows)
358    .execute(&mut *transaction)
359    .await;
360
361    match insert_result {
362        Ok(_) => {
363            debug!(
364                "[PARTITION_INSERT_SUCCESS] view={}/{} time_range=[{}, {}] source_rows={}",
365                &partition.view_metadata.view_set_name,
366                &partition.view_metadata.view_instance_id,
367                partition.begin_insert_time(),
368                partition.end_insert_time(),
369                source_row_count
370            );
371        }
372        Err(ref e) => {
373            logger
374                .write_log_entry(format!(
375                    "[PARTITION_INSERT_ERROR] view={}/{} time_range=[{}, {}] source_rows={} error={}",
376                    &partition.view_metadata.view_set_name,
377                    &partition.view_metadata.view_instance_id,
378                    partition.begin_insert_time(),
379                    partition.end_insert_time(),
380                    source_row_count,
381                    e
382                ))
383                .await?;
384            return Err(insert_result.unwrap_err().into());
385        }
386    };
387
388    // Commit the transaction (this also releases the advisory lock)
389    transaction.commit().await.with_context(|| "commit")?;
390
391    info!(
392        "[PARTITION_WRITE_COMMIT] view={}/{} time_range=[{}, {}] file_path={:?} - lock released",
393        &partition.view_metadata.view_set_name,
394        &partition.view_metadata.view_instance_id,
395        partition.begin_insert_time(),
396        partition.end_insert_time(),
397        partition.file_path
398    );
399    Ok(())
400}
401
402/// Result of writing rows to a partition file.
403struct PartitionWriteResult {
404    num_rows: i64,
405    file_metadata: Option<Arc<ParquetMetaData>>,
406    file_path: Option<String>,
407    file_size: i64,
408    event_time_range: Option<TimeRange>,
409}
410
411/// Writes rows from the stream and tracks event time ranges.
412pub async fn write_rows_and_track_times(
413    rb_stream: &mut Receiver<Result<PartitionRowSet, anyhow::Error>>,
414    arrow_writer: &mut AsyncArrowWriter<AsyncParquetWriter>,
415    logger: &Arc<dyn Logger>,
416    desc: &str,
417) -> Result<Option<TimeRange>> {
418    let mut min_event_time: Option<DateTime<Utc>> = None;
419    let mut max_event_time: Option<DateTime<Utc>> = None;
420    let mut write_progression = 0;
421
422    while let Some(msg) = rb_stream.recv().await {
423        let row_set = msg?;
424        min_event_time = Some(
425            min_event_time
426                .unwrap_or(row_set.rows_time_range.begin)
427                .min(row_set.rows_time_range.begin),
428        );
429        max_event_time = Some(
430            max_event_time
431                .unwrap_or(row_set.rows_time_range.end)
432                .max(row_set.rows_time_range.end),
433        );
434        arrow_writer
435            .write(&row_set.rows)
436            .await
437            .with_context(|| "arrow_writer.write")?;
438        if arrow_writer.in_progress_size() > 100 * 1024 * 1024 {
439            arrow_writer
440                .flush()
441                .await
442                .with_context(|| "arrow_writer.flush")?;
443        }
444
445        // Log progress every 10MB to avoid spamming and prevent idle timeout
446        let progression = arrow_writer.bytes_written() / (10 * 1024 * 1024);
447        if progression != write_progression {
448            write_progression = progression;
449            let written = arrow_writer.bytes_written();
450            logger
451                .write_log_entry(format!("{desc}: written {written} bytes"))
452                .await
453                .with_context(|| "writing log entry")?;
454        }
455    }
456
457    Ok(match (min_event_time, max_event_time) {
458        (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
459        _ => None,
460    })
461}
462
463/// Finalizes the partition write, closing the file and creating metadata.
464async fn finalize_partition_write(
465    event_time_range: Option<TimeRange>,
466    arrow_writer: AsyncArrowWriter<AsyncParquetWriter>,
467    file_path: String,
468    byte_counter: &Arc<AtomicI64>,
469    logger: &Arc<dyn Logger>,
470    desc: &str,
471    object_store: Arc<dyn object_store::ObjectStore>,
472) -> Result<PartitionWriteResult> {
473    if let Some(event_time_range) = event_time_range {
474        // Potentially non-empty partition: close the file and get metadata
475        let close_result = arrow_writer.close().await;
476
477        match close_result {
478            Ok(parquet_metadata) => {
479                let num_rows = parquet_metadata.file_metadata().num_rows();
480
481                // Check if the file actually contains rows
482                // Even if we tracked event times, the file might be empty
483                if num_rows == 0 {
484                    // File contains no rows - treat as empty partition
485                    logger
486                        .write_log_entry(format!(
487                            "created 0-row file, treating as empty partition for {desc}"
488                        ))
489                        .await
490                        .with_context(|| "writing log entry")?;
491
492                    // Delete the empty file
493                    let path = object_store::path::Path::from(file_path.as_str());
494                    if let Err(delete_err) = object_store.delete(&path).await {
495                        warn!("failed to delete empty file {}: {}", file_path, delete_err);
496                    }
497
498                    return Ok(PartitionWriteResult {
499                        num_rows: 0,
500                        file_metadata: None,
501                        file_path: None,
502                        file_size: 0,
503                        event_time_range: None,
504                    });
505                }
506
507                // Non-empty file: keep it and return full metadata
508                debug!(
509                    "wrote nb_rows={} size={} path={file_path}",
510                    num_rows,
511                    byte_counter.load(std::sync::atomic::Ordering::Relaxed)
512                );
513                let file_metadata = Arc::new(parquet_metadata);
514                let file_size = byte_counter.load(std::sync::atomic::Ordering::Relaxed);
515                Ok(PartitionWriteResult {
516                    num_rows,
517                    file_metadata: Some(file_metadata),
518                    file_path: Some(file_path),
519                    file_size,
520                    event_time_range: Some(event_time_range),
521                })
522            }
523            Err(e) => {
524                // Close failed - try to delete any partial file that may have been written
525                warn!(
526                    "arrow_writer.close failed, attempting to delete partial file: {}",
527                    file_path
528                );
529                let path = object_store::path::Path::from(file_path.as_str());
530                if let Err(delete_err) = object_store.delete(&path).await {
531                    warn!(
532                        "failed to delete partial file {}: {}",
533                        file_path, delete_err
534                    );
535                }
536                Err(e).with_context(|| "arrow_writer.close")
537            }
538        }
539    } else {
540        // Empty partition: no data was written, but the arrow writer may have written
541        // a partial file header. Drop the writer and delete any partial file.
542        drop(arrow_writer);
543
544        logger
545            .write_log_entry(format!("creating empty partition record for {desc}"))
546            .await
547            .with_context(|| "writing log entry")?;
548
549        // Try to delete any partial file that may have been created
550        // (ignore errors - file may not exist if no header was written)
551        let path = object_store::path::Path::from(file_path.as_str());
552        let _ = object_store.delete(&path).await;
553
554        Ok(PartitionWriteResult {
555            num_rows: 0,
556            file_metadata: None,
557            file_path: None,
558            file_size: 0,
559            event_time_range: None,
560        })
561    }
562}
563
564/// Writes a partition to a Parquet file from a stream of `PartitionRowSet`s.
565pub async fn write_partition_from_rows(
566    lake: Arc<DataLakeConnection>,
567    view_metadata: ViewMetadata,
568    file_schema: Arc<Schema>,
569    insert_range: TimeRange,
570    source_data_hash: Vec<u8>,
571    mut rb_stream: Receiver<Result<PartitionRowSet, anyhow::Error>>,
572    logger: Arc<dyn Logger>,
573) -> Result<()> {
574    let file_id = uuid::Uuid::new_v4();
575    let file_path = format!(
576        "views/{}/{}/{}/{}_{file_id}.parquet",
577        &view_metadata.view_set_name,
578        &view_metadata.view_instance_id,
579        insert_range.begin.format("%Y-%m-%d"),
580        insert_range.begin.format("%H-%M-%S")
581    );
582    let byte_counter = Arc::new(AtomicI64::new(0));
583    let object_store_writer = AsyncParquetWriter::new(
584        BufWriter::new(
585            lake.blob_storage.inner(),
586            object_store::path::Path::parse(&file_path).with_context(|| "parsing path")?,
587        )
588        .with_max_concurrency(2),
589        byte_counter.clone(),
590    );
591
592    // Configure writer with page-level statistics enabled (default in Arrow 57.0+)
593    // This ensures ColumnIndex with proper null_pages field is written for DataFusion 51+ compatibility
594    let props = WriterProperties::builder()
595        .set_writer_version(WriterVersion::PARQUET_2_0)
596        .set_compression(Compression::LZ4_RAW)
597        // Explicitly enable page-level statistics for clarity (this is the default in Arrow 57.0+)
598        // This generates ColumnIndex structures with proper null_pages field
599        .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
600        .build();
601    let mut arrow_writer =
602        AsyncArrowWriter::try_new(object_store_writer, file_schema.clone(), Some(props))
603            .with_context(|| "allocating async arrow writer")?;
604
605    let desc = format!(
606        "[{}, {}] {} {}",
607        view_metadata.view_set_name,
608        view_metadata.view_instance_id,
609        insert_range.begin.to_rfc3339(),
610        insert_range.end.to_rfc3339()
611    );
612
613    // Write rows and track event time ranges
614    let event_time_range =
615        match write_rows_and_track_times(&mut rb_stream, &mut arrow_writer, &logger, &desc).await {
616            Ok(range) => range,
617            Err(e) => {
618                // The writer is dropped without close/abort on this error path, which can
619                // leave already-uploaded multipart data orphaned in object storage. Delete
620                // any partial file before propagating the error (mirror finalize cleanup).
621                drop(arrow_writer);
622                warn!(
623                    "write_rows_and_track_times failed, attempting to delete partial file: {}",
624                    file_path
625                );
626                let path = object_store::path::Path::from(file_path.as_str());
627                if let Err(delete_err) = lake.blob_storage.inner().delete(&path).await {
628                    warn!(
629                        "failed to delete partial file {}: {}",
630                        file_path, delete_err
631                    );
632                }
633                return Err(e).with_context(|| "write_rows_and_track_times");
634            }
635        };
636
637    // Finalize the write (close file or create empty metadata)
638    let result = finalize_partition_write(
639        event_time_range,
640        arrow_writer,
641        file_path,
642        &byte_counter,
643        &logger,
644        &desc,
645        lake.blob_storage.inner(),
646    )
647    .await?;
648
649    insert_partition(
650        &lake,
651        &Partition {
652            view_metadata,
653            insert_time_range: insert_range,
654            event_time_range: result.event_time_range,
655            updated: sqlx::types::chrono::Utc::now(),
656            file_path: result.file_path,
657            file_size: result.file_size,
658            source_data_hash,
659            num_rows: result.num_rows,
660        },
661        result.file_metadata.as_ref(),
662        logger,
663    )
664    .await
665    .with_context(|| "insert_partition")?;
666    Ok(())
667}