// micromegas_analytics/lakehouse/partitioned_table_provider.rs

1use super::{
2    partition::Partition, partitioned_execution_plan::make_partitioned_execution_plan,
3    reader_factory::ReaderFactory,
4};
5use async_trait::async_trait;
6use datafusion::{
7    arrow::datatypes::SchemaRef,
8    catalog::{Session, TableProvider},
9    datasource::TableType,
10    logical_expr::TableProviderFilterPushDown,
11    physical_plan::ExecutionPlan,
12    prelude::*,
13};
14use std::{any::Any, sync::Arc};
15
/// A DataFusion `TableProvider` for a set of pre-defined partitions.
pub struct PartitionedTableProvider {
    /// Arrow schema reported by `schema()` and handed to the execution plan.
    schema: SchemaRef,
    /// Factory passed to `make_partitioned_execution_plan` to read partition data.
    reader_factory: Arc<ReaderFactory>,
    /// The fixed set of partitions this provider scans; shared, never mutated here.
    partitions: Arc<Vec<Partition>>,
}
22
23impl std::fmt::Debug for PartitionedTableProvider {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("PartitionedTableProvider")
26            .field("schema", &self.schema)
27            .field("partitions_count", &self.partitions.len())
28            .finish()
29    }
30}
31
32impl PartitionedTableProvider {
33    pub fn new(
34        schema: SchemaRef,
35        reader_factory: Arc<ReaderFactory>,
36        partitions: Arc<Vec<Partition>>,
37    ) -> Self {
38        Self {
39            schema,
40            reader_factory,
41            partitions,
42        }
43    }
44}
45
46#[async_trait]
47impl TableProvider for PartitionedTableProvider {
48    fn as_any(&self) -> &dyn Any {
49        self
50    }
51
52    fn schema(&self) -> SchemaRef {
53        self.schema.clone()
54    }
55
56    fn table_type(&self) -> TableType {
57        TableType::Base
58    }
59
60    async fn scan(
61        &self,
62        state: &dyn Session,
63        projection: Option<&Vec<usize>>,
64        filters: &[Expr],
65        limit: Option<usize>,
66    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
67        make_partitioned_execution_plan(
68            self.schema(),
69            self.reader_factory.clone(),
70            state,
71            projection,
72            filters,
73            limit,
74            self.partitions.clone(),
75        )
76    }
77
78    /// Tell DataFusion to push filters down to the scan method
79    fn supports_filters_pushdown(
80        &self,
81        filters: &[&Expr],
82    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
83        // Inexact because the pruning can't handle all expressions and pruning
84        // is not done at the row level -- there may be rows in returned files
85        // that do not pass the filter
86        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
87    }
88}