micromegas_analytics/lakehouse/query.rs

use super::{
    answer::Answer, get_payload_function::GetPayload, lakehouse_context::LakehouseContext,
    list_partitions_table_function::ListPartitionsTableFunction,
    list_view_sets_table_function::ListViewSetsTableFunction,
    materialize_partitions_table_function::MaterializePartitionsTableFunction,
    parse_block_table_function::ParseBlockTableFunction, partition::Partition,
    partition_cache::QueryPartitionProvider, partitioned_table_provider::PartitionedTableProvider,
    perfetto_trace_table_function::PerfettoTraceTableFunction,
    process_spans_table_function::ProcessSpansTableFunction, reader_factory::ReaderFactory,
    retire_partition_by_file_udf::make_retire_partition_by_file_udf,
    retire_partition_by_metadata_udf::make_retire_partition_by_metadata_udf,
    retire_partitions_table_function::RetirePartitionsTableFunction,
    session_configurator::SessionConfigurator, view::View, view_factory::ViewFactory,
};
use crate::{
    lakehouse::{
        materialized_view::MaterializedView, table_scan_rewrite::TableScanRewrite,
        view_instance_table_function::ViewInstanceTableFunction,
    },
    properties::{
        properties_to_dict_udf::PropertiesToDict, properties_to_jsonb_udf::PropertiesToJsonb,
    },
    time::TimeRange,
};
use anyhow::{Context, Result};
use datafusion::{
    arrow::{array::RecordBatch, datatypes::SchemaRef},
    execution::{context::SessionContext, object_store::ObjectStoreUrl, runtime_env::RuntimeEnv},
    logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
    prelude::*,
    sql::TableReference,
};
use micromegas_tracing::prelude::*;
use std::sync::Arc;

/// register_table registers `view` on the session context, backed by a `MaterializedView`
/// over the given partition provider and optional query range
#[span_fn]
async fn register_table(
    lakehouse: Arc<LakehouseContext>,
    reader_factory: Arc<ReaderFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    ctx: &SessionContext,
    view: Arc<dyn View>,
) -> Result<()> {
    let table = MaterializedView::new(
        lakehouse,
        reader_factory,
        view.clone(),
        part_provider,
        query_range,
    );
    view.register_table(ctx, table).await
}

/// query_partitions_context returns a `SessionContext` in which the given partitions are
/// registered as the `source` table
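///
/// A hedged usage sketch: the bindings passed in (`runtime`, `reader_factory`,
/// `object_store`, `schema`, `partitions`) are assumed to come from the caller's
/// lakehouse setup
///
/// ```ignore
/// let ctx =
///     query_partitions_context(runtime, reader_factory, object_store, schema, partitions).await?;
/// let df = ctx.sql("SELECT * FROM source LIMIT 10").await?;
/// let batches = df.collect().await?;
/// ```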
#[span_fn]
pub async fn query_partitions_context(
    runtime: Arc<RuntimeEnv>,
    reader_factory: Arc<ReaderFactory>,
    object_store: Arc<dyn object_store::ObjectStore>,
    schema: SchemaRef,
    partitions: Arc<Vec<Partition>>,
) -> Result<SessionContext> {
    let table = PartitionedTableProvider::new(schema, reader_factory, partitions);
    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
    ctx.register_object_store(object_store_url.as_ref(), object_store);
    ctx.register_table(
        TableReference::Bare {
            table: "source".into(),
        },
        Arc::new(table),
    )?;
    register_extension_functions(&ctx);
    Ok(ctx)
}

/// query_partitions returns a `DataFrame`, leaving the caller the option of streaming the results
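///
/// A sketch of streaming the result batches instead of collecting them; the input
/// bindings are assumed to come from the caller's lakehouse setup
///
/// ```ignore
/// use futures::StreamExt;
///
/// let df = query_partitions(
///     runtime,
///     reader_factory,
///     object_store,
///     schema,
///     partitions,
///     "SELECT count(*) FROM source",
/// )
/// .await?;
/// // execute_stream yields record batches incrementally instead of buffering them all
/// let mut stream = df.execute_stream().await?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     // process each RecordBatch as it arrives
/// }
/// ```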
#[span_fn]
pub async fn query_partitions(
    runtime: Arc<RuntimeEnv>,
    reader_factory: Arc<ReaderFactory>,
    object_store: Arc<dyn object_store::ObjectStore>,
    schema: SchemaRef,
    partitions: Arc<Vec<Partition>>,
    sql: &str,
) -> Result<DataFrame> {
    let ctx =
        query_partitions_context(runtime, reader_factory, object_store, schema, partitions).await?;
    Ok(ctx.sql(sql).await?)
}

/// register functions that are part of the lakehouse architecture
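///
/// Once registered, the table functions can be invoked from SQL; the view set name
/// and arguments below are illustrative, each table function defines its own signature
///
/// ```ignore
/// let df = ctx.sql("SELECT * FROM list_partitions()").await?;
/// let df = ctx
///     .sql("SELECT * FROM view_instance('log_entries', '<process_id>')")
///     .await?;
/// ```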
#[span_fn]
pub fn register_lakehouse_functions(
    ctx: &SessionContext,
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
) {
    ctx.register_udtf(
        "view_instance",
        Arc::new(ViewInstanceTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
            query_range,
        )),
    );
    ctx.register_udtf(
        "list_partitions",
        Arc::new(ListPartitionsTableFunction::new(lakehouse.lake().clone())),
    );
    ctx.register_udtf(
        "list_view_sets",
        Arc::new(ListViewSetsTableFunction::new(view_factory.clone())),
    );
    ctx.register_udtf(
        "retire_partitions",
        Arc::new(RetirePartitionsTableFunction::new(lakehouse.lake().clone())),
    );
    ctx.register_udtf(
        "perfetto_trace_chunks",
        Arc::new(PerfettoTraceTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
        )),
    );
    ctx.register_udtf(
        "materialize_partitions",
        Arc::new(MaterializePartitionsTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
        )),
    );
    ctx.register_udtf(
        "parse_block",
        Arc::new(ParseBlockTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
            query_range,
        )),
    );
    ctx.register_udtf(
        "process_spans",
        Arc::new(ProcessSpansTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
            query_range,
        )),
    );
    ctx.register_udf(
        AsyncScalarUDF::new(Arc::new(GetPayload::new(lakehouse.lake().clone()))).into_scalar_udf(),
    );
    ctx.register_udf(make_retire_partition_by_file_udf(lakehouse.lake().clone()).into_scalar_udf());
    ctx.register_udf(
        make_retire_partition_by_metadata_udf(lakehouse.lake().clone()).into_scalar_udf(),
    );
}

/// register functions that do not depend on the lakehouse architecture
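///
/// A hedged sketch assuming the property UDFs are exposed under snake_case SQL names
/// (`properties_to_dict`, `properties_to_jsonb`); the actual names come from each
/// UDF's implementation
///
/// ```ignore
/// let df = ctx
///     .sql("SELECT properties_to_dict(properties) FROM source LIMIT 10")
///     .await?;
/// ```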
#[span_fn]
pub fn register_extension_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::from(PropertiesToDict::new()));
    ctx.register_udf(ScalarUDF::from(PropertiesToJsonb::new()));
    micromegas_datafusion_extensions::register_extension_udfs(ctx);
}

/// register_functions registers both the lakehouse and the extension functions on the session context
#[span_fn]
pub fn register_functions(
    ctx: &SessionContext,
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
) {
    register_lakehouse_functions(ctx, lakehouse, part_provider, query_range, view_factory);
    register_extension_functions(ctx);
}

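/// make_session_context builds a `SessionContext` wired to the lakehouse: parquet page-index
/// config, optional time-range rewrite, object store, lakehouse and extension functions,
/// global views and the caller-provided session configurator
///
/// A hedged usage sketch; the bindings and the `log_entries` view name are assumptions
/// about the caller's setup
///
/// ```ignore
/// let ctx = make_session_context(lakehouse, part_provider, None, view_factory, configurator)
///     .await?;
/// let df = ctx.sql("SELECT * FROM log_entries LIMIT 10").await?;
/// ```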
#[span_fn]
pub async fn make_session_context(
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
    configurator: Arc<dyn SessionConfigurator>,
) -> Result<SessionContext> {
    // Disable page index reading for backward compatibility with legacy Parquet files
    // Legacy files may have incomplete ColumnIndex metadata (missing null_pages field)
    // which causes errors in DataFusion 51+ with Arrow 57.0 when reading page indexes
    let config = SessionConfig::default()
        .set_bool("datafusion.execution.parquet.enable_page_index", false)
        .with_information_schema(true);
    let ctx = SessionContext::new_with_config_rt(config, lakehouse.runtime().clone());
    if let Some(range) = &query_range {
        ctx.add_analyzer_rule(Arc::new(TableScanRewrite::new(*range)));
    }
    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
    let object_store = lakehouse.lake().blob_storage.inner();
    ctx.register_object_store(object_store_url.as_ref(), object_store);
    let reader_factory = lakehouse.reader_factory().clone();
    register_functions(
        &ctx,
        lakehouse.clone(),
        part_provider.clone(),
        query_range,
        view_factory.clone(),
    );
    for view in view_factory.get_global_views() {
        register_table(
            lakehouse.clone(),
            reader_factory.clone(),
            part_provider.clone(),
            query_range,
            &ctx,
            view.clone(),
        )
        .await?;
    }
    // Apply custom configuration
    configurator.configure(&ctx).await?;
    Ok(ctx)
}

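/// query runs a SQL statement against the lakehouse and collects the full result set into an `Answer`
///
/// A hedged usage sketch; the bindings and the `processes` view name are assumptions
/// about the caller's setup
///
/// ```ignore
/// let answer = query(
///     lakehouse,
///     part_provider,
///     None,
///     "SELECT process_id FROM processes LIMIT 10",
///     view_factory,
///     configurator,
/// )
/// .await?;
/// ```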
#[span_fn]
pub async fn query(
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    sql: &str,
    view_factory: Arc<ViewFactory>,
    configurator: Arc<dyn SessionConfigurator>,
) -> Result<Answer> {
    info!("query sql={sql}");
    let ctx = make_session_context(
        lakehouse,
        part_provider,
        query_range,
        view_factory,
        configurator,
    )
    .await
    .with_context(|| "make_session_context")?;
    let df = ctx.sql(sql).await?;
    let schema = df.schema().inner().clone();
    let batches: Vec<RecordBatch> = df.collect().await?;
    Ok(Answer::new(schema, batches))
}