// micromegas_analytics/lakehouse/retire_partition_by_file_udf.rs
use anyhow::{Context, Result};
2use async_trait::async_trait;
3use datafusion::{
4 arrow::{
5 array::{Array, StringArray, StringBuilder},
6 datatypes::DataType,
7 },
8 common::internal_err,
9 error::DataFusionError,
10 logical_expr::{
11 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
12 async_udf::AsyncScalarUDFImpl,
13 },
14};
15use micromegas_ingestion::data_lake_connection::DataLakeConnection;
16use micromegas_tracing::prelude::*;
17use sqlx::Row;
18use std::sync::Arc;
19
20use super::write_partition::add_file_for_cleanup;
21
/// Scalar UDF that retires (deletes) a lakehouse partition identified by its
/// file path, scheduling the backing file for later physical cleanup.
#[derive(Debug)]
pub struct RetirePartitionByFile {
    // DataFusion signature: exactly one Utf8 argument, volatile (side effects).
    signature: Signature,
    // Connection to the data lake's Postgres metadata database.
    lake: Arc<DataLakeConnection>,
}
30
31impl PartialEq for RetirePartitionByFile {
32 fn eq(&self, other: &Self) -> bool {
33 self.signature == other.signature
34 }
35}
36
37impl Eq for RetirePartitionByFile {}
38
39impl std::hash::Hash for RetirePartitionByFile {
40 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
41 self.signature.hash(state);
42 }
43}
44
45impl RetirePartitionByFile {
46 pub fn new(lake: Arc<DataLakeConnection>) -> Self {
47 Self {
48 signature: Signature::exact(vec![DataType::Utf8], Volatility::Volatile),
49 lake,
50 }
51 }
52
53 async fn retire_partition_in_transaction(
63 &self,
64 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
65 file_path: &str,
66 ) -> Result<()> {
67 let partition_query = sqlx::query(
69 "SELECT file_path, file_size FROM lakehouse_partitions WHERE file_path = $1",
70 )
71 .bind(file_path)
72 .fetch_optional(&mut **transaction)
73 .await
74 .with_context(|| format!("querying partition {file_path}"))?;
75
76 let Some(partition_row) = partition_query else {
77 anyhow::bail!("Partition not found: {file_path}");
78 };
79
80 let file_size: i64 = partition_row.try_get("file_size")?;
81
82 add_file_for_cleanup(transaction, file_path, file_size).await?;
84
85 let delete_result = sqlx::query("DELETE FROM lakehouse_partitions WHERE file_path = $1")
87 .bind(file_path)
88 .execute(&mut **transaction)
89 .await
90 .with_context(|| format!("deleting partition {file_path}"))?;
91
92 if delete_result.rows_affected() == 0 {
93 anyhow::bail!("Partition not found during deletion: {file_path}");
95 }
96
97 info!("Successfully retired partition: {}", file_path);
98 Ok(())
99 }
100}
101
102impl ScalarUDFImpl for RetirePartitionByFile {
103 fn as_any(&self) -> &dyn std::any::Any {
104 self
105 }
106
107 fn name(&self) -> &str {
108 "retire_partition_by_file"
109 }
110
111 fn signature(&self) -> &Signature {
112 &self.signature
113 }
114
115 fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
116 Ok(DataType::Utf8)
117 }
118
119 fn invoke_with_args(
120 &self,
121 _args: ScalarFunctionArgs,
122 ) -> datafusion::error::Result<ColumnarValue> {
123 Err(DataFusionError::NotImplemented(
124 "retire_partition_by_file can only be called from async contexts".into(),
125 ))
126 }
127}
128
129#[async_trait]
130impl AsyncScalarUDFImpl for RetirePartitionByFile {
131 async fn invoke_async_with_args(
132 &self,
133 args: ScalarFunctionArgs,
134 ) -> datafusion::error::Result<ColumnarValue> {
135 let args = ColumnarValue::values_to_arrays(&args.args)?;
136 if args.len() != 1 {
137 return internal_err!("retire_partition_by_file expects exactly 1 argument: file_path");
138 }
139
140 let file_paths: &StringArray = args[0].as_any().downcast_ref::<_>().ok_or_else(|| {
141 DataFusionError::Execution("error casting file_path argument as StringArray".into())
142 })?;
143
144 let mut builder = StringBuilder::with_capacity(file_paths.len(), 64);
145
146 let mut transaction =
148 self.lake.db_pool.begin().await.map_err(|e| {
149 DataFusionError::Execution(format!("Failed to begin transaction: {e}"))
150 })?;
151
152 let mut success_count = 0;
153 let mut has_errors = false;
154
155 for index in 0..file_paths.len() {
157 if file_paths.is_null(index) {
158 builder.append_value("ERROR: file_path cannot be null");
159 has_errors = true;
160 continue;
161 }
162
163 let file_path = file_paths.value(index);
164
165 match self
166 .retire_partition_in_transaction(&mut transaction, file_path)
167 .await
168 {
169 Ok(()) => {
170 success_count += 1;
171 builder.append_value(format!("SUCCESS: Retired partition {file_path}"));
172 }
173 Err(e) => {
174 error!("Failed to retire partition {}: {:?}", file_path, e);
175 builder.append_value(format!("ERROR: {e:?}"));
176 has_errors = true;
177 }
178 }
179 }
180
181 if has_errors {
183 if let Err(e) = transaction.rollback().await {
184 error!("Failed to rollback transaction after errors: {:?}", e);
185 }
186 info!("Rolled back transaction due to errors in batch retirement");
187 } else {
188 transaction.commit().await.map_err(|e| {
189 DataFusionError::Execution(format!("Failed to commit transaction: {e}"))
190 })?;
191 info!("Successfully retired {} partitions in batch", success_count);
192 }
193
194 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
195 }
196}
197
198pub fn make_retire_partition_by_file_udf(
213 lake: Arc<DataLakeConnection>,
214) -> datafusion::logical_expr::async_udf::AsyncScalarUDF {
215 datafusion::logical_expr::async_udf::AsyncScalarUDF::new(Arc::new(RetirePartitionByFile::new(
216 lake,
217 )))
218}