micromegas_analytics/lakehouse/
materialize_partitions_table_function.rs

1use super::batch_update::materialize_partition_range;
2use super::lakehouse_context::LakehouseContext;
3use super::partition_cache::PartitionCache;
4use super::view_factory::ViewFactory;
5use crate::dfext::expressions::exp_to_i64;
6use crate::dfext::expressions::exp_to_string;
7use crate::dfext::expressions::exp_to_timestamp;
8use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
9use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
10use crate::response_writer::LogSender;
11use crate::response_writer::Logger;
12use crate::time::TimeRange;
13use anyhow::Context;
14use chrono::TimeDelta;
15use datafusion::catalog::TableFunctionImpl;
16use datafusion::catalog::TableProvider;
17use datafusion::common::plan_err;
18use datafusion::prelude::Expr;
19use micromegas_tracing::prelude::*;
20use std::sync::Arc;
21
/// A DataFusion `TableFunctionImpl` for materializing lakehouse partitions.
///
/// Invoking the function spawns a background task that materializes
/// partitions for a named global view over a time range, and returns a
/// table provider streaming that task's log entries back to the caller.
#[derive(Debug)]
pub struct MaterializePartitionsTableFunction {
    // Shared lakehouse context; provides access to the data lake and its db pool.
    lakehouse: Arc<LakehouseContext>,
    // Registry used to resolve the target view by name (global views).
    view_factory: Arc<ViewFactory>,
}
28
29impl MaterializePartitionsTableFunction {
30    pub fn new(lakehouse: Arc<LakehouseContext>, view_factory: Arc<ViewFactory>) -> Self {
31        Self {
32            lakehouse,
33            view_factory,
34        }
35    }
36}
37
38#[span_fn]
39async fn materialize_partitions_impl(
40    lakehouse: Arc<LakehouseContext>,
41    view_factory: Arc<ViewFactory>,
42    view_name: &str,
43    insert_range: TimeRange,
44    partition_time_delta: TimeDelta,
45    logger: Arc<dyn Logger>,
46) -> anyhow::Result<()> {
47    let view = view_factory
48        .get_global_view(view_name)
49        .with_context(|| format!("can't find view {view_name}"))?;
50
51    let existing_partitions_all_views = Arc::new(
52        PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
53            .await?,
54    );
55
56    materialize_partition_range(
57        existing_partitions_all_views,
58        lakehouse,
59        view,
60        insert_range,
61        partition_time_delta,
62        logger,
63    )
64    .await?;
65    Ok(())
66}
67
68impl TableFunctionImpl for MaterializePartitionsTableFunction {
69    fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
70        // an alternative would be to use coerce & create_physical_expr
71        let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
72            return plan_err!("Missing first argument, expected view_set_name: String");
73        };
74        let Some(begin) = args.get(1).map(exp_to_timestamp).transpose()? else {
75            return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp");
76        };
77        let Some(end) = args.get(2).map(exp_to_timestamp).transpose()? else {
78            return plan_err!("Missing 4th argument, expected a UTC nanoseconds timestamp");
79        };
80        let Some(delta) = args.get(3).map(exp_to_i64).transpose()? else {
81            return plan_err!("Missing 5th argument, expected a number of seconds(i64)");
82        };
83
84        let lakehouse = self.lakehouse.clone();
85        let view_factory = self.view_factory.clone();
86
87        let spawner = move || {
88            let (tx, rx) = tokio::sync::mpsc::channel(100);
89            let logger = Arc::new(LogSender::new(tx));
90            spawn_with_context(async move {
91                if let Err(e) = materialize_partitions_impl(
92                    lakehouse,
93                    view_factory,
94                    &view_set_name,
95                    TimeRange::new(begin, end),
96                    TimeDelta::seconds(delta),
97                    logger.clone(),
98                )
99                .await
100                .with_context(|| "materialize_partitions_impl")
101                {
102                    let msg = format!("{e:?}");
103                    let _ = logger.write_log_entry(msg.clone()).await;
104                    error!("{msg}");
105                }
106            });
107            rx
108        };
109
110        Ok(Arc::new(LogStreamTableProvider {
111            log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
112        }))
113    }
114}