// micromegas_analytics/lakehouse/retire_partition_by_metadata_udf.rs
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::{
    arrow::{
        array::{Array, StringArray, StringBuilder, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    },
    common::internal_err,
    error::DataFusionError,
    logical_expr::{
        ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
        async_udf::AsyncScalarUDFImpl,
    },
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::prelude::*;
use sqlx::Row;
use std::sync::Arc;

use super::write_partition::add_file_for_cleanup;
22
/// Async scalar UDF that retires (deletes) lakehouse partitions identified by
/// their exact metadata key: view set name, view instance id, and the
/// `[begin_insert_time, end_insert_time)` range.
#[derive(Debug)]
pub struct RetirePartitionByMetadata {
    // Exact argument signature accepted by the UDF; built once in `new`.
    signature: Signature,
    // Data-lake handle providing the Postgres pool used for metadata
    // transactions.
    lake: Arc<DataLakeConnection>,
}
36
/// Equality considers only the signature; the `lake` connection handle is
/// deliberately ignored (a live connection has no meaningful equality).
impl PartialEq for RetirePartitionByMetadata {
    fn eq(&self, other: &Self) -> bool {
        self.signature == other.signature
    }
}
42
43impl Eq for RetirePartitionByMetadata {}
44
/// Hashes only the signature, mirroring `PartialEq` so the `Hash`/`Eq`
/// contract (equal values hash equally) holds.
impl std::hash::Hash for RetirePartitionByMetadata {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.signature.hash(state);
    }
}
50
impl RetirePartitionByMetadata {
    /// Creates the UDF with its fixed argument signature:
    /// `(view_set_name: Utf8, view_instance_id: Utf8,
    ///   begin_insert_time: Timestamp(ns, "+00:00"),
    ///   end_insert_time: Timestamp(ns, "+00:00"))`.
    ///
    /// Declared `Volatile` because every invocation mutates database state.
    pub fn new(lake: Arc<DataLakeConnection>) -> Self {
        Self {
            signature: Signature::exact(
                vec![
                    DataType::Utf8,
                    DataType::Utf8,
                    DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
                    DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
                ],
                Volatility::Volatile,
            ),
            lake,
        }
    }

    /// Retires a single partition inside the caller-provided transaction.
    ///
    /// Steps (order matters — the file must be scheduled for cleanup before
    /// its owning row disappears):
    /// 1. Look up the partition row matching the exact metadata key.
    /// 2. If the row references a data file, schedule it for deferred
    ///    deletion via `add_file_for_cleanup` (the blob itself is not
    ///    removed here).
    /// 3. Delete the partition row.
    ///
    /// # Errors
    /// Fails if the partition does not exist or on any database error.
    /// Nothing becomes durable until the caller commits the transaction.
    async fn retire_partition_in_transaction(
        &self,
        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
        view_set_name: &str,
        view_instance_id: &str,
        begin_insert_time: DateTime<Utc>,
        end_insert_time: DateTime<Utc>,
    ) -> Result<()> {
        let partition_query = sqlx::query(
            "SELECT file_path, file_size FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time = $3
             AND end_insert_time = $4",
        )
        .bind(view_set_name)
        .bind(view_instance_id)
        .bind(begin_insert_time)
        .bind(end_insert_time)
        .fetch_optional(&mut **transaction)
        .await
        .with_context(|| {
            format!(
                "querying partition {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
            )
        })?;

        let Some(partition_row) = partition_query else {
            anyhow::bail!(
                "Partition not found: {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
            );
        };

        // file_path is nullable; only schedule cleanup when there is an
        // actual file backing this partition.
        let file_path_opt: Option<String> = partition_row.try_get("file_path")?;
        if let Some(file_path) = file_path_opt {
            let file_size: i64 = partition_row.try_get("file_size")?;
            add_file_for_cleanup(transaction, &file_path, file_size).await?;
        }

        let delete_result = sqlx::query(
            "DELETE FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time = $3
             AND end_insert_time = $4",
        )
        .bind(view_set_name)
        .bind(view_instance_id)
        .bind(begin_insert_time)
        .bind(end_insert_time)
        .execute(&mut **transaction)
        .await
        .with_context(|| {
            format!(
                "deleting partition {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
            )
        })?;

        // Defensive re-check: the row was just read within this same
        // transaction, so zero affected rows should not normally happen.
        if delete_result.rows_affected() == 0 {
            anyhow::bail!(
                "Partition not found during deletion: {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
            );
        }

        info!(
            "Successfully retired partition: {}/{} [{}, {})",
            view_set_name, view_instance_id, begin_insert_time, end_insert_time
        );
        Ok(())
    }
}
155
156impl ScalarUDFImpl for RetirePartitionByMetadata {
157 fn as_any(&self) -> &dyn std::any::Any {
158 self
159 }
160
161 fn name(&self) -> &str {
162 "retire_partition_by_metadata"
163 }
164
165 fn signature(&self) -> &Signature {
166 &self.signature
167 }
168
169 fn return_type(&self, _arg_types: &[DataType]) -> datafusion::error::Result<DataType> {
170 Ok(DataType::Utf8)
171 }
172
173 fn invoke_with_args(
174 &self,
175 _args: ScalarFunctionArgs,
176 ) -> datafusion::error::Result<ColumnarValue> {
177 Err(DataFusionError::NotImplemented(
178 "retire_partition_by_metadata can only be called from async contexts".into(),
179 ))
180 }
181}
182
183#[async_trait]
184impl AsyncScalarUDFImpl for RetirePartitionByMetadata {
185 async fn invoke_async_with_args(
186 &self,
187 args: ScalarFunctionArgs,
188 ) -> datafusion::error::Result<ColumnarValue> {
189 let args = ColumnarValue::values_to_arrays(&args.args)?;
190 if args.len() != 4 {
191 return internal_err!(
192 "retire_partition_by_metadata expects exactly 4 arguments: view_set_name, view_instance_id, begin_insert_time, end_insert_time"
193 );
194 }
195
196 let view_set_names: &StringArray =
197 args[0].as_any().downcast_ref::<_>().ok_or_else(|| {
198 DataFusionError::Execution(
199 "error casting view_set_name argument as StringArray".into(),
200 )
201 })?;
202
203 let view_instance_ids: &StringArray =
204 args[1].as_any().downcast_ref::<_>().ok_or_else(|| {
205 DataFusionError::Execution(
206 "error casting view_instance_id argument as StringArray".into(),
207 )
208 })?;
209
210 let begin_insert_times: &TimestampNanosecondArray =
211 args[2].as_any().downcast_ref::<_>().ok_or_else(|| {
212 DataFusionError::Execution(
213 "error casting begin_insert_time argument as TimestampNanosecondArray".into(),
214 )
215 })?;
216
217 let end_insert_times: &TimestampNanosecondArray =
218 args[3].as_any().downcast_ref::<_>().ok_or_else(|| {
219 DataFusionError::Execution(
220 "error casting end_insert_time argument as TimestampNanosecondArray".into(),
221 )
222 })?;
223
224 let mut builder = StringBuilder::with_capacity(view_set_names.len(), 64);
225
226 let mut transaction =
228 self.lake.db_pool.begin().await.map_err(|e| {
229 DataFusionError::Execution(format!("Failed to begin transaction: {e}"))
230 })?;
231
232 let mut success_count = 0;
233 let mut has_errors = false;
234
235 for index in 0..view_set_names.len() {
237 if view_set_names.is_null(index)
238 || view_instance_ids.is_null(index)
239 || begin_insert_times.is_null(index)
240 || end_insert_times.is_null(index)
241 {
242 builder.append_value("ERROR: all arguments must be non-null");
243 has_errors = true;
244 continue;
245 }
246
247 let view_set_name = view_set_names.value(index);
248 let view_instance_id = view_instance_ids.value(index);
249 let begin_insert_time_nanos = begin_insert_times.value(index);
250 let end_insert_time_nanos = end_insert_times.value(index);
251
252 let begin_insert_time = DateTime::from_timestamp_nanos(begin_insert_time_nanos);
254 let end_insert_time = DateTime::from_timestamp_nanos(end_insert_time_nanos);
255
256 match self
257 .retire_partition_in_transaction(
258 &mut transaction,
259 view_set_name,
260 view_instance_id,
261 begin_insert_time,
262 end_insert_time,
263 )
264 .await
265 {
266 Ok(()) => {
267 success_count += 1;
268 builder.append_value(format!(
269 "SUCCESS: Retired partition {view_set_name}/{view_instance_id} [{begin_insert_time}, {end_insert_time})"
270 ));
271 }
272 Err(e) => {
273 error!(
274 "Failed to retire partition {}/{} [{}, {}): {:?}",
275 view_set_name, view_instance_id, begin_insert_time, end_insert_time, e
276 );
277 builder.append_value(format!("ERROR: {e:?}"));
278 has_errors = true;
279 }
280 }
281 }
282
283 if has_errors {
285 if let Err(e) = transaction.rollback().await {
286 error!("Failed to rollback transaction after errors: {:?}", e);
287 }
288 info!("Rolled back transaction due to errors in batch retirement");
289 builder.append_value(format!(
290 "ROLLED_BACK: All {} previous changes were reverted due to errors in batch",
291 success_count
292 ));
293 } else {
294 transaction.commit().await.map_err(|e| {
295 DataFusionError::Execution(format!("Failed to commit transaction: {e}"))
296 })?;
297 info!("Successfully retired {} partitions in batch", success_count);
298 }
299
300 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
301 }
302}
303
304pub fn make_retire_partition_by_metadata_udf(
325 lake: Arc<DataLakeConnection>,
326) -> datafusion::logical_expr::async_udf::AsyncScalarUDF {
327 datafusion::logical_expr::async_udf::AsyncScalarUDF::new(Arc::new(
328 RetirePartitionByMetadata::new(lake),
329 ))
330}