micromegas_analytics/lakehouse/
temp.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::Row;
6use std::sync::Arc;
7
8use super::partition_metadata::delete_partition_metadata_batch;
9
10/// Deletes expired temporary files from the data lake.
11pub async fn delete_expired_temporary_files(lake: Arc<DataLakeConnection>) -> Result<()> {
12    let mut tr = lake.db_pool.begin().await?;
13    let now = Utc::now();
14    let rows = sqlx::query(
15        "DELETE FROM temporary_files
16         WHERE expiration < $1
17         RETURNING file_path;",
18    )
19    .bind(now)
20    .fetch_all(&mut *tr)
21    .await
22    .with_context(|| "listing expired temporary files")?;
23    let mut to_delete = vec![];
24    for r in rows {
25        let file_path: String = r.try_get("file_path")?;
26        info!("deleting expired file {file_path}");
27        to_delete.push(file_path);
28    }
29
30    // Delete metadata for expired temporary files
31    if !to_delete.is_empty() {
32        delete_partition_metadata_batch(&mut tr, &to_delete)
33            .await
34            .with_context(|| "deleting partition metadata for expired temporary files")?;
35    }
36
37    lake.blob_storage.delete_batch(&to_delete).await?;
38    tr.commit().await?;
39    Ok(())
40}