micromegas_analytics/lakehouse/query.rs

use super::{
    answer::Answer, get_payload_function::GetPayload, lakehouse_context::LakehouseContext,
    list_partitions_table_function::ListPartitionsTableFunction,
    list_view_sets_table_function::ListViewSetsTableFunction,
    materialize_partitions_table_function::MaterializePartitionsTableFunction,
    parse_block_table_function::ParseBlockTableFunction, partition::Partition,
    partition_cache::QueryPartitionProvider, partitioned_table_provider::PartitionedTableProvider,
    perfetto_trace_table_function::PerfettoTraceTableFunction,
    process_spans_table_function::ProcessSpansTableFunction, reader_factory::ReaderFactory,
    retire_partition_by_file_udf::make_retire_partition_by_file_udf,
    retire_partition_by_metadata_udf::make_retire_partition_by_metadata_udf,
    retire_partitions_table_function::RetirePartitionsTableFunction,
    session_configurator::SessionConfigurator, view::View, view_factory::ViewFactory,
};
use crate::{
    lakehouse::{
        materialized_view::MaterializedView, table_scan_rewrite::TableScanRewrite,
        view_instance_table_function::ViewInstanceTableFunction,
    },
    properties::{
        properties_to_dict_udf::PropertiesToDict, properties_to_jsonb_udf::PropertiesToJsonb,
    },
    time::TimeRange,
};
use anyhow::{Context, Result};
use datafusion::{
    arrow::{array::RecordBatch, datatypes::SchemaRef},
    execution::{context::SessionContext, object_store::ObjectStoreUrl, runtime_env::RuntimeEnv},
    logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
    prelude::*,
    sql::TableReference,
};
use micromegas_tracing::prelude::*;
use std::sync::Arc;

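/// Wraps `view` in a [`MaterializedView`] table provider and lets the view
/// register it on the session context.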
#[span_fn]
async fn register_table(
    lakehouse: Arc<LakehouseContext>,
    reader_factory: Arc<ReaderFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    ctx: &SessionContext,
    view: Arc<dyn View>,
) -> Result<()> {
    let table = MaterializedView::new(
        lakehouse,
        reader_factory,
        view.clone(),
        part_provider,
        query_range,
    );
    view.register_table(ctx, table).await
}

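/// Builds a [`SessionContext`] that exposes `partitions` as a single table
/// named `source`, reading partition data through `reader_factory` from the
/// lakehouse object store, with the extension UDFs registered.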
#[span_fn]
pub async fn query_partitions_context(
    runtime: Arc<RuntimeEnv>,
    reader_factory: Arc<ReaderFactory>,
    object_store: Arc<dyn object_store::ObjectStore>,
    schema: SchemaRef,
    partitions: Arc<Vec<Partition>>,
) -> Result<SessionContext> {
    let table = PartitionedTableProvider::new(schema, reader_factory, partitions);
    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
    ctx.register_object_store(object_store_url.as_ref(), object_store);
    ctx.register_table(
        TableReference::Bare {
            table: "source".into(),
        },
        Arc::new(table),
    )?;
    register_extension_functions(&ctx);
    Ok(ctx)
}

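/// Runs a SQL query over a fixed set of partitions and returns the resulting
/// [`DataFrame`].
///
/// The partitions are exposed as a table named `source`. A minimal usage
/// sketch (the runtime, reader factory, object store, schema, and partition
/// list are assumed to be built elsewhere):
///
/// ```ignore
/// let df = query_partitions(
///     runtime,
///     reader_factory,
///     object_store,
///     schema,
///     partitions,
///     "SELECT count(*) FROM source;",
/// )
/// .await?;
/// let batches = df.collect().await?;
/// ```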
#[span_fn]
pub async fn query_partitions(
    runtime: Arc<RuntimeEnv>,
    reader_factory: Arc<ReaderFactory>,
    object_store: Arc<dyn object_store::ObjectStore>,
    schema: SchemaRef,
    partitions: Arc<Vec<Partition>>,
    sql: &str,
) -> Result<DataFrame> {
    let ctx =
        query_partitions_context(runtime, reader_factory, object_store, schema, partitions).await?;
    Ok(ctx.sql(sql).await?)
}

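/// Registers the lakehouse table functions (`view_instance`, `list_partitions`,
/// `list_view_sets`, `retire_partitions`, `perfetto_trace_chunks`,
/// `materialize_partitions`, `parse_block`, `process_spans`) and the
/// payload/partition-retirement scalar UDFs on the session context.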
#[span_fn]
pub fn register_lakehouse_functions(
    ctx: &SessionContext,
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
) {
    ctx.register_udtf(
        "view_instance",
        Arc::new(ViewInstanceTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
            query_range,
        )),
    );
    ctx.register_udtf(
        "list_partitions",
        Arc::new(ListPartitionsTableFunction::new(lakehouse.lake().clone())),
    );
    ctx.register_udtf(
        "list_view_sets",
        Arc::new(ListViewSetsTableFunction::new(view_factory.clone())),
    );
    ctx.register_udtf(
        "retire_partitions",
        Arc::new(RetirePartitionsTableFunction::new(lakehouse.lake().clone())),
    );
    ctx.register_udtf(
        "perfetto_trace_chunks",
        Arc::new(PerfettoTraceTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
        )),
    );
    ctx.register_udtf(
        "materialize_partitions",
        Arc::new(MaterializePartitionsTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
        )),
    );
    ctx.register_udtf(
        "parse_block",
        Arc::new(ParseBlockTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
            query_range,
        )),
    );
    ctx.register_udtf(
        "process_spans",
        Arc::new(ProcessSpansTableFunction::new(
            lakehouse.clone(),
            view_factory.clone(),
            part_provider.clone(),
            query_range,
        )),
    );
    ctx.register_udf(
        AsyncScalarUDF::new(Arc::new(GetPayload::new(lakehouse.lake().clone()))).into_scalar_udf(),
    );
    ctx.register_udf(make_retire_partition_by_file_udf(lakehouse.lake().clone()).into_scalar_udf());
    ctx.register_udf(
        make_retire_partition_by_metadata_udf(lakehouse.lake().clone()).into_scalar_udf(),
    );
}

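/// Registers the property-conversion UDFs and the shared UDFs from
/// `micromegas_datafusion_extensions`; also used by partition-level contexts
/// that don't need the full lakehouse function set.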
#[span_fn]
pub fn register_extension_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::from(PropertiesToDict::new()));
    ctx.register_udf(ScalarUDF::from(PropertiesToJsonb::new()));
    micromegas_datafusion_extensions::register_extension_udfs(ctx);
}

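/// Registers both the lakehouse-specific functions and the extension UDFs on
/// `ctx`.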
#[span_fn]
pub fn register_functions(
    ctx: &SessionContext,
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
) {
    register_lakehouse_functions(ctx, lakehouse, part_provider, query_range, view_factory);
    register_extension_functions(ctx);
}

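/// Builds a fully configured [`SessionContext`]: applies the time-range
/// rewrite rule when a `query_range` is given, registers the lakehouse object
/// store, all UDFs/UDTFs, and a table for each global view, then hands the
/// context to `configurator` for final adjustments.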
#[span_fn]
pub async fn make_session_context(
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
    configurator: Arc<dyn SessionConfigurator>,
) -> Result<SessionContext> {
    let config = SessionConfig::default()
        .set_bool("datafusion.execution.parquet.enable_page_index", false)
        .with_information_schema(true);
    let ctx = SessionContext::new_with_config_rt(config, lakehouse.runtime().clone());
    if let Some(range) = &query_range {
        ctx.add_analyzer_rule(Arc::new(TableScanRewrite::new(*range)));
    }
    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
    let object_store = lakehouse.lake().blob_storage.inner();
    ctx.register_object_store(object_store_url.as_ref(), object_store);
    let reader_factory = lakehouse.reader_factory().clone();
    register_functions(
        &ctx,
        lakehouse.clone(),
        part_provider.clone(),
        query_range,
        view_factory.clone(),
    );
    for view in view_factory.get_global_views() {
        register_table(
            lakehouse.clone(),
            reader_factory.clone(),
            part_provider.clone(),
            query_range,
            &ctx,
            view.clone(),
        )
        .await?;
    }
    configurator.configure(&ctx).await?;
    Ok(ctx)
}

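/// Creates a session context, executes `sql`, and collects the results into
/// an [`Answer`] (schema plus record batches).
///
/// A minimal usage sketch (hypothetical setup: the lakehouse context,
/// partition provider, view factory, and configurator come from the caller,
/// and `log_entries` stands in for whichever view is registered):
///
/// ```ignore
/// let answer = query(
///     lakehouse.clone(),
///     part_provider.clone(),
///     None, // no time-range restriction
///     "SELECT * FROM log_entries LIMIT 10;",
///     view_factory.clone(),
///     configurator.clone(),
/// )
/// .await?;
/// ```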
#[span_fn]
pub async fn query(
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    sql: &str,
    view_factory: Arc<ViewFactory>,
    configurator: Arc<dyn SessionConfigurator>,
) -> Result<Answer> {
    info!("query sql={sql}");
    let ctx = make_session_context(
        lakehouse,
        part_provider,
        query_range,
        view_factory,
        configurator,
    )
    .await
    .with_context(|| "make_session_context")?;
    let df = ctx.sql(sql).await?;
    let schema = df.schema().inner().clone();
    let batches: Vec<RecordBatch> = df.collect().await?;
    Ok(Answer::new(schema, batches))
}