micromegas_analytics/lakehouse/
table_scan_rewrite.rs1use crate::{lakehouse::materialized_view::MaterializedView, time::TimeRange};
2use datafusion::error::DataFusionError;
3use datafusion::logical_expr::Filter;
4use datafusion::logical_expr::utils::conjunction;
5use datafusion::{
6 common::tree_node::Transformed, config::ConfigOptions, datasource::DefaultTableSource,
7 logical_expr::LogicalPlan, optimizer::AnalyzerRule,
8};
9use std::sync::Arc;
10
11#[derive(Debug)]
13pub struct TableScanRewrite {
14 query_range: TimeRange,
15}
16
17impl TableScanRewrite {
18 pub fn new(query_range: TimeRange) -> Self {
19 Self { query_range }
20 }
21
22 fn rewrite_plan(
23 &self,
24 plan: LogicalPlan,
25 _options: &ConfigOptions,
26 ) -> datafusion::error::Result<Transformed<LogicalPlan>> {
27 if let LogicalPlan::TableScan(ts) = &plan {
28 let table_source = ts
29 .source
30 .as_any()
31 .downcast_ref::<DefaultTableSource>()
32 .ok_or_else(|| {
33 DataFusionError::Execution(String::from(
34 "error casting table source as DefaultTableSource",
35 ))
36 })?;
37 let Some(mat_view) = table_source
39 .table_provider
40 .as_any()
41 .downcast_ref::<MaterializedView>()
42 else {
43 return Ok(Transformed::no(plan));
45 };
46 let view = mat_view.get_view();
47 let filters = view
48 .make_time_filter(self.query_range.begin, self.query_range.end)
49 .map_err(|e| DataFusionError::External(e.into()))?;
50 let pred = conjunction(filters).ok_or_else(|| {
51 DataFusionError::Execution(String::from("error making a conjunction"))
52 })?;
53 let filter = Filter::try_new(pred, Arc::new(plan.clone()))?;
54 Ok(Transformed::yes(LogicalPlan::Filter(filter)))
55 } else {
56 Ok(Transformed::no(plan))
57 }
58 }
59}
60
61impl AnalyzerRule for TableScanRewrite {
62 fn name(&self) -> &str {
63 "table_scan_rewrite"
64 }
65
66 fn analyze(
67 &self,
68 plan: LogicalPlan,
69 options: &ConfigOptions,
70 ) -> datafusion::error::Result<LogicalPlan> {
71 plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan, options))
72 .map(|res| res.data)
73 }
74}