// micromegas_analytics/lakehouse/retire_partition_by_metadata_udf.rs
1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use datafusion::{
5    arrow::{
6        array::{Array, StringArray, StringBuilder, TimestampNanosecondArray},
7        datatypes::{DataType, TimeUnit},
8    },
9    common::internal_err,
10    error::DataFusionError,
11    logical_expr::{
12        ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
13        async_udf::AsyncScalarUDFImpl,
14    },
15};
16use micromegas_ingestion::data_lake_connection::DataLakeConnection;
17use micromegas_tracing::prelude::*;
18use sqlx::Row;
19use std::sync::Arc;
20
21use super::write_partition::add_file_for_cleanup;
22
23/// A scalar UDF that retires a single partition by its metadata.
24///
25/// This function retires partitions by their metadata identifiers (view_set_name,
26/// view_instance_id, begin_insert_time, end_insert_time). This works for both empty
27/// partitions (file_path=NULL) and non-empty partitions.
28///
29/// This is the preferred method for retiring partitions as it uses the partition's
30/// natural identifiers rather than relying on file paths.
31#[derive(Debug)]
32pub struct RetirePartitionByMetadata {
33    signature: Signature,
34    lake: Arc<DataLakeConnection>,
35}
36
37impl PartialEq for RetirePartitionByMetadata {
38    fn eq(&self, other: &Self) -> bool {
39        self.signature == other.signature
40    }
41}
42
43impl Eq for RetirePartitionByMetadata {}
44
45impl std::hash::Hash for RetirePartitionByMetadata {
46    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
47        self.signature.hash(state);
48    }
49}
50
51impl RetirePartitionByMetadata {
52    pub fn new(lake: Arc<DataLakeConnection>) -> Self {
53        Self {
54            signature: Signature::exact(
55                vec![
56                    DataType::Utf8,
57                    DataType::Utf8,
58                    DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
59                    DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
60                ],
61                Volatility::Volatile,
62            ),
63            lake,
64        }
65    }
66
67    /// Retires a single partition by its metadata within an existing transaction.
68    ///
69    /// # Arguments
70    /// * `transaction` - Database transaction to use
71    /// * `view_set_name` - The name of the view set
72    /// * `view_instance_id` - The instance ID (e.g., process_id or 'global')
73    /// * `begin_insert_time` - Begin insert time timestamp
74    /// * `end_insert_time` - End insert time timestamp
75    ///
76    /// # Returns
77    /// * `Ok(())` on successful retirement
78    /// * `Err(anyhow::Error)` with descriptive message for any failure
79    async fn retire_partition_in_transaction(
80        &self,
81        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
82        view_set_name: &str,
83        view_instance_id: &str,
84        begin_insert_time: DateTime<Utc>,
85        end_insert_time: DateTime<Utc>,
86    ) -> Result<()> {
87        // First, check if the partition exists and get its details
88        let partition_query = sqlx::query(
89            "SELECT file_path, file_size FROM lakehouse_partitions 
90             WHERE view_set_name = $1 
91               AND view_instance_id = $2 
92               AND begin_insert_time = $3 
93               AND end_insert_time = $4",
94        )
95        .bind(view_set_name)
96        .bind(view_instance_id)
97        .bind(begin_insert_time)
98        .bind(end_insert_time)
99        .fetch_optional(&mut **transaction)
100        .await
101        .with_context(|| {
102            format!(
103                "querying partition {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
104            )
105        })?;
106
107        let Some(partition_row) = partition_query else {
108            anyhow::bail!(
109                "Partition not found: {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
110            );
111        };
112
113        // Handle file cleanup if file_path is not NULL
114        let file_path_opt: Option<String> = partition_row.try_get("file_path")?;
115        if let Some(file_path) = file_path_opt {
116            let file_size: i64 = partition_row.try_get("file_size")?;
117            // Add to temporary files for cleanup (expires in 1 hour)
118            add_file_for_cleanup(transaction, &file_path, file_size).await?;
119        }
120
121        // Remove from active partitions
122        let delete_result = sqlx::query(
123            "DELETE FROM lakehouse_partitions 
124             WHERE view_set_name = $1 
125               AND view_instance_id = $2 
126               AND begin_insert_time = $3 
127               AND end_insert_time = $4",
128        )
129        .bind(view_set_name)
130        .bind(view_instance_id)
131        .bind(begin_insert_time)
132        .bind(end_insert_time)
133        .execute(&mut **transaction)
134        .await
135        .with_context(|| {
136            format!(
137                "deleting partition {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
138            )
139        })?;
140
141        if delete_result.rows_affected() == 0 {
142            // This shouldn't happen since we checked existence above, but handle it gracefully
143            anyhow::bail!(
144                "Partition not found during deletion: {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
145            );
146        }
147
148        info!(
149            "Successfully retired partition: {}/{} [{}, {})",
150            view_set_name, view_instance_id, begin_insert_time, end_insert_time
151        );
152        Ok(())
153    }
154}
155
156impl ScalarUDFImpl for RetirePartitionByMetadata {
157    fn as_any(&self) -> &dyn std::any::Any {
158        self
159    }
160
161    fn name(&self) -> &str {
162        "retire_partition_by_metadata"
163    }
164
165    fn signature(&self) -> &Signature {
166        &self.signature
167    }
168
169    fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
170        Ok(DataType::Utf8)
171    }
172
173    fn invoke_with_args(
174        &self,
175        _args: ScalarFunctionArgs,
176    ) -> datafusion::error::Result<ColumnarValue> {
177        Err(DataFusionError::NotImplemented(
178            "retire_partition_by_metadata can only be called from async contexts".into(),
179        ))
180    }
181}
182
183#[async_trait]
184impl AsyncScalarUDFImpl for RetirePartitionByMetadata {
185    async fn invoke_async_with_args(
186        &self,
187        args: ScalarFunctionArgs,
188    ) -> datafusion::error::Result<ColumnarValue> {
189        let args = ColumnarValue::values_to_arrays(&args.args)?;
190        if args.len() != 4 {
191            return internal_err!(
192                "retire_partition_by_metadata expects exactly 4 arguments: view_set_name, view_instance_id, begin_insert_time, end_insert_time"
193            );
194        }
195
196        let view_set_names: &StringArray =
197            args[0].as_any().downcast_ref::<_>().ok_or_else(|| {
198                DataFusionError::Execution(
199                    "error casting view_set_name argument as StringArray".into(),
200                )
201            })?;
202
203        let view_instance_ids: &StringArray =
204            args[1].as_any().downcast_ref::<_>().ok_or_else(|| {
205                DataFusionError::Execution(
206                    "error casting view_instance_id argument as StringArray".into(),
207                )
208            })?;
209
210        let begin_insert_times: &TimestampNanosecondArray =
211            args[2].as_any().downcast_ref::<_>().ok_or_else(|| {
212                DataFusionError::Execution(
213                    "error casting begin_insert_time argument as TimestampNanosecondArray".into(),
214                )
215            })?;
216
217        let end_insert_times: &TimestampNanosecondArray =
218            args[3].as_any().downcast_ref::<_>().ok_or_else(|| {
219                DataFusionError::Execution(
220                    "error casting end_insert_time argument as TimestampNanosecondArray".into(),
221                )
222            })?;
223
224        let mut builder = StringBuilder::with_capacity(view_set_names.len(), 64);
225
226        // Use a single transaction for the entire batch
227        let mut transaction =
228            self.lake.db_pool.begin().await.map_err(|e| {
229                DataFusionError::Execution(format!("Failed to begin transaction: {e}"))
230            })?;
231
232        let mut success_count = 0;
233        let mut has_errors = false;
234
235        // Process each partition in the batch within the same transaction
236        for index in 0..view_set_names.len() {
237            if view_set_names.is_null(index)
238                || view_instance_ids.is_null(index)
239                || begin_insert_times.is_null(index)
240                || end_insert_times.is_null(index)
241            {
242                builder.append_value("ERROR: all arguments must be non-null");
243                has_errors = true;
244                continue;
245            }
246
247            let view_set_name = view_set_names.value(index);
248            let view_instance_id = view_instance_ids.value(index);
249            let begin_insert_time_nanos = begin_insert_times.value(index);
250            let end_insert_time_nanos = end_insert_times.value(index);
251
252            // Convert nanoseconds to DateTime<Utc> for proper sqlx binding
253            let begin_insert_time = DateTime::from_timestamp_nanos(begin_insert_time_nanos);
254            let end_insert_time = DateTime::from_timestamp_nanos(end_insert_time_nanos);
255
256            match self
257                .retire_partition_in_transaction(
258                    &mut transaction,
259                    view_set_name,
260                    view_instance_id,
261                    begin_insert_time,
262                    end_insert_time,
263                )
264                .await
265            {
266                Ok(()) => {
267                    success_count += 1;
268                    builder.append_value(format!(
269                        "SUCCESS: Retired partition {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
270                    ));
271                }
272                Err(e) => {
273                    error!(
274                        "Failed to retire partition {}/{} [{}, {}): {:?}",
275                        view_set_name, view_instance_id, begin_insert_time, end_insert_time, e
276                    );
277                    builder.append_value(format!("ERROR: {e:?}"));
278                    has_errors = true;
279                }
280            }
281        }
282
283        // Commit the transaction only if there were no errors
284        if has_errors {
285            if let Err(e) = transaction.rollback().await {
286                error!("Failed to rollback transaction after errors: {:?}", e);
287            }
288            info!("Rolled back transaction due to errors in batch retirement");
289            builder.append_value(format!(
290                "ROLLED_BACK: All {} previous changes were reverted due to errors in batch",
291                success_count
292            ));
293        } else {
294            transaction.commit().await.map_err(|e| {
295                DataFusionError::Execution(format!("Failed to commit transaction: {e}"))
296            })?;
297            info!("Successfully retired {} partitions in batch", success_count);
298        }
299
300        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
301    }
302}
303
304/// Creates a user-defined function to retire a single partition by its metadata.
305///
306/// This function retires partitions by their metadata identifiers rather than file path,
307/// making it suitable for both empty partitions (file_path=NULL) and non-empty partitions.
308///
309/// # Usage
310/// ```sql
311/// SELECT retire_partition_by_metadata(
312///     'log_entries',
313///     'process_123',
314///     TIMESTAMP '2024-01-01 00:00:00',
315///     TIMESTAMP '2024-01-01 01:00:00'
316/// ) as result;
317/// ```
318///
319/// # Returns
320/// A string message indicating success or failure:
321/// - "SUCCESS: Retired partition \<view_set\>/\<instance\> [\<begin\>, \<end\>)" on successful retirement
322/// - "ERROR: Partition not found: \<view_set\>/\<instance\> [\<begin\>, \<end\>)" if the partition doesn't exist
323/// - "ERROR: Database error: \<details\>" for any database-related failures
324pub fn make_retire_partition_by_metadata_udf(
325    lake: Arc<DataLakeConnection>,
326) -> datafusion::logical_expr::async_udf::AsyncScalarUDF {
327    datafusion::logical_expr::async_udf::AsyncScalarUDF::new(Arc::new(
328        RetirePartitionByMetadata::new(lake),
329    ))
330}