micromegas_analytics/lakehouse/
temp.rs1use anyhow::{Context, Result};
2use chrono::Utc;
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::Row;
6use std::sync::Arc;
7
8use super::partition_metadata::delete_partition_metadata_batch;
9
10pub async fn delete_expired_temporary_files(lake: Arc<DataLakeConnection>) -> Result<()> {
12 let mut tr = lake.db_pool.begin().await?;
13 let now = Utc::now();
14 let rows = sqlx::query(
15 "DELETE FROM temporary_files
16 WHERE expiration < $1
17 RETURNING file_path;",
18 )
19 .bind(now)
20 .fetch_all(&mut *tr)
21 .await
22 .with_context(|| "listing expired temporary files")?;
23 let mut to_delete = vec![];
24 for r in rows {
25 let file_path: String = r.try_get("file_path")?;
26 info!("deleting expired file {file_path}");
27 to_delete.push(file_path);
28 }
29
30 if !to_delete.is_empty() {
32 delete_partition_metadata_batch(&mut tr, &to_delete)
33 .await
34 .with_context(|| "deleting partition metadata for expired temporary files")?;
35 }
36
37 lake.blob_storage.delete_batch(&to_delete).await?;
38 tr.commit().await?;
39 Ok(())
40}