//! micromegas_analytics/lakehouse/retire_partition_by_file_udf.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use datafusion::{
4    arrow::{
5        array::{Array, StringArray, StringBuilder},
6        datatypes::DataType,
7    },
8    common::internal_err,
9    error::DataFusionError,
10    logical_expr::{
11        ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
12        async_udf::AsyncScalarUDFImpl,
13    },
14};
15use micromegas_ingestion::data_lake_connection::DataLakeConnection;
16use micromegas_tracing::prelude::*;
17use sqlx::Row;
18use std::sync::Arc;
19
20use super::write_partition::add_file_for_cleanup;
21
/// A scalar UDF that retires a single partition by its file path.
///
/// This function retires only the exact specified partition from the lakehouse.
#[derive(Debug)]
pub struct RetirePartitionByFile {
    /// Accepted argument types: exactly one `Utf8` file path (see `new`).
    signature: Signature,
    /// Handle to the data lake's Postgres pool used for partition bookkeeping.
    lake: Arc<DataLakeConnection>,
}
30
/// Equality is based on the UDF signature only; the `lake` connection handle
/// is deliberately excluded (a connection has no meaningful equality).
impl PartialEq for RetirePartitionByFile {
    fn eq(&self, other: &Self) -> bool {
        self.signature == other.signature
    }
}

impl Eq for RetirePartitionByFile {}

/// Hash mirrors `PartialEq`: only the signature participates, which keeps the
/// `Hash`/`Eq` contract consistent (equal values hash identically).
impl std::hash::Hash for RetirePartitionByFile {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.signature.hash(state);
    }
}
44
45impl RetirePartitionByFile {
46    pub fn new(lake: Arc<DataLakeConnection>) -> Self {
47        Self {
48            signature: Signature::exact(vec![DataType::Utf8], Volatility::Volatile),
49            lake,
50        }
51    }
52
53    /// Retires a single partition by its file path within an existing transaction.
54    ///
55    /// # Arguments
56    /// * `transaction` - Database transaction to use
57    /// * `file_path` - The exact file path of the partition to retire
58    ///
59    /// # Returns
60    /// * `Ok(())` on successful retirement
61    /// * `Err(anyhow::Error)` with descriptive message for any failure
62    async fn retire_partition_in_transaction(
63        &self,
64        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
65        file_path: &str,
66    ) -> Result<()> {
67        // First, check if the partition exists and get its details
68        let partition_query = sqlx::query(
69            "SELECT file_path, file_size FROM lakehouse_partitions WHERE file_path = $1",
70        )
71        .bind(file_path)
72        .fetch_optional(&mut **transaction)
73        .await
74        .with_context(|| format!("querying partition {file_path}"))?;
75
76        let Some(partition_row) = partition_query else {
77            anyhow::bail!("Partition not found: {file_path}");
78        };
79
80        let file_size: i64 = partition_row.try_get("file_size")?;
81
82        // Add to temporary files for cleanup (expires in 1 hour)
83        add_file_for_cleanup(transaction, file_path, file_size).await?;
84
85        // Remove from active partitions
86        let delete_result = sqlx::query("DELETE FROM lakehouse_partitions WHERE file_path = $1")
87            .bind(file_path)
88            .execute(&mut **transaction)
89            .await
90            .with_context(|| format!("deleting partition {file_path}"))?;
91
92        if delete_result.rows_affected() == 0 {
93            // This shouldn't happen since we checked existence above, but handle it gracefully
94            anyhow::bail!("Partition not found during deletion: {file_path}");
95        }
96
97        info!("Successfully retired partition: {}", file_path);
98        Ok(())
99    }
100}
101
102impl ScalarUDFImpl for RetirePartitionByFile {
103    fn as_any(&self) -> &dyn std::any::Any {
104        self
105    }
106
107    fn name(&self) -> &str {
108        "retire_partition_by_file"
109    }
110
111    fn signature(&self) -> &Signature {
112        &self.signature
113    }
114
115    fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
116        Ok(DataType::Utf8)
117    }
118
119    fn invoke_with_args(
120        &self,
121        _args: ScalarFunctionArgs,
122    ) -> datafusion::error::Result<ColumnarValue> {
123        Err(DataFusionError::NotImplemented(
124            "retire_partition_by_file can only be called from async contexts".into(),
125        ))
126    }
127}
128
129#[async_trait]
130impl AsyncScalarUDFImpl for RetirePartitionByFile {
131    async fn invoke_async_with_args(
132        &self,
133        args: ScalarFunctionArgs,
134    ) -> datafusion::error::Result<ColumnarValue> {
135        let args = ColumnarValue::values_to_arrays(&args.args)?;
136        if args.len() != 1 {
137            return internal_err!("retire_partition_by_file expects exactly 1 argument: file_path");
138        }
139
140        let file_paths: &StringArray = args[0].as_any().downcast_ref::<_>().ok_or_else(|| {
141            DataFusionError::Execution("error casting file_path argument as StringArray".into())
142        })?;
143
144        let mut builder = StringBuilder::with_capacity(file_paths.len(), 64);
145
146        // Use a single transaction for the entire batch
147        let mut transaction =
148            self.lake.db_pool.begin().await.map_err(|e| {
149                DataFusionError::Execution(format!("Failed to begin transaction: {e}"))
150            })?;
151
152        let mut success_count = 0;
153        let mut has_errors = false;
154
155        // Process each file path in the batch within the same transaction
156        for index in 0..file_paths.len() {
157            if file_paths.is_null(index) {
158                builder.append_value("ERROR: file_path cannot be null");
159                has_errors = true;
160                continue;
161            }
162
163            let file_path = file_paths.value(index);
164
165            match self
166                .retire_partition_in_transaction(&mut transaction, file_path)
167                .await
168            {
169                Ok(()) => {
170                    success_count += 1;
171                    builder.append_value(format!("SUCCESS: Retired partition {file_path}"));
172                }
173                Err(e) => {
174                    error!("Failed to retire partition {}: {:?}", file_path, e);
175                    builder.append_value(format!("ERROR: {e:?}"));
176                    has_errors = true;
177                }
178            }
179        }
180
181        // Commit the transaction only if there were no errors
182        if has_errors {
183            if let Err(e) = transaction.rollback().await {
184                error!("Failed to rollback transaction after errors: {:?}", e);
185            }
186            info!("Rolled back transaction due to errors in batch retirement");
187        } else {
188            transaction.commit().await.map_err(|e| {
189                DataFusionError::Execution(format!("Failed to commit transaction: {e}"))
190            })?;
191            info!("Successfully retired {} partitions in batch", success_count);
192        }
193
194        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
195    }
196}
197
198/// Creates a user-defined function to retire a single partition by its file path.
199///
200/// This function ensures only the exact specified partition is removed from the lakehouse.
201///
202/// # Usage
203/// ```sql
204/// SELECT retire_partition_by_file('/path/to/partition.parquet') as result;
205/// ```
206///
207/// # Returns
208/// A string message indicating success or failure:
209/// - "SUCCESS: Retired partition <file_path>" on successful retirement
210/// - "ERROR: Partition not found: <file_path>" if the partition doesn't exist  
211/// - "ERROR: Database error: \<details\>" for any database-related failures
212pub fn make_retire_partition_by_file_udf(
213    lake: Arc<DataLakeConnection>,
214) -> datafusion::logical_expr::async_udf::AsyncScalarUDF {
215    datafusion::logical_expr::async_udf::AsyncScalarUDF::new(Arc::new(RetirePartitionByFile::new(
216        lake,
217    )))
218}