micromegas_analytics/lakehouse/
retire_partitions_table_function.rs

1use anyhow::Context;
2use chrono::DateTime;
3use chrono::Utc;
4use datafusion::catalog::TableFunctionImpl;
5use datafusion::catalog::TableProvider;
6use datafusion::common::plan_err;
7use datafusion::prelude::Expr;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_tracing::prelude::*;
10use std::sync::Arc;
11
12use crate::dfext::expressions::exp_to_string;
13use crate::dfext::expressions::exp_to_timestamp;
14use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
15use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
16use crate::response_writer::LogSender;
17use crate::response_writer::Logger;
18
19use super::write_partition::retire_partitions;
20
21/// A DataFusion `TableFunctionImpl` for retiring lakehouse partitions.
22#[derive(Debug)]
23pub struct RetirePartitionsTableFunction {
24    lake: Arc<DataLakeConnection>,
25}
26
27impl RetirePartitionsTableFunction {
28    pub fn new(lake: Arc<DataLakeConnection>) -> Self {
29        Self { lake }
30    }
31}
32
33async fn retire_partitions_impl(
34    lake: Arc<DataLakeConnection>,
35    view_set_name: &str,
36    view_instance_id: &str,
37    begin_insert_time: DateTime<Utc>,
38    end_insert_time: DateTime<Utc>,
39    logger: Arc<dyn Logger>,
40) -> anyhow::Result<()> {
41    let mut tr = lake.db_pool.begin().await?;
42    retire_partitions(
43        &mut tr,
44        view_set_name,
45        view_instance_id,
46        begin_insert_time,
47        end_insert_time,
48        logger,
49    )
50    .await?;
51    tr.commit().await.with_context(|| "commit")?;
52    Ok(())
53}
54
55impl TableFunctionImpl for RetirePartitionsTableFunction {
56    fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
57        // an alternative would be to use coerce & create_physical_expr
58        let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
59            return plan_err!("Missing first argument, expected view_set_name: String");
60        };
61        let Some(view_instance_id) = args.get(1).map(exp_to_string).transpose()? else {
62            return plan_err!("Missing 2nd argument, expected view_instance_id: String");
63        };
64        let Some(begin) = args.get(2).map(exp_to_timestamp).transpose()? else {
65            return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp");
66        };
67        let Some(end) = args.get(3).map(exp_to_timestamp).transpose()? else {
68            return plan_err!("Missing 4th argument, expected a UTC nanoseconds timestamp");
69        };
70
71        let lake = self.lake.clone();
72
73        let spawner = move || {
74            let (tx, rx) = tokio::sync::mpsc::channel(100);
75            let logger = Arc::new(LogSender::new(tx));
76            spawn_with_context(async move {
77                if let Err(e) = retire_partitions_impl(
78                    lake,
79                    &view_set_name,
80                    &view_instance_id,
81                    begin,
82                    end,
83                    logger,
84                )
85                .await
86                .with_context(|| "retire_partitions_impl")
87                {
88                    error!("{e:?}");
89                }
90            });
91            rx
92        };
93
94        Ok(Arc::new(LogStreamTableProvider {
95            log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
96        }))
97    }
98}