micromegas_analytics/lakehouse/
list_view_sets_table_function.rs

1use super::view_factory::ViewFactory;
2use crate::lakehouse::catalog::list_view_sets;
3use async_trait::async_trait;
4use datafusion::arrow::array::{ArrayRef, BinaryArray, BooleanArray, StringArray};
5use datafusion::arrow::datatypes::DataType;
6use datafusion::arrow::datatypes::Field;
7use datafusion::arrow::datatypes::Schema;
8use datafusion::arrow::datatypes::SchemaRef;
9use datafusion::arrow::record_batch::RecordBatch;
10use datafusion::catalog::Session;
11use datafusion::catalog::TableFunctionImpl;
12use datafusion::catalog::TableProvider;
13use datafusion::datasource::TableType;
14use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
15use datafusion::error::DataFusionError;
16use datafusion::physical_plan::ExecutionPlan;
17use datafusion::prelude::Expr;
18use std::any::Any;
19use std::sync::Arc;
20
21/// A DataFusion `TableFunctionImpl` for listing view sets with their current schema information.
22#[derive(Debug)]
23pub struct ListViewSetsTableFunction {
24    view_factory: Arc<ViewFactory>,
25}
26
27impl ListViewSetsTableFunction {
28    pub fn new(view_factory: Arc<ViewFactory>) -> Self {
29        Self { view_factory }
30    }
31}
32
33impl TableFunctionImpl for ListViewSetsTableFunction {
34    fn call(
35        &self,
36        _args: &[datafusion::prelude::Expr],
37    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
38        Ok(Arc::new(ListViewSetsTableProvider {
39            view_factory: self.view_factory.clone(),
40        }))
41    }
42}
43
44/// A DataFusion `TableProvider` for listing view sets with their current schema information.
45#[derive(Debug)]
46pub struct ListViewSetsTableProvider {
47    pub view_factory: Arc<ViewFactory>,
48}
49
50#[async_trait]
51impl TableProvider for ListViewSetsTableProvider {
52    fn as_any(&self) -> &dyn Any {
53        self
54    }
55
56    fn schema(&self) -> SchemaRef {
57        Arc::new(Schema::new(vec![
58            Field::new("view_set_name", DataType::Utf8, false),
59            Field::new("current_schema_hash", DataType::Binary, false),
60            Field::new("schema", DataType::Utf8, false),
61            Field::new("has_view_maker", DataType::Boolean, false),
62            Field::new("global_instance_available", DataType::Boolean, false),
63        ]))
64    }
65
66    fn table_type(&self) -> TableType {
67        TableType::Temporary
68    }
69
70    async fn scan(
71        &self,
72        _state: &dyn Session,
73        projection: Option<&Vec<usize>>,
74        _filters: &[Expr],
75        limit: Option<usize>,
76    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
77        // Get current schema versions from the view factory
78        let schema_infos =
79            list_view_sets(&self.view_factory).map_err(|e| DataFusionError::External(e.into()))?;
80
81        // Apply limit early to avoid building unnecessary arrays.
82        // DataFusion trusts us to apply the limit - if we ignore it, too many rows
83        // will be returned to the client.
84        let limited_infos: &[_] = if let Some(n) = limit {
85            &schema_infos[..n.min(schema_infos.len())]
86        } else {
87            &schema_infos
88        };
89
90        // Convert to Arrow arrays
91        let view_set_names: Vec<String> = limited_infos
92            .iter()
93            .map(|info| info.view_set_name.clone())
94            .collect();
95        let schema_hashes: Vec<&[u8]> = limited_infos
96            .iter()
97            .map(|info| info.current_schema_hash.as_slice())
98            .collect();
99        let schemas: Vec<String> = limited_infos
100            .iter()
101            .map(|info| info.schema.clone())
102            .collect();
103        let has_view_makers: Vec<bool> = limited_infos
104            .iter()
105            .map(|info| info.has_view_maker)
106            .collect();
107        let global_instances: Vec<bool> = limited_infos
108            .iter()
109            .map(|info| info.global_instance_available)
110            .collect();
111
112        let view_set_name_array: ArrayRef = Arc::new(StringArray::from(view_set_names));
113        let schema_hash_array: ArrayRef = Arc::new(BinaryArray::from(schema_hashes));
114        let schema_array: ArrayRef = Arc::new(StringArray::from(schemas));
115        let has_view_maker_array: ArrayRef = Arc::new(BooleanArray::from(has_view_makers));
116        let global_instance_array: ArrayRef = Arc::new(BooleanArray::from(global_instances));
117
118        let columns = vec![
119            view_set_name_array,
120            schema_hash_array,
121            schema_array,
122            has_view_maker_array,
123            global_instance_array,
124        ];
125
126        let record_batch = RecordBatch::try_new(self.schema(), columns)
127            .map_err(|e| DataFusionError::External(e.into()))?;
128
129        let source = MemorySourceConfig::try_new(
130            &[vec![record_batch]],
131            self.schema(),
132            projection.map(|v| v.to_owned()),
133        )?;
134        Ok(DataSourceExec::from_data_source(source))
135    }
136}