micromegas_analytics/
delete.rs1use anyhow::{Context, Result};
2use chrono::{DateTime, Days, Utc};
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::{Row, query};
6use uuid::Uuid;
7
8use crate::lakehouse::write_partition::retire_expired_partitions;
9
10#[span_fn]
13pub async fn delete_expired_blocks_batch(
14 lake: &DataLakeConnection,
15 expiration: DateTime<Utc>,
16) -> Result<bool> {
17 let batch_size: i32 = 1000;
18 let mut transaction = lake.db_pool.begin().await?;
19 let rows = query(
20 "DELETE FROM blocks
21 WHERE block_id IN (
22 SELECT block_id FROM blocks WHERE insert_time <= $1 LIMIT $2
23 )
24 RETURNING process_id, stream_id, block_id;",
25 )
26 .bind(expiration)
27 .bind(batch_size)
28 .fetch_all(&mut *transaction)
29 .await?;
30 if rows.is_empty() {
31 return Ok(false);
32 }
33 let mut paths = vec![];
34 for r in &rows {
35 let process_id: Uuid = r.try_get("process_id")?;
36 let stream_id: Uuid = r.try_get("stream_id")?;
37 let block_id: Uuid = r.try_get("block_id")?;
38 let path = format!("blobs/{process_id}/{stream_id}/{block_id}");
39 paths.push(path);
40 }
41 lake.blob_storage.delete_batch(&paths).await?;
42 transaction.commit().await.with_context(|| "commit")?;
43 for path in &paths {
44 debug!("deleted blob {path}");
45 }
46 info!("deleted {} blocks", paths.len());
47 Ok(paths.len() == batch_size as usize)
48}
49
50#[span_fn]
52pub async fn delete_expired_blocks(
53 lake: &DataLakeConnection,
54 expiration: DateTime<Utc>,
55) -> Result<()> {
56 while delete_expired_blocks_batch(lake, expiration).await? {}
57 Ok(())
58}
59
60#[span_fn]
63pub async fn delete_empty_streams_batch(
64 lake: &DataLakeConnection,
65 expiration: DateTime<Utc>,
66) -> Result<bool> {
67 let batch_size: i32 = 1000;
68 let rows = query(
69 "WITH batch AS (
70 SELECT stream_id FROM streams
71 WHERE insert_time <= $1
72 AND NOT EXISTS (SELECT 1 FROM blocks WHERE blocks.stream_id = streams.stream_id LIMIT 1)
73 LIMIT $2
74 )
75 DELETE FROM streams
76 WHERE stream_id IN (SELECT stream_id FROM batch)
77 RETURNING stream_id;",
78 )
79 .bind(expiration)
80 .bind(batch_size)
81 .fetch_all(&lake.db_pool)
82 .await?;
83 if rows.is_empty() {
84 return Ok(false);
85 }
86 for r in &rows {
87 let stream_id: Uuid = r.try_get("stream_id")?;
88 debug!("deleted stream {stream_id}");
89 }
90 let count = rows.len();
91 info!("deleted {count} empty streams");
92 Ok(count == batch_size as usize)
93}
94
95#[span_fn]
97pub async fn delete_empty_streams(
98 lake: &DataLakeConnection,
99 expiration: DateTime<Utc>,
100) -> Result<()> {
101 while delete_empty_streams_batch(lake, expiration).await? {}
102 Ok(())
103}
104
105#[span_fn]
108pub async fn delete_empty_processes_batch(
109 lake: &DataLakeConnection,
110 expiration: DateTime<Utc>,
111) -> Result<bool> {
112 let batch_size: i32 = 1000;
113 let rows = query(
114 "WITH batch AS (
115 SELECT process_id FROM processes
116 WHERE insert_time <= $1
117 AND NOT EXISTS (SELECT 1 FROM streams WHERE streams.process_id = processes.process_id LIMIT 1)
118 LIMIT $2
119 )
120 DELETE FROM processes
121 WHERE process_id IN (SELECT process_id FROM batch)
122 RETURNING process_id;",
123 )
124 .bind(expiration)
125 .bind(batch_size)
126 .fetch_all(&lake.db_pool)
127 .await?;
128 if rows.is_empty() {
129 return Ok(false);
130 }
131 for r in &rows {
132 let process_id: Uuid = r.try_get("process_id")?;
133 debug!("deleted process {process_id}");
134 }
135 let count = rows.len();
136 info!("deleted {count} empty processes");
137 Ok(count == batch_size as usize)
138}
139
140#[span_fn]
142pub async fn delete_empty_processes(
143 lake: &DataLakeConnection,
144 expiration: DateTime<Utc>,
145) -> Result<()> {
146 while delete_empty_processes_batch(lake, expiration).await? {}
147 Ok(())
148}
149
150#[span_fn]
152pub async fn delete_old_data(lake: &DataLakeConnection, min_days_old: i32) -> Result<()> {
153 let now = Utc::now();
154 let expiration = now
155 .checked_sub_days(Days::new(min_days_old as u64))
156 .with_context(|| "computing expiration date/time")?;
157 delete_expired_blocks(lake, expiration)
158 .await
159 .with_context(|| "delete_expired_blocks")?;
160 delete_empty_streams(lake, expiration)
161 .await
162 .with_context(|| "delete_empty_streams")?;
163 delete_empty_processes(lake, expiration)
164 .await
165 .with_context(|| "delete_empty_processes")?;
166 retire_expired_partitions(lake, expiration)
167 .await
168 .with_context(|| "retire_expired_partitions")?;
169 Ok(())
170}