micromegas_analytics/lakehouse/materialized_view.rs

1use super::{
2    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3    partitioned_execution_plan::make_partitioned_execution_plan, reader_factory::ReaderFactory,
4    view::View,
5};
6use crate::time::TimeRange;
7use async_trait::async_trait;
8use datafusion::{
9    arrow::datatypes::SchemaRef,
10    catalog::{Session, TableProvider},
11    datasource::TableType,
12    error::DataFusionError,
13    logical_expr::{Expr, TableProviderFilterPushDown},
14    physical_plan::ExecutionPlan,
15};
16use micromegas_tracing::prelude::*;
17use std::{any::Any, sync::Arc};
18
/// A DataFusion `TableProvider` for materialized views.
#[derive(Debug)]
pub struct MaterializedView {
    /// Lakehouse runtime context handed to the view's `jit_update` during `scan`.
    lakehouse: Arc<LakehouseContext>,
    /// Factory used to build readers for the partitioned execution plan.
    reader_factory: Arc<ReaderFactory>,
    /// The view definition backing this table (schema, set name, instance id).
    view: Arc<dyn View>,
    /// Source of the partition list matching the query range and schema hash.
    part_provider: Arc<dyn QueryPartitionProvider>,
    /// Optional time range restricting which partitions are updated/fetched;
    /// `None` means no time restriction.
    query_range: Option<TimeRange>,
}
28
29impl MaterializedView {
30    pub fn new(
31        lakehouse: Arc<LakehouseContext>,
32        reader_factory: Arc<ReaderFactory>,
33        view: Arc<dyn View>,
34        part_provider: Arc<dyn QueryPartitionProvider>,
35        query_range: Option<TimeRange>,
36    ) -> Self {
37        Self {
38            lakehouse,
39            reader_factory,
40            view,
41            part_provider,
42            query_range,
43        }
44    }
45
46    pub fn get_view(&self) -> Arc<dyn View> {
47        self.view.clone()
48    }
49}
50
51#[async_trait]
52impl TableProvider for MaterializedView {
53    fn as_any(&self) -> &dyn Any {
54        self
55    }
56
57    fn schema(&self) -> SchemaRef {
58        self.view.get_file_schema()
59    }
60
61    fn table_type(&self) -> TableType {
62        TableType::Base
63    }
64
65    #[span_fn]
66    async fn scan(
67        &self,
68        state: &dyn Session,
69        projection: Option<&Vec<usize>>,
70        filters: &[Expr],
71        limit: Option<usize>,
72    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
73        self.view
74            .jit_update(self.lakehouse.clone(), self.query_range)
75            .await
76            .map_err(|e| DataFusionError::External(e.into()))?;
77
78        let partitions = self
79            .part_provider
80            .fetch(
81                &self.view.get_view_set_name(),
82                &self.view.get_view_instance_id(),
83                self.query_range,
84                self.view.get_file_schema_hash(),
85            )
86            .await
87            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
88        trace!("MaterializedView::scan nb_partitions={}", partitions.len());
89
90        make_partitioned_execution_plan(
91            self.schema(),
92            self.reader_factory.clone(),
93            state,
94            projection,
95            filters,
96            limit,
97            Arc::new(partitions),
98        )
99    }
100
101    /// Tell DataFusion to push filters down to the scan method
102    fn supports_filters_pushdown(
103        &self,
104        filters: &[&Expr],
105    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
106        // Inexact because the pruning can't handle all expressions and pruning
107        // is not done at the row level -- there may be rows in returned files
108        // that do not pass the filter
109        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
110    }
111}