micromegas_analytics/lakehouse/materialize_partitions_table_function.rs

use super::batch_update::materialize_partition_range;
use super::lakehouse_context::LakehouseContext;
use super::partition_cache::PartitionCache;
use super::view_factory::ViewFactory;
use crate::dfext::expressions::exp_to_i64;
use crate::dfext::expressions::exp_to_string;
use crate::dfext::expressions::exp_to_timestamp;
use crate::dfext::log_stream_table_provider::LogStreamTableProvider;
use crate::dfext::task_log_exec_plan::TaskLogExecPlan;
use crate::response_writer::LogSender;
use crate::response_writer::Logger;
use crate::time::TimeRange;
use anyhow::Context;
use chrono::TimeDelta;
use datafusion::catalog::TableFunctionImpl;
use datafusion::catalog::TableProvider;
use datafusion::common::plan_err;
use datafusion::prelude::Expr;
use micromegas_tracing::prelude::*;
use std::sync::Arc;

/// Table function exposing partition materialization as a SQL-callable UDTF.
///
/// Arguments: a view set name, begin/end timestamps delimiting the insert
/// range, and a partition size in seconds.
#[derive(Debug)]
pub struct MaterializePartitionsTableFunction {
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
}

impl MaterializePartitionsTableFunction {
    pub fn new(lakehouse: Arc<LakehouseContext>, view_factory: Arc<ViewFactory>) -> Self {
        Self {
            lakehouse,
            view_factory,
        }
    }
}

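// A minimal registration sketch, assuming a DataFusion `SessionContext` named
// `ctx` and previously constructed `lakehouse`/`view_factory` handles (both
// hypothetical names here); `register_udtf` is DataFusion's entry point for
// table functions:
//
// ctx.register_udtf(
//     "materialize_partitions",
//     Arc::new(MaterializePartitionsTableFunction::new(lakehouse, view_factory)),
// );
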
/// Materializes partitions of the named view across `insert_range`, splitting
/// the work into partitions of `partition_time_delta`.
#[span_fn]
async fn materialize_partitions_impl(
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    view_name: &str,
    insert_range: TimeRange,
    partition_time_delta: TimeDelta,
    logger: Arc<dyn Logger>,
) -> anyhow::Result<()> {
    let view = view_factory
        .get_global_view(view_name)
        .with_context(|| format!("can't find view {view_name}"))?;

    // Fetch the existing partitions of every view that overlap the requested
    // insert range.
    let existing_partitions_all_views = Arc::new(
        PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
            .await?,
    );

    materialize_partition_range(
        existing_partitions_all_views,
        lakehouse,
        view,
        insert_range,
        partition_time_delta,
        logger,
    )
    .await?;
    Ok(())
}

impl TableFunctionImpl for MaterializePartitionsTableFunction {
    fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
        // Parse the four positional arguments.
        let Some(view_set_name) = args.first().map(exp_to_string).transpose()? else {
            return plan_err!("Missing 1st argument, expected view_set_name: String");
        };
        let Some(begin) = args.get(1).map(exp_to_timestamp).transpose()? else {
            return plan_err!("Missing 2nd argument, expected a UTC nanoseconds timestamp (begin)");
        };
        let Some(end) = args.get(2).map(exp_to_timestamp).transpose()? else {
            return plan_err!("Missing 3rd argument, expected a UTC nanoseconds timestamp (end)");
        };
        let Some(delta) = args.get(3).map(exp_to_i64).transpose()? else {
            return plan_err!("Missing 4th argument, expected a number of seconds (i64)");
        };

        let lakehouse = self.lakehouse.clone();
        let view_factory = self.view_factory.clone();

        // Run the materialization as a background task; its log entries are
        // streamed back through an mpsc channel, which the returned table
        // provider exposes as rows.
        let spawner = move || {
            let (tx, rx) = tokio::sync::mpsc::channel(100);
            let logger = Arc::new(LogSender::new(tx));
            spawn_with_context(async move {
                if let Err(e) = materialize_partitions_impl(
                    lakehouse,
                    view_factory,
                    &view_set_name,
                    TimeRange::new(begin, end),
                    TimeDelta::seconds(delta),
                    logger.clone(),
                )
                .await
                .with_context(|| "materialize_partitions_impl")
                {
                    // Report the failure both to the log stream and to tracing.
                    let msg = format!("{e:?}");
                    let _ = logger.write_log_entry(msg.clone()).await;
                    error!("{msg}");
                }
            });
            rx
        };

        Ok(Arc::new(LogStreamTableProvider {
            log_stream: Arc::new(TaskLogExecPlan::new(Box::new(spawner))),
        }))
    }
}
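
// A hypothetical SQL invocation, assuming the function is registered under
// the name `materialize_partitions` (see the registration sketch above) and
// that a view set named `log_entries` exists; it materializes one-hour
// partitions over a single day and returns the task's log as a result set:
//
// SELECT * FROM materialize_partitions(
//     'log_entries',
//     TIMESTAMP '2024-01-01T00:00:00Z',
//     TIMESTAMP '2024-01-02T00:00:00Z',
//     3600
// );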