micromegas_analytics/
delete.rs1use anyhow::{Context, Result};
2use chrono::{DateTime, Days, Utc};
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::{Row, query};
6use uuid::Uuid;
7
8use crate::lakehouse::write_partition::retire_expired_partitions;
9
10pub async fn delete_expired_blocks_batch(
13 lake: &DataLakeConnection,
14 expiration: DateTime<Utc>,
15) -> Result<bool> {
16 let batch_size: i32 = 1000;
17 let rows = query(
18 "SELECT process_id, stream_id, block_id
19 FROM blocks
20 WHERE blocks.insert_time <= $1
21 LIMIT $2;",
22 )
23 .bind(expiration)
24 .bind(batch_size)
25 .fetch_all(&lake.db_pool)
26 .await?;
27 let mut paths = vec![];
28 let mut block_ids = vec![];
29 for r in rows {
30 let process_id: Uuid = r.try_get("process_id")?;
31 let stream_id: Uuid = r.try_get("stream_id")?;
32 let block_id: Uuid = r.try_get("block_id")?;
33 let path = format!("blobs/{process_id}/{stream_id}/{block_id}");
34 paths.push(path);
35 block_ids.push(block_id);
36 }
37 info!("deleting {:?}", &paths);
38 lake.blob_storage.delete_batch(&paths).await?;
39 query("DELETE FROM blocks where block_id = ANY($1);")
40 .bind(block_ids)
41 .execute(&lake.db_pool)
42 .await?;
43 Ok(paths.len() == batch_size as usize)
44}
45
46pub async fn delete_expired_blocks(
48 lake: &DataLakeConnection,
49 expiration: DateTime<Utc>,
50) -> Result<()> {
51 while delete_expired_blocks_batch(lake, expiration).await? {}
52 Ok(())
53}
54
55pub async fn delete_empty_streams_batch(
58 lake: &DataLakeConnection,
59 expiration: DateTime<Utc>,
60) -> Result<bool> {
61 let batch_size: i32 = 1000;
62 let rows = query(
64 "SELECT streams.stream_id
65 FROM streams
66 WHERE streams.insert_time <= $1
67 AND NOT EXISTS
68 (
69 SELECT 1
70 FROM blocks
71 WHERE blocks.stream_id = streams.stream_id
72 LIMIT 1
73 )
74 LIMIT $2
75 ;",
76 )
77 .bind(expiration)
78 .bind(batch_size)
79 .fetch_all(&lake.db_pool)
80 .await?;
81 let mut stream_ids = vec![];
82 for r in rows {
83 let stream_id: Uuid = r.try_get("stream_id")?;
84 stream_ids.push(stream_id);
85 }
86
87 info!("deleting expired streams {stream_ids:?}");
88 query("DELETE FROM streams where stream_id = ANY($1);")
89 .bind(&stream_ids)
90 .execute(&lake.db_pool)
91 .await?;
92
93 Ok(stream_ids.len() == batch_size as usize)
94}
95
96pub async fn delete_empty_streams(
98 lake: &DataLakeConnection,
99 expiration: DateTime<Utc>,
100) -> Result<()> {
101 while delete_empty_streams_batch(lake, expiration).await? {}
102 Ok(())
103}
104
105pub async fn delete_empty_processes_batch(
108 lake: &DataLakeConnection,
109 expiration: DateTime<Utc>,
110) -> Result<bool> {
111 let batch_size: i32 = 1000;
112 let rows = query(
115 "SELECT processes.process_id
116 FROM processes
117 LEFT OUTER JOIN streams ON streams.process_id = processes.process_id
118 WHERE processes.insert_time <= $1
119 GROUP BY processes.process_id
120 HAVING count(streams.stream_id) = 0
121 LIMIT $2;",
122 )
123 .bind(expiration)
124 .bind(batch_size)
125 .fetch_all(&lake.db_pool)
126 .await?;
127 let mut process_ids = vec![];
128 for r in rows {
129 let process_id: Uuid = r.try_get("process_id")?;
130 process_ids.push(process_id);
131 }
132
133 info!("deleting expired processes {process_ids:?}");
134 query("DELETE FROM processes where process_id = ANY($1);")
135 .bind(&process_ids)
136 .execute(&lake.db_pool)
137 .await?;
138
139 Ok(process_ids.len() == batch_size as usize)
140}
141
142pub async fn delete_empty_processes(
144 lake: &DataLakeConnection,
145 expiration: DateTime<Utc>,
146) -> Result<()> {
147 while delete_empty_processes_batch(lake, expiration).await? {}
148 Ok(())
149}
150
151pub async fn delete_old_data(lake: &DataLakeConnection, min_days_old: i32) -> Result<()> {
153 let now = Utc::now();
154 let expiration = now
155 .checked_sub_days(Days::new(min_days_old as u64))
156 .with_context(|| "computing expiration date/time")?;
157 delete_expired_blocks(lake, expiration)
158 .await
159 .with_context(|| "delete_expired_blocks")?;
160 delete_empty_streams(lake, expiration)
161 .await
162 .with_context(|| "delete_empty_streams")?;
163 delete_empty_processes(lake, expiration)
164 .await
165 .with_context(|| "delete_empty_processes")?;
166 retire_expired_partitions(lake, expiration)
167 .await
168 .with_context(|| "retire_expired_partitions")?;
169 Ok(())
170}