micromegas_analytics/lakehouse/
list_view_sets_table_function.rs1use super::view_factory::ViewFactory;
2use crate::lakehouse::catalog::list_view_sets;
3use async_trait::async_trait;
4use datafusion::arrow::array::{ArrayRef, BinaryArray, BooleanArray, StringArray};
5use datafusion::arrow::datatypes::DataType;
6use datafusion::arrow::datatypes::Field;
7use datafusion::arrow::datatypes::Schema;
8use datafusion::arrow::datatypes::SchemaRef;
9use datafusion::arrow::record_batch::RecordBatch;
10use datafusion::catalog::Session;
11use datafusion::catalog::TableFunctionImpl;
12use datafusion::catalog::TableProvider;
13use datafusion::datasource::TableType;
14use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
15use datafusion::error::DataFusionError;
16use datafusion::physical_plan::ExecutionPlan;
17use datafusion::prelude::Expr;
18use std::any::Any;
19use std::sync::Arc;
20
21#[derive(Debug)]
23pub struct ListViewSetsTableFunction {
24 view_factory: Arc<ViewFactory>,
25}
26
27impl ListViewSetsTableFunction {
28 pub fn new(view_factory: Arc<ViewFactory>) -> Self {
29 Self { view_factory }
30 }
31}
32
33impl TableFunctionImpl for ListViewSetsTableFunction {
34 fn call(
35 &self,
36 _args: &[datafusion::prelude::Expr],
37 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
38 Ok(Arc::new(ListViewSetsTableProvider {
39 view_factory: self.view_factory.clone(),
40 }))
41 }
42}
43
44#[derive(Debug)]
46pub struct ListViewSetsTableProvider {
47 pub view_factory: Arc<ViewFactory>,
48}
49
50#[async_trait]
51impl TableProvider for ListViewSetsTableProvider {
52 fn as_any(&self) -> &dyn Any {
53 self
54 }
55
56 fn schema(&self) -> SchemaRef {
57 Arc::new(Schema::new(vec![
58 Field::new("view_set_name", DataType::Utf8, false),
59 Field::new("current_schema_hash", DataType::Binary, false),
60 Field::new("schema", DataType::Utf8, false),
61 Field::new("has_view_maker", DataType::Boolean, false),
62 Field::new("global_instance_available", DataType::Boolean, false),
63 ]))
64 }
65
66 fn table_type(&self) -> TableType {
67 TableType::Temporary
68 }
69
70 async fn scan(
71 &self,
72 _state: &dyn Session,
73 projection: Option<&Vec<usize>>,
74 _filters: &[Expr],
75 limit: Option<usize>,
76 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
77 let schema_infos =
79 list_view_sets(&self.view_factory).map_err(|e| DataFusionError::External(e.into()))?;
80
81 let limited_infos: &[_] = if let Some(n) = limit {
85 &schema_infos[..n.min(schema_infos.len())]
86 } else {
87 &schema_infos
88 };
89
90 let view_set_names: Vec<String> = limited_infos
92 .iter()
93 .map(|info| info.view_set_name.clone())
94 .collect();
95 let schema_hashes: Vec<&[u8]> = limited_infos
96 .iter()
97 .map(|info| info.current_schema_hash.as_slice())
98 .collect();
99 let schemas: Vec<String> = limited_infos
100 .iter()
101 .map(|info| info.schema.clone())
102 .collect();
103 let has_view_makers: Vec<bool> = limited_infos
104 .iter()
105 .map(|info| info.has_view_maker)
106 .collect();
107 let global_instances: Vec<bool> = limited_infos
108 .iter()
109 .map(|info| info.global_instance_available)
110 .collect();
111
112 let view_set_name_array: ArrayRef = Arc::new(StringArray::from(view_set_names));
113 let schema_hash_array: ArrayRef = Arc::new(BinaryArray::from(schema_hashes));
114 let schema_array: ArrayRef = Arc::new(StringArray::from(schemas));
115 let has_view_maker_array: ArrayRef = Arc::new(BooleanArray::from(has_view_makers));
116 let global_instance_array: ArrayRef = Arc::new(BooleanArray::from(global_instances));
117
118 let columns = vec![
119 view_set_name_array,
120 schema_hash_array,
121 schema_array,
122 has_view_maker_array,
123 global_instance_array,
124 ];
125
126 let record_batch = RecordBatch::try_new(self.schema(), columns)
127 .map_err(|e| DataFusionError::External(e.into()))?;
128
129 let source = MemorySourceConfig::try_new(
130 &[vec![record_batch]],
131 self.schema(),
132 projection.map(|v| v.to_owned()),
133 )?;
134 Ok(DataSourceExec::from_data_source(source))
135 }
136}