// micromegas_analytics/lakehouse/materialized_view.rs
use super::{
2 lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3 partitioned_execution_plan::make_partitioned_execution_plan, reader_factory::ReaderFactory,
4 view::View,
5};
6use crate::time::TimeRange;
7use async_trait::async_trait;
8use datafusion::{
9 arrow::datatypes::SchemaRef,
10 catalog::{Session, TableProvider},
11 datasource::TableType,
12 error::DataFusionError,
13 logical_expr::{Expr, TableProviderFilterPushDown},
14 physical_plan::ExecutionPlan,
15};
16use micromegas_tracing::prelude::*;
17use std::{any::Any, sync::Arc};
18
/// DataFusion table provider backed by a materialized lakehouse view.
///
/// Bundles everything `scan` needs: the lakehouse context (for just-in-time
/// updates), a reader factory for partition files, the view definition, a
/// partition lookup service, and an optional query time range.
#[derive(Debug)]
pub struct MaterializedView {
    // Shared lakehouse context, passed to the view's jit_update.
    lakehouse: Arc<LakehouseContext>,
    // Creates readers for the partition files during scans.
    reader_factory: Arc<ReaderFactory>,
    // View definition: schema, identity and update logic.
    view: Arc<dyn View>,
    // Service used to fetch the partitions matching a query.
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Optional time filter restricting which partitions are considered.
    query_range: Option<TimeRange>,
}
28
29impl MaterializedView {
30 pub fn new(
31 lakehouse: Arc<LakehouseContext>,
32 reader_factory: Arc<ReaderFactory>,
33 view: Arc<dyn View>,
34 part_provider: Arc<dyn QueryPartitionProvider>,
35 query_range: Option<TimeRange>,
36 ) -> Self {
37 Self {
38 lakehouse,
39 reader_factory,
40 view,
41 part_provider,
42 query_range,
43 }
44 }
45
46 pub fn get_view(&self) -> Arc<dyn View> {
47 self.view.clone()
48 }
49}
50
51#[async_trait]
52impl TableProvider for MaterializedView {
53 fn as_any(&self) -> &dyn Any {
54 self
55 }
56
57 fn schema(&self) -> SchemaRef {
58 self.view.get_file_schema()
59 }
60
61 fn table_type(&self) -> TableType {
62 TableType::Base
63 }
64
65 #[span_fn]
66 async fn scan(
67 &self,
68 state: &dyn Session,
69 projection: Option<&Vec<usize>>,
70 filters: &[Expr],
71 limit: Option<usize>,
72 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
73 self.view
74 .jit_update(self.lakehouse.clone(), self.query_range)
75 .await
76 .map_err(|e| DataFusionError::External(e.into()))?;
77
78 let partitions = self
79 .part_provider
80 .fetch(
81 &self.view.get_view_set_name(),
82 &self.view.get_view_instance_id(),
83 self.query_range,
84 self.view.get_file_schema_hash(),
85 )
86 .await
87 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
88 trace!("MaterializedView::scan nb_partitions={}", partitions.len());
89
90 make_partitioned_execution_plan(
91 self.schema(),
92 self.reader_factory.clone(),
93 state,
94 projection,
95 filters,
96 limit,
97 Arc::new(partitions),
98 )
99 }
100
101 fn supports_filters_pushdown(
103 &self,
104 filters: &[&Expr],
105 ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
106 Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
110 }
111}