micromegas_analytics/
delete.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Days, Utc};
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::{Row, query};
6use uuid::Uuid;
7
8use crate::lakehouse::write_partition::retire_expired_partitions;
9
10/// Deletes a batch of expired blocks from the data lake.
11/// Returns `true` if there are more blocks to delete, `false` otherwise.
12pub async fn delete_expired_blocks_batch(
13    lake: &DataLakeConnection,
14    expiration: DateTime<Utc>,
15) -> Result<bool> {
16    let batch_size: i32 = 1000;
17    let rows = query(
18        "SELECT process_id, stream_id, block_id
19         FROM   blocks
20         WHERE  blocks.insert_time <= $1
21         LIMIT $2;",
22    )
23    .bind(expiration)
24    .bind(batch_size)
25    .fetch_all(&lake.db_pool)
26    .await?;
27    let mut paths = vec![];
28    let mut block_ids = vec![];
29    for r in rows {
30        let process_id: Uuid = r.try_get("process_id")?;
31        let stream_id: Uuid = r.try_get("stream_id")?;
32        let block_id: Uuid = r.try_get("block_id")?;
33        let path = format!("blobs/{process_id}/{stream_id}/{block_id}");
34        paths.push(path);
35        block_ids.push(block_id);
36    }
37    info!("deleting {:?}", &paths);
38    lake.blob_storage.delete_batch(&paths).await?;
39    query("DELETE FROM blocks where block_id = ANY($1);")
40        .bind(block_ids)
41        .execute(&lake.db_pool)
42        .await?;
43    Ok(paths.len() == batch_size as usize)
44}
45
46/// Deletes all expired blocks from the data lake.
47pub async fn delete_expired_blocks(
48    lake: &DataLakeConnection,
49    expiration: DateTime<Utc>,
50) -> Result<()> {
51    while delete_expired_blocks_batch(lake, expiration).await? {}
52    Ok(())
53}
54
55/// Deletes a batch of empty streams from the data lake.
56/// Returns `true` if there are more streams to delete, `false` otherwise.
57pub async fn delete_empty_streams_batch(
58    lake: &DataLakeConnection,
59    expiration: DateTime<Utc>,
60) -> Result<bool> {
61    let batch_size: i32 = 1000;
62    // delete returning would be more efficient
63    let rows = query(
64        "SELECT streams.stream_id
65         FROM streams
66         WHERE streams.insert_time <= $1
67         AND NOT EXISTS
68         (
69           SELECT 1
70           FROM blocks
71           WHERE blocks.stream_id = streams.stream_id
72           LIMIT 1
73          )
74         LIMIT $2
75         ;",
76    )
77    .bind(expiration)
78    .bind(batch_size)
79    .fetch_all(&lake.db_pool)
80    .await?;
81    let mut stream_ids = vec![];
82    for r in rows {
83        let stream_id: Uuid = r.try_get("stream_id")?;
84        stream_ids.push(stream_id);
85    }
86
87    info!("deleting expired streams {stream_ids:?}");
88    query("DELETE FROM streams where stream_id = ANY($1);")
89        .bind(&stream_ids)
90        .execute(&lake.db_pool)
91        .await?;
92
93    Ok(stream_ids.len() == batch_size as usize)
94}
95
96/// Deletes all empty streams from the data lake.
97pub async fn delete_empty_streams(
98    lake: &DataLakeConnection,
99    expiration: DateTime<Utc>,
100) -> Result<()> {
101    while delete_empty_streams_batch(lake, expiration).await? {}
102    Ok(())
103}
104
105/// Deletes a batch of empty processes from the data lake.
106/// Returns `true` if there are more processes to delete, `false` otherwise.
107pub async fn delete_empty_processes_batch(
108    lake: &DataLakeConnection,
109    expiration: DateTime<Utc>,
110) -> Result<bool> {
111    let batch_size: i32 = 1000;
112    // delete returning would be more efficient
113    // also, we should remove the group by and use a query similar to delete_empty_streams_batch
114    let rows = query(
115        "SELECT processes.process_id
116         FROM processes
117         LEFT OUTER JOIN streams ON streams.process_id = processes.process_id
118         WHERE processes.insert_time <= $1
119         GROUP BY processes.process_id
120         HAVING count(streams.stream_id) = 0
121         LIMIT $2;",
122    )
123    .bind(expiration)
124    .bind(batch_size)
125    .fetch_all(&lake.db_pool)
126    .await?;
127    let mut process_ids = vec![];
128    for r in rows {
129        let process_id: Uuid = r.try_get("process_id")?;
130        process_ids.push(process_id);
131    }
132
133    info!("deleting expired processes {process_ids:?}");
134    query("DELETE FROM processes where process_id = ANY($1);")
135        .bind(&process_ids)
136        .execute(&lake.db_pool)
137        .await?;
138
139    Ok(process_ids.len() == batch_size as usize)
140}
141
142/// Deletes all empty processes from the data lake.
143pub async fn delete_empty_processes(
144    lake: &DataLakeConnection,
145    expiration: DateTime<Utc>,
146) -> Result<()> {
147    while delete_empty_processes_batch(lake, expiration).await? {}
148    Ok(())
149}
150
151/// Deletes all data older than a specified number of days from the data lake.
152pub async fn delete_old_data(lake: &DataLakeConnection, min_days_old: i32) -> Result<()> {
153    let now = Utc::now();
154    let expiration = now
155        .checked_sub_days(Days::new(min_days_old as u64))
156        .with_context(|| "computing expiration date/time")?;
157    delete_expired_blocks(lake, expiration)
158        .await
159        .with_context(|| "delete_expired_blocks")?;
160    delete_empty_streams(lake, expiration)
161        .await
162        .with_context(|| "delete_empty_streams")?;
163    delete_empty_processes(lake, expiration)
164        .await
165        .with_context(|| "delete_empty_processes")?;
166    retire_expired_partitions(lake, expiration)
167        .await
168        .with_context(|| "retire_expired_partitions")?;
169    Ok(())
170}