// micromegas_analytics/lakehouse/view_instance_table_function.rs

1use super::{
2    lakehouse_context::LakehouseContext, materialized_view::MaterializedView,
3    partition_cache::QueryPartitionProvider, view_factory::ViewFactory,
4};
5use crate::{dfext::expressions::exp_to_string, time::TimeRange};
6use datafusion::{
7    catalog::{TableFunctionImpl, TableProvider},
8    common::plan_err,
9    error::DataFusionError,
10    logical_expr::Expr,
11};
12use micromegas_tracing::prelude::*;
13use std::sync::Arc;
14
15/// `ViewInstanceTableFunction` gives access to any view instance using a [ViewFactory].
16///
17/// ```python
18/// # Python code showing the usage of `view_instance(view_set_name, view_instance_id)`
19/// sql = """
20/// SELECT *
21/// FROM view_instance('thread_spans', '{stream_id}')
22/// ;""".format(stream_id=stream_id)
23/// df_spans = client.query(sql, begin_spans, end_spans)
24/// ```
25///
26#[derive(Debug)]
27pub struct ViewInstanceTableFunction {
28    lakehouse: Arc<LakehouseContext>,
29    view_factory: Arc<ViewFactory>,
30    part_provider: Arc<dyn QueryPartitionProvider>,
31    query_range: Option<TimeRange>,
32}
33
34impl ViewInstanceTableFunction {
35    pub fn new(
36        lakehouse: Arc<LakehouseContext>,
37        view_factory: Arc<ViewFactory>,
38        part_provider: Arc<dyn QueryPartitionProvider>,
39        query_range: Option<TimeRange>,
40    ) -> Self {
41        Self {
42            lakehouse,
43            view_factory,
44            part_provider,
45            query_range,
46        }
47    }
48}
49
50impl TableFunctionImpl for ViewInstanceTableFunction {
51    #[span_fn]
52    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
53        let arg1 = exprs.first().map(exp_to_string);
54        let Some(Ok(view_set_name)) = arg1 else {
55            return plan_err!(
56                "First argument to view_instance must be a string (the view set name), given {:?}",
57                arg1
58            );
59        };
60        let arg2 = exprs.get(1).map(exp_to_string);
61        let Some(Ok(view_instance_id)) = arg2 else {
62            return plan_err!(
63                "Second argument to view_instance must be a string (the view instance id), given {:?}",
64                arg2
65            );
66        };
67
68        let view = self
69            .view_factory
70            .make_view(&view_set_name, &view_instance_id)
71            .map_err(|e| DataFusionError::Plan(format!("error making view {e:?}")))?;
72
73        Ok(Arc::new(MaterializedView::new(
74            self.lakehouse.clone(),
75            self.lakehouse.reader_factory().clone(),
76            view,
77            self.part_provider.clone(),
78            self.query_range,
79        )))
80    }
81}