micromegas_analytics/lakehouse/
temp.rs1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::Row;
6use std::sync::Arc;
7
8use super::partition_metadata::delete_partition_metadata_batch;
9
10#[span_fn]
11async fn delete_expired_temporary_files_batch(
12 lake: &DataLakeConnection,
13 now: DateTime<Utc>,
14) -> Result<bool> {
15 let batch_size: i32 = 1000;
16 let mut tr = lake.db_pool.begin().await?;
17 let rows = sqlx::query(
18 "DELETE FROM temporary_files
19 WHERE file_path IN (
20 SELECT file_path FROM temporary_files
21 WHERE expiration < $1
22 LIMIT $2
23 )
24 RETURNING file_path;",
25 )
26 .bind(now)
27 .bind(batch_size)
28 .fetch_all(&mut *tr)
29 .await
30 .with_context(|| "deleting expired temporary files batch")?;
31
32 if rows.is_empty() {
33 return Ok(false);
34 }
35
36 let to_delete: Vec<String> = rows
37 .iter()
38 .map(|r| r.try_get("file_path"))
39 .collect::<Result<_, _>>()?;
40
41 for file_path in &to_delete {
42 debug!("deleting expired temporary file {file_path}");
43 }
44
45 delete_partition_metadata_batch(&mut tr, &to_delete)
46 .await
47 .with_context(|| "deleting partition metadata for expired temporary files")?;
48
49 lake.blob_storage.delete_batch(&to_delete).await?;
50 tr.commit().await?;
51 info!("deleted {} expired temporary files", to_delete.len());
52 Ok(true)
53}
54
55#[span_fn]
56pub async fn delete_expired_temporary_files(lake: Arc<DataLakeConnection>) -> Result<()> {
57 let now = Utc::now();
58 while delete_expired_temporary_files_batch(&lake, now).await? {}
59 Ok(())
60}