// micromegas_analytics/lakehouse/table_scan_rewrite.rs

1use crate::{lakehouse::materialized_view::MaterializedView, time::TimeRange};
2use datafusion::error::DataFusionError;
3use datafusion::logical_expr::Filter;
4use datafusion::logical_expr::utils::conjunction;
5use datafusion::{
6    common::tree_node::Transformed, config::ConfigOptions, datasource::DefaultTableSource,
7    logical_expr::LogicalPlan, optimizer::AnalyzerRule,
8};
9use std::sync::Arc;
10
11/// An analyzer rule that rewrites table scans to include time-based filters.
12#[derive(Debug)]
13pub struct TableScanRewrite {
14    query_range: TimeRange,
15}
16
17impl TableScanRewrite {
18    pub fn new(query_range: TimeRange) -> Self {
19        Self { query_range }
20    }
21
22    fn rewrite_plan(
23        &self,
24        plan: LogicalPlan,
25        _options: &ConfigOptions,
26    ) -> datafusion::error::Result<Transformed<LogicalPlan>> {
27        if let LogicalPlan::TableScan(ts) = &plan {
28            let table_source = ts
29                .source
30                .as_any()
31                .downcast_ref::<DefaultTableSource>()
32                .ok_or_else(|| {
33                    DataFusionError::Execution(String::from(
34                        "error casting table source as DefaultTableSource",
35                    ))
36                })?;
37            // Only rewrite MaterializedView tables, skip others (like table functions)
38            let Some(mat_view) = table_source
39                .table_provider
40                .as_any()
41                .downcast_ref::<MaterializedView>()
42            else {
43                // This is not a MaterializedView (e.g., a table function), skip rewriting
44                return Ok(Transformed::no(plan));
45            };
46            let view = mat_view.get_view();
47            let filters = view
48                .make_time_filter(self.query_range.begin, self.query_range.end)
49                .map_err(|e| DataFusionError::External(e.into()))?;
50            let pred = conjunction(filters).ok_or_else(|| {
51                DataFusionError::Execution(String::from("error making a conjunction"))
52            })?;
53            let filter = Filter::try_new(pred, Arc::new(plan.clone()))?;
54            Ok(Transformed::yes(LogicalPlan::Filter(filter)))
55        } else {
56            Ok(Transformed::no(plan))
57        }
58    }
59}
60
61impl AnalyzerRule for TableScanRewrite {
62    fn name(&self) -> &str {
63        "table_scan_rewrite"
64    }
65
66    fn analyze(
67        &self,
68        plan: LogicalPlan,
69        options: &ConfigOptions,
70    ) -> datafusion::error::Result<LogicalPlan> {
71        plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan, options))
72            .map(|res| res.data)
73    }
74}