//! micromegas_analytics/lakehouse/list_partitions_table_function.rs

use crate::sql_arrow_bridge::rows_to_record_batch;
use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Field;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::catalog::Session;
use datafusion::catalog::TableFunctionImpl;
use datafusion::catalog::TableProvider;
use datafusion::datasource::TableType;
use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use std::any::Any;
use std::sync::Arc;

20/// A DataFusion `TableFunctionImpl` for listing lakehouse partitions.
21#[derive(Debug)]
22pub struct ListPartitionsTableFunction {
23    lake: Arc<DataLakeConnection>,
24}
25
26impl ListPartitionsTableFunction {
27    pub fn new(lake: Arc<DataLakeConnection>) -> Self {
28        Self { lake }
29    }
30}
31
32impl TableFunctionImpl for ListPartitionsTableFunction {
33    fn call(
34        &self,
35        _args: &[datafusion::prelude::Expr],
36    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
37        Ok(Arc::new(ListPartitionsTableProvider {
38            lake: self.lake.clone(),
39        }))
40    }
41}
42
43/// A DataFusion `TableProvider` for listing lakehouse partitions.
44#[derive(Debug)]
45pub struct ListPartitionsTableProvider {
46    pub lake: Arc<DataLakeConnection>,
47}
48
49#[async_trait]
50impl TableProvider for ListPartitionsTableProvider {
51    fn as_any(&self) -> &dyn Any {
52        self
53    }
54
55    fn schema(&self) -> SchemaRef {
56        Arc::new(Schema::new(vec![
57            Field::new("view_set_name", DataType::Utf8, false),
58            Field::new("view_instance_id", DataType::Utf8, false),
59            Field::new(
60                "begin_insert_time",
61                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
62                false,
63            ),
64            Field::new(
65                "end_insert_time",
66                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
67                false,
68            ),
69            Field::new(
70                "min_event_time",
71                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
72                true,
73            ),
74            Field::new(
75                "max_event_time",
76                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
77                true,
78            ),
79            Field::new(
80                "updated",
81                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
82                false,
83            ),
84            Field::new("file_path", DataType::Utf8, true),
85            Field::new("file_size", DataType::Int64, false),
86            Field::new("file_schema_hash", DataType::Binary, false),
87            Field::new("source_data_hash", DataType::Binary, false),
88            Field::new("num_rows", DataType::Int64, false),
89            Field::new("partition_format_version", DataType::Int32, false),
90        ]))
91    }
92
93    fn table_type(&self) -> TableType {
94        TableType::Temporary
95    }
96
97    async fn scan(
98        &self,
99        _state: &dyn Session,
100        projection: Option<&Vec<usize>>,
101        _filters: &[Expr],
102        limit: Option<usize>,
103    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
104        // Build query with optional LIMIT clause pushed down to PostgreSQL.
105        // DataFusion only pushes the limit when it's safe to do so (i.e., when there
106        // are no WHERE clauses that could filter rows). When filters are present,
107        // DataFusion passes limit=None and applies the limit after filtering.
108        // Important: DataFusion trusts us to apply the limit - if we ignore it,
109        // too many rows will be returned to the client.
110        let query = if let Some(n) = limit {
111            format!(
112                "SELECT view_set_name,
113                        view_instance_id,
114                        begin_insert_time,
115                        end_insert_time,
116                        min_event_time,
117                        max_event_time,
118                        updated,
119                        file_path,
120                        file_size,
121                        file_schema_hash,
122                        source_data_hash,
123                        num_rows,
124                        partition_format_version
125                 FROM lakehouse_partitions
126                 LIMIT {n};"
127            )
128        } else {
129            "SELECT view_set_name,
130                    view_instance_id,
131                    begin_insert_time,
132                    end_insert_time,
133                    min_event_time,
134                    max_event_time,
135                    updated,
136                    file_path,
137                    file_size,
138                    file_schema_hash,
139                    source_data_hash,
140                    num_rows,
141                    partition_format_version
142             FROM lakehouse_partitions;"
143                .to_string()
144        };
145
146        let rows = sqlx::query(&query)
147            .fetch_all(&self.lake.db_pool)
148            .await
149            .map_err(|e| DataFusionError::External(e.into()))?;
150        let rb = rows_to_record_batch(&rows).map_err(|e| DataFusionError::External(e.into()))?;
151
152        let source = MemorySourceConfig::try_new(
153            &[vec![rb]],
154            self.schema(),
155            projection.map(|v| v.to_owned()),
156        )?;
157        Ok(DataSourceExec::from_data_source(source))
158    }
159}