micromegas_analytics/lakehouse/temp.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::Row;
6use std::sync::Arc;
7
8use super::partition_metadata::delete_partition_metadata_batch;
9
10#[span_fn]
11async fn delete_expired_temporary_files_batch(
12    lake: &DataLakeConnection,
13    now: DateTime<Utc>,
14) -> Result<bool> {
15    let batch_size: i32 = 1000;
16    let mut tr = lake.db_pool.begin().await?;
17    let rows = sqlx::query(
18        "DELETE FROM temporary_files
19         WHERE file_path IN (
20             SELECT file_path FROM temporary_files
21             WHERE expiration < $1
22             LIMIT $2
23         )
24         RETURNING file_path;",
25    )
26    .bind(now)
27    .bind(batch_size)
28    .fetch_all(&mut *tr)
29    .await
30    .with_context(|| "deleting expired temporary files batch")?;
31
32    if rows.is_empty() {
33        return Ok(false);
34    }
35
36    let to_delete: Vec<String> = rows
37        .iter()
38        .map(|r| r.try_get("file_path"))
39        .collect::<Result<_, _>>()?;
40
41    for file_path in &to_delete {
42        debug!("deleting expired temporary file {file_path}");
43    }
44
45    delete_partition_metadata_batch(&mut tr, &to_delete)
46        .await
47        .with_context(|| "deleting partition metadata for expired temporary files")?;
48
49    lake.blob_storage.delete_batch(&to_delete).await?;
50    tr.commit().await?;
51    info!("deleted {} expired temporary files", to_delete.len());
52    Ok(true)
53}
54
55#[span_fn]
56pub async fn delete_expired_temporary_files(lake: Arc<DataLakeConnection>) -> Result<()> {
57    let now = Utc::now();
58    while delete_expired_temporary_files_batch(&lake, now).await? {}
59    Ok(())
60}