micromegas_analytics/lakehouse/
partitioned_table_provider.rs1use super::{
2 partition::Partition, partitioned_execution_plan::make_partitioned_execution_plan,
3 reader_factory::ReaderFactory,
4};
5use async_trait::async_trait;
6use datafusion::{
7 arrow::datatypes::SchemaRef,
8 catalog::{Session, TableProvider},
9 datasource::TableType,
10 logical_expr::TableProviderFilterPushDown,
11 physical_plan::ExecutionPlan,
12 prelude::*,
13};
14use std::{any::Any, sync::Arc};
15
/// A DataFusion table provider backed by a fixed set of lakehouse partitions.
///
/// Scans are delegated to `make_partitioned_execution_plan`, which builds a
/// physical plan over the partition list using the reader factory.
pub struct PartitionedTableProvider {
    // Arrow schema reported to DataFusion for this table.
    schema: SchemaRef,
    // Factory used by the execution plan to open partition data;
    // NOTE(review): presumably creates object-store readers — confirm in reader_factory.rs.
    reader_factory: Arc<ReaderFactory>,
    // The partitions this provider will scan; shared, immutable.
    partitions: Arc<Vec<Partition>>,
}
22
23impl std::fmt::Debug for PartitionedTableProvider {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("PartitionedTableProvider")
26 .field("schema", &self.schema)
27 .field("partitions_count", &self.partitions.len())
28 .finish()
29 }
30}
31
32impl PartitionedTableProvider {
33 pub fn new(
34 schema: SchemaRef,
35 reader_factory: Arc<ReaderFactory>,
36 partitions: Arc<Vec<Partition>>,
37 ) -> Self {
38 Self {
39 schema,
40 reader_factory,
41 partitions,
42 }
43 }
44}
45
46#[async_trait]
47impl TableProvider for PartitionedTableProvider {
48 fn as_any(&self) -> &dyn Any {
49 self
50 }
51
52 fn schema(&self) -> SchemaRef {
53 self.schema.clone()
54 }
55
56 fn table_type(&self) -> TableType {
57 TableType::Base
58 }
59
60 async fn scan(
61 &self,
62 state: &dyn Session,
63 projection: Option<&Vec<usize>>,
64 filters: &[Expr],
65 limit: Option<usize>,
66 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
67 make_partitioned_execution_plan(
68 self.schema(),
69 self.reader_factory.clone(),
70 state,
71 projection,
72 filters,
73 limit,
74 self.partitions.clone(),
75 )
76 }
77
78 fn supports_filters_pushdown(
80 &self,
81 filters: &[&Expr],
82 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
83 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
87 }
88}