micromegas_analytics/lakehouse/
partitioned_execution_plan.rs1use super::{partition::Partition, reader_factory::ReaderFactory};
2use crate::dfext::predicate::filters_to_predicate;
3use datafusion::{
4 arrow::datatypes::SchemaRef,
5 catalog::{Session, memory::DataSourceExec},
6 datasource::{
7 listing::PartitionedFile,
8 physical_plan::{FileScanConfigBuilder, ParquetSource},
9 },
10 execution::object_store::ObjectStoreUrl,
11 physical_plan::ExecutionPlan,
12 prelude::*,
13};
14use micromegas_tracing::prelude::*;
15use std::sync::Arc;
16
17#[span_fn]
19pub fn make_partitioned_execution_plan(
20 schema: SchemaRef,
21 reader_factory: Arc<ReaderFactory>,
22 state: &dyn Session,
23 projection: Option<&Vec<usize>>,
24 filters: &[Expr],
25 limit: Option<usize>,
26 partitions: Arc<Vec<Partition>>,
27) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
28 let predicate = filters_to_predicate(schema.clone(), state, filters)?;
29
30 let mut file_group = vec![];
32 for part in &*partitions {
33 if !part.is_empty() {
34 let file_path = part.file_path.as_ref().ok_or_else(|| {
35 datafusion::error::DataFusionError::Internal(format!(
36 "non-empty partition has no file_path: num_rows={}",
37 part.num_rows
38 ))
39 })?;
40 file_group.push(PartitionedFile::new(file_path, part.file_size as u64));
41 }
42 }
43
44 if file_group.is_empty() {
46 use datafusion::physical_plan::empty::EmptyExec;
47 let projected_schema = if let Some(projection) = projection {
48 Arc::new(schema.project(projection)?)
49 } else {
50 schema
51 };
52 return Ok(Arc::new(EmptyExec::new(projected_schema)));
53 }
54
55 let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
56 let source = Arc::new(
57 ParquetSource::new(schema)
58 .with_predicate(predicate)
59 .with_parquet_file_reader_factory(reader_factory),
60 );
61 let file_scan_config = FileScanConfigBuilder::new(object_store_url, source)
62 .with_limit(limit)
63 .with_projection_indices(projection.cloned())?
64 .with_file_groups(vec![file_group.into()])
65 .build();
66 Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
67}