micromegas_analytics/
delete.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Days, Utc};
3use micromegas_ingestion::data_lake_connection::DataLakeConnection;
4use micromegas_tracing::prelude::*;
5use sqlx::{Row, query};
6use uuid::Uuid;
7
8use crate::lakehouse::write_partition::retire_expired_partitions;
9
10/// Deletes a batch of expired blocks from the data lake.
11/// Returns `true` if there are more blocks to delete, `false` otherwise.
12#[span_fn]
13pub async fn delete_expired_blocks_batch(
14    lake: &DataLakeConnection,
15    expiration: DateTime<Utc>,
16) -> Result<bool> {
17    let batch_size: i32 = 1000;
18    let mut transaction = lake.db_pool.begin().await?;
19    let rows = query(
20        "DELETE FROM blocks
21         WHERE block_id IN (
22             SELECT block_id FROM blocks WHERE insert_time <= $1 LIMIT $2
23         )
24         RETURNING process_id, stream_id, block_id;",
25    )
26    .bind(expiration)
27    .bind(batch_size)
28    .fetch_all(&mut *transaction)
29    .await?;
30    if rows.is_empty() {
31        return Ok(false);
32    }
33    let mut paths = vec![];
34    for r in &rows {
35        let process_id: Uuid = r.try_get("process_id")?;
36        let stream_id: Uuid = r.try_get("stream_id")?;
37        let block_id: Uuid = r.try_get("block_id")?;
38        let path = format!("blobs/{process_id}/{stream_id}/{block_id}");
39        paths.push(path);
40    }
41    lake.blob_storage.delete_batch(&paths).await?;
42    transaction.commit().await.with_context(|| "commit")?;
43    for path in &paths {
44        debug!("deleted blob {path}");
45    }
46    info!("deleted {} blocks", paths.len());
47    Ok(paths.len() == batch_size as usize)
48}
49
50/// Deletes all expired blocks from the data lake.
51#[span_fn]
52pub async fn delete_expired_blocks(
53    lake: &DataLakeConnection,
54    expiration: DateTime<Utc>,
55) -> Result<()> {
56    while delete_expired_blocks_batch(lake, expiration).await? {}
57    Ok(())
58}
59
60/// Deletes a batch of empty streams from the data lake.
61/// Returns `true` if there are more streams to delete, `false` otherwise.
62#[span_fn]
63pub async fn delete_empty_streams_batch(
64    lake: &DataLakeConnection,
65    expiration: DateTime<Utc>,
66) -> Result<bool> {
67    let batch_size: i32 = 1000;
68    let rows = query(
69        "WITH batch AS (
70             SELECT stream_id FROM streams
71             WHERE  insert_time <= $1
72             AND    NOT EXISTS (SELECT 1 FROM blocks WHERE blocks.stream_id = streams.stream_id LIMIT 1)
73             LIMIT  $2
74         )
75         DELETE FROM streams
76         WHERE stream_id IN (SELECT stream_id FROM batch)
77         RETURNING stream_id;",
78    )
79    .bind(expiration)
80    .bind(batch_size)
81    .fetch_all(&lake.db_pool)
82    .await?;
83    if rows.is_empty() {
84        return Ok(false);
85    }
86    for r in &rows {
87        let stream_id: Uuid = r.try_get("stream_id")?;
88        debug!("deleted stream {stream_id}");
89    }
90    let count = rows.len();
91    info!("deleted {count} empty streams");
92    Ok(count == batch_size as usize)
93}
94
95/// Deletes all empty streams from the data lake.
96#[span_fn]
97pub async fn delete_empty_streams(
98    lake: &DataLakeConnection,
99    expiration: DateTime<Utc>,
100) -> Result<()> {
101    while delete_empty_streams_batch(lake, expiration).await? {}
102    Ok(())
103}
104
105/// Deletes a batch of empty processes from the data lake.
106/// Returns `true` if there are more processes to delete, `false` otherwise.
107#[span_fn]
108pub async fn delete_empty_processes_batch(
109    lake: &DataLakeConnection,
110    expiration: DateTime<Utc>,
111) -> Result<bool> {
112    let batch_size: i32 = 1000;
113    let rows = query(
114        "WITH batch AS (
115             SELECT process_id FROM processes
116             WHERE  insert_time <= $1
117             AND    NOT EXISTS (SELECT 1 FROM streams WHERE streams.process_id = processes.process_id LIMIT 1)
118             LIMIT  $2
119         )
120         DELETE FROM processes
121         WHERE process_id IN (SELECT process_id FROM batch)
122         RETURNING process_id;",
123    )
124    .bind(expiration)
125    .bind(batch_size)
126    .fetch_all(&lake.db_pool)
127    .await?;
128    if rows.is_empty() {
129        return Ok(false);
130    }
131    for r in &rows {
132        let process_id: Uuid = r.try_get("process_id")?;
133        debug!("deleted process {process_id}");
134    }
135    let count = rows.len();
136    info!("deleted {count} empty processes");
137    Ok(count == batch_size as usize)
138}
139
140/// Deletes all empty processes from the data lake.
141#[span_fn]
142pub async fn delete_empty_processes(
143    lake: &DataLakeConnection,
144    expiration: DateTime<Utc>,
145) -> Result<()> {
146    while delete_empty_processes_batch(lake, expiration).await? {}
147    Ok(())
148}
149
150/// Deletes all data older than a specified number of days from the data lake.
151#[span_fn]
152pub async fn delete_old_data(lake: &DataLakeConnection, min_days_old: i32) -> Result<()> {
153    let now = Utc::now();
154    let expiration = now
155        .checked_sub_days(Days::new(min_days_old as u64))
156        .with_context(|| "computing expiration date/time")?;
157    delete_expired_blocks(lake, expiration)
158        .await
159        .with_context(|| "delete_expired_blocks")?;
160    delete_empty_streams(lake, expiration)
161        .await
162        .with_context(|| "delete_empty_streams")?;
163    delete_empty_processes(lake, expiration)
164        .await
165        .with_context(|| "delete_empty_processes")?;
166    retire_expired_partitions(lake, expiration)
167        .await
168        .with_context(|| "retire_expired_partitions")?;
169    Ok(())
170}