micromegas_analytics/lakehouse/
partitioned_execution_plan.rs

1use super::{partition::Partition, reader_factory::ReaderFactory};
2use crate::dfext::predicate::filters_to_predicate;
3use datafusion::{
4    arrow::datatypes::SchemaRef,
5    catalog::{Session, memory::DataSourceExec},
6    datasource::{
7        listing::PartitionedFile,
8        physical_plan::{FileScanConfigBuilder, ParquetSource},
9    },
10    execution::object_store::ObjectStoreUrl,
11    physical_plan::ExecutionPlan,
12    prelude::*,
13};
14use micromegas_tracing::prelude::*;
15use std::sync::Arc;
16
17/// Creates a partitioned execution plan for scanning Parquet files.
18#[span_fn]
19pub fn make_partitioned_execution_plan(
20    schema: SchemaRef,
21    reader_factory: Arc<ReaderFactory>,
22    state: &dyn Session,
23    projection: Option<&Vec<usize>>,
24    filters: &[Expr],
25    limit: Option<usize>,
26    partitions: Arc<Vec<Partition>>,
27) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
28    let predicate = filters_to_predicate(schema.clone(), state, filters)?;
29
30    // Filter out empty partitions (num_rows = 0, file_path = None)
31    let mut file_group = vec![];
32    for part in &*partitions {
33        if !part.is_empty() {
34            let file_path = part.file_path.as_ref().ok_or_else(|| {
35                datafusion::error::DataFusionError::Internal(format!(
36                    "non-empty partition has no file_path: num_rows={}",
37                    part.num_rows
38                ))
39            })?;
40            file_group.push(PartitionedFile::new(file_path, part.file_size as u64));
41        }
42    }
43
44    // If all partitions are empty, return EmptyExec with projected schema
45    if file_group.is_empty() {
46        use datafusion::physical_plan::empty::EmptyExec;
47        let projected_schema = if let Some(projection) = projection {
48            Arc::new(schema.project(projection)?)
49        } else {
50            schema
51        };
52        return Ok(Arc::new(EmptyExec::new(projected_schema)));
53    }
54
55    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
56    let source = Arc::new(
57        ParquetSource::new(schema)
58            .with_predicate(predicate)
59            .with_parquet_file_reader_factory(reader_factory),
60    );
61    let file_scan_config = FileScanConfigBuilder::new(object_store_url, source)
62        .with_limit(limit)
63        .with_projection_indices(projection.cloned())?
64        .with_file_groups(vec![file_group.into()])
65        .build();
66    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
67}