micromegas_analytics/lakehouse/
retire_partitions_table_function.rs1use anyhow::Context;
2use chrono::DateTime;
3use chrono::Utc;
4use datafusion::catalog::TableFunctionImpl;
5use datafusion::catalog::TableProvider;
6use datafusion::common::plan_err;
7use datafusion::prelude::Expr;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_tracing::prelude::*;
10use std::sync::Arc;
11
12use crate::dfext::expressions::exp_to_string;
13use crate::dfext::expressions::exp_to_timestamp;
14use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
15use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
16use crate::response_writer::LogSender;
17use crate::response_writer::Logger;
18
19use super::write_partition::retire_partitions;
20
21#[derive(Debug)]
23pub struct RetirePartitionsTableFunction {
24 lake: Arc<DataLakeConnection>,
25}
26
27impl RetirePartitionsTableFunction {
28 pub fn new(lake: Arc<DataLakeConnection>) -> Self {
29 Self { lake }
30 }
31}
32
33async fn retire_partitions_impl(
34 lake: Arc<DataLakeConnection>,
35 view_set_name: &str,
36 view_instance_id: &str,
37 begin_insert_time: DateTime<Utc>,
38 end_insert_time: DateTime<Utc>,
39 logger: Arc<dyn Logger>,
40) -> anyhow::Result<()> {
41 let mut tr = lake.db_pool.begin().await?;
42 retire_partitions(
43 &mut tr,
44 view_set_name,
45 view_instance_id,
46 begin_insert_time,
47 end_insert_time,
48 logger,
49 )
50 .await?;
51 tr.commit().await.with_context(|| "commit")?;
52 Ok(())
53}
54
55impl TableFunctionImpl for RetirePartitionsTableFunction {
56 fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
57 let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
59 return plan_err!("Missing first argument, expected view_set_name: String");
60 };
61 let Some(view_instance_id) = args.get(1).map(exp_to_string).transpose()? else {
62 return plan_err!("Missing 2nd argument, expected view_instance_id: String");
63 };
64 let Some(begin) = args.get(2).map(exp_to_timestamp).transpose()? else {
65 return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp");
66 };
67 let Some(end) = args.get(3).map(exp_to_timestamp).transpose()? else {
68 return plan_err!("Missing 4th argument, expected a UTC nanoseconds timestamp");
69 };
70
71 let lake = self.lake.clone();
72
73 let spawner = move || {
74 let (tx, rx) = tokio::sync::mpsc::channel(100);
75 let logger = Arc::new(LogSender::new(tx));
76 spawn_with_context(async move {
77 if let Err(e) = retire_partitions_impl(
78 lake,
79 &view_set_name,
80 &view_instance_id,
81 begin,
82 end,
83 logger,
84 )
85 .await
86 .with_context(|| "retire_partitions_impl")
87 {
88 error!("{e:?}");
89 }
90 });
91 rx
92 };
93
94 Ok(Arc::new(LogStreamTableProvider {
95 log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
96 }))
97 }
98}