micromegas_analytics/lakehouse/
view_instance_table_function.rs1use super::{
2 lakehouse_context::LakehouseContext, materialized_view::MaterializedView,
3 partition_cache::QueryPartitionProvider, view_factory::ViewFactory,
4};
5use crate::{dfext::expressions::exp_to_string, time::TimeRange};
6use datafusion::{
7 catalog::{TableFunctionImpl, TableProvider},
8 common::plan_err,
9 error::DataFusionError,
10 logical_expr::Expr,
11};
12use micromegas_tracing::prelude::*;
13use std::sync::Arc;
14
15#[derive(Debug)]
27pub struct ViewInstanceTableFunction {
28 lakehouse: Arc<LakehouseContext>,
29 view_factory: Arc<ViewFactory>,
30 part_provider: Arc<dyn QueryPartitionProvider>,
31 query_range: Option<TimeRange>,
32}
33
34impl ViewInstanceTableFunction {
35 pub fn new(
36 lakehouse: Arc<LakehouseContext>,
37 view_factory: Arc<ViewFactory>,
38 part_provider: Arc<dyn QueryPartitionProvider>,
39 query_range: Option<TimeRange>,
40 ) -> Self {
41 Self {
42 lakehouse,
43 view_factory,
44 part_provider,
45 query_range,
46 }
47 }
48}
49
50impl TableFunctionImpl for ViewInstanceTableFunction {
51 #[span_fn]
52 fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
53 let arg1 = exprs.first().map(exp_to_string);
54 let Some(Ok(view_set_name)) = arg1 else {
55 return plan_err!(
56 "First argument to view_instance must be a string (the view set name), given {:?}",
57 arg1
58 );
59 };
60 let arg2 = exprs.get(1).map(exp_to_string);
61 let Some(Ok(view_instance_id)) = arg2 else {
62 return plan_err!(
63 "Second argument to view_instance must be a string (the view instance id), given {:?}",
64 arg2
65 );
66 };
67
68 let view = self
69 .view_factory
70 .make_view(&view_set_name, &view_instance_id)
71 .map_err(|e| DataFusionError::Plan(format!("error making view {e:?}")))?;
72
73 Ok(Arc::new(MaterializedView::new(
74 self.lakehouse.clone(),
75 self.lakehouse.reader_factory().clone(),
76 view,
77 self.part_provider.clone(),
78 self.query_range,
79 )))
80 }
81}