// micromegas_analytics/lakehouse/list_partitions_table_function.rs
use crate::sql_arrow_bridge::rows_to_record_batch;
2use async_trait::async_trait;
3use datafusion::arrow::datatypes::DataType;
4use datafusion::arrow::datatypes::Field;
5use datafusion::arrow::datatypes::Schema;
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::arrow::datatypes::TimeUnit;
8use datafusion::catalog::Session;
9use datafusion::catalog::TableFunctionImpl;
10use datafusion::catalog::TableProvider;
11use datafusion::datasource::TableType;
12use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
13use datafusion::error::DataFusionError;
14use datafusion::physical_plan::ExecutionPlan;
15use datafusion::prelude::Expr;
16use micromegas_ingestion::data_lake_connection::DataLakeConnection;
17use std::any::Any;
18use std::sync::Arc;
19
/// Table function `list_partitions()`: exposes the raw contents of the
/// `lakehouse_partitions` metadata table as a queryable DataFusion table.
#[derive(Debug)]
pub struct ListPartitionsTableFunction {
    /// Connection to the data lake; provides the Postgres pool queried at scan time.
    lake: Arc<DataLakeConnection>,
}
25
26impl ListPartitionsTableFunction {
27 pub fn new(lake: Arc<DataLakeConnection>) -> Self {
28 Self { lake }
29 }
30}
31
32impl TableFunctionImpl for ListPartitionsTableFunction {
33 fn call(
34 &self,
35 _args: &[datafusion::prelude::Expr],
36 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
37 Ok(Arc::new(ListPartitionsTableProvider {
38 lake: self.lake.clone(),
39 }))
40 }
41}
42
/// [`TableProvider`] that reads the whole `lakehouse_partitions` table into
/// memory on every scan and serves it as a single record batch.
#[derive(Debug)]
pub struct ListPartitionsTableProvider {
    /// Connection to the data lake; its `db_pool` is queried in `scan`.
    pub lake: Arc<DataLakeConnection>,
}
48
49#[async_trait]
50impl TableProvider for ListPartitionsTableProvider {
51 fn as_any(&self) -> &dyn Any {
52 self
53 }
54
55 fn schema(&self) -> SchemaRef {
56 Arc::new(Schema::new(vec![
57 Field::new("view_set_name", DataType::Utf8, false),
58 Field::new("view_instance_id", DataType::Utf8, false),
59 Field::new(
60 "begin_insert_time",
61 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
62 false,
63 ),
64 Field::new(
65 "end_insert_time",
66 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
67 false,
68 ),
69 Field::new(
70 "min_event_time",
71 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
72 true,
73 ),
74 Field::new(
75 "max_event_time",
76 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
77 true,
78 ),
79 Field::new(
80 "updated",
81 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
82 false,
83 ),
84 Field::new("file_path", DataType::Utf8, true),
85 Field::new("file_size", DataType::Int64, false),
86 Field::new("file_schema_hash", DataType::Binary, false),
87 Field::new("source_data_hash", DataType::Binary, false),
88 Field::new("num_rows", DataType::Int64, false),
89 Field::new("partition_format_version", DataType::Int32, false),
90 ]))
91 }
92
93 fn table_type(&self) -> TableType {
94 TableType::Temporary
95 }
96
97 async fn scan(
98 &self,
99 _state: &dyn Session,
100 projection: Option<&Vec<usize>>,
101 _filters: &[Expr],
102 limit: Option<usize>,
103 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
104 let query = if let Some(n) = limit {
111 format!(
112 "SELECT view_set_name,
113 view_instance_id,
114 begin_insert_time,
115 end_insert_time,
116 min_event_time,
117 max_event_time,
118 updated,
119 file_path,
120 file_size,
121 file_schema_hash,
122 source_data_hash,
123 num_rows,
124 partition_format_version
125 FROM lakehouse_partitions
126 LIMIT {n};"
127 )
128 } else {
129 "SELECT view_set_name,
130 view_instance_id,
131 begin_insert_time,
132 end_insert_time,
133 min_event_time,
134 max_event_time,
135 updated,
136 file_path,
137 file_size,
138 file_schema_hash,
139 source_data_hash,
140 num_rows,
141 partition_format_version
142 FROM lakehouse_partitions;"
143 .to_string()
144 };
145
146 let rows = sqlx::query(&query)
147 .fetch_all(&self.lake.db_pool)
148 .await
149 .map_err(|e| DataFusionError::External(e.into()))?;
150 let rb = rows_to_record_batch(&rows).map_err(|e| DataFusionError::External(e.into()))?;
151
152 let source = MemorySourceConfig::try_new(
153 &[vec![rb]],
154 self.schema(),
155 projection.map(|v| v.to_owned()),
156 )?;
157 Ok(DataSourceExec::from_data_source(source))
158 }
159}