// micromegas_analytics/lakehouse/parse_block_table_function.rs
use super::{
2 lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3 session_configurator::NoOpSessionConfigurator, view_factory::ViewFactory,
4};
5use crate::{
6 dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
7 metadata::StreamMetadata,
8 payload::{fetch_block_payload, parse_block},
9 time::TimeRange,
10};
11use anyhow::Context;
12use async_trait::async_trait;
13use datafusion::{
14 arrow::{
15 array::{BinaryBuilder, Int64Array, Int64Builder, RecordBatch, StringBuilder},
16 datatypes::{DataType, Field, Schema, SchemaRef},
17 },
18 catalog::{Session, TableFunctionImpl, TableProvider},
19 common::plan_err,
20 datasource::{
21 TableType,
22 memory::{DataSourceExec, MemorySourceConfig},
23 },
24 error::DataFusionError,
25 physical_plan::ExecutionPlan,
26 prelude::Expr,
27};
28use jsonb::Value as JsonbValue;
29use micromegas_ingestion::web_ingestion_service::FORMAT_TRANSIT;
30use micromegas_tracing::prelude::*;
31use micromegas_transit::{UserDefinedType, value::Value as TransitValue};
32use std::{any::Any, borrow::Cow, collections::BTreeMap, sync::Arc};
33use uuid::Uuid;
34
35use crate::dfext::expressions::exp_to_string;
36
37fn output_schema() -> SchemaRef {
38 Arc::new(Schema::new(vec![
39 Field::new("object_index", DataType::Int64, false),
40 Field::new("type_name", DataType::Utf8, false),
41 Field::new("value", DataType::Binary, false),
42 ]))
43}
44
45pub fn transit_value_to_jsonb(value: &TransitValue) -> JsonbValue<'_> {
47 match value {
48 TransitValue::String(s) => JsonbValue::String(Cow::Borrowed(s.as_str())),
49 TransitValue::Object(obj) => {
50 let mut map = BTreeMap::new();
51 map.insert(
52 "__type".to_string(),
53 JsonbValue::String(Cow::Borrowed(obj.type_name.as_str())),
54 );
55 for (name, val) in &obj.members {
56 map.insert(name.as_ref().clone(), transit_value_to_jsonb(val));
57 }
58 JsonbValue::Object(map)
59 }
60 TransitValue::U8(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
61 TransitValue::U32(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
62 TransitValue::U64(v) => JsonbValue::Number(jsonb::Number::UInt64(*v)),
63 TransitValue::I64(v) => JsonbValue::Number(jsonb::Number::Int64(*v)),
64 TransitValue::F64(v) => JsonbValue::Number(jsonb::Number::Float64(*v)),
65 TransitValue::None => JsonbValue::Null,
66 }
67}
68
69async fn fetch_block_metadata(
72 lakehouse: Arc<LakehouseContext>,
73 part_provider: Arc<dyn QueryPartitionProvider>,
74 query_range: Option<TimeRange>,
75 view_factory: Arc<ViewFactory>,
76 block_id_str: &str,
77) -> anyhow::Result<Option<(Uuid, i64, StreamMetadata)>> {
78 let ctx = super::query::make_session_context(
79 lakehouse,
80 part_provider,
81 query_range,
82 view_factory,
83 Arc::new(NoOpSessionConfigurator),
84 )
85 .await?;
86
87 let sql = format!(
88 "SELECT block_id, stream_id, process_id, object_offset,
89 \"streams.dependencies_metadata\", \"streams.objects_metadata\", \"streams.format\"
90 FROM blocks
91 WHERE block_id = '{block_id_str}'"
92 );
93 let df = ctx.sql(&sql).await?;
94 let batches = df.collect().await?;
95
96 if batches.is_empty() || batches[0].num_rows() == 0 {
97 return Ok(None);
98 }
99
100 let batch = &batches[0];
101
102 let block_id_col = string_column_by_name(batch, "block_id")?;
103 let stream_id_col = string_column_by_name(batch, "stream_id")?;
104 let process_id_col = string_column_by_name(batch, "process_id")?;
105 let object_offset_col: &Int64Array = typed_column_by_name(batch, "object_offset")?;
106 let format_col = string_column_by_name(batch, "streams.format")?;
107 let format = format_col.value(0)?;
108 if format != FORMAT_TRANSIT {
109 anyhow::bail!(
110 "parse_block does not support format={format} (only {FORMAT_TRANSIT}). \
111 Query `log_entries`/`measures`/`otel_spans` instead for OTel data."
112 );
113 }
114
115 let block_id = Uuid::parse_str(block_id_col.value(0)?)?;
116 let stream_id = Uuid::parse_str(stream_id_col.value(0)?)?;
117 let process_id = Uuid::parse_str(process_id_col.value(0)?)?;
118 let object_offset = object_offset_col.value(0);
119
120 let deps_col = batch
121 .column_by_name("streams.dependencies_metadata")
122 .context("streams.dependencies_metadata column not found")?;
123 let deps_binary: &datafusion::arrow::array::BinaryArray = deps_col
124 .as_any()
125 .downcast_ref()
126 .context("failed to cast dependencies_metadata to BinaryArray")?;
127 let deps_bytes = deps_binary.value(0);
128 let dependencies_metadata: Vec<UserDefinedType> =
129 ciborium::from_reader(deps_bytes).context("decoding dependencies_metadata")?;
130
131 let objs_col = batch
132 .column_by_name("streams.objects_metadata")
133 .context("streams.objects_metadata column not found")?;
134 let objs_binary: &datafusion::arrow::array::BinaryArray = objs_col
135 .as_any()
136 .downcast_ref()
137 .context("failed to cast objects_metadata to BinaryArray")?;
138 let objs_bytes = objs_binary.value(0);
139 let objects_metadata: Vec<UserDefinedType> =
140 ciborium::from_reader(objs_bytes).context("decoding objects_metadata")?;
141
142 let stream_metadata = StreamMetadata {
143 process_id,
144 stream_id,
145 dependencies_metadata,
146 objects_metadata,
147 tags: vec![],
148 properties: Arc::new(vec![]),
149 };
150
151 Ok(Some((block_id, object_offset, stream_metadata)))
152}
153
154fn parse_block_objects(
156 stream_metadata: &StreamMetadata,
157 payload: µmegas_telemetry::block_wire_format::BlockPayload,
158 object_offset: i64,
159 early_limit: Option<usize>,
160) -> anyhow::Result<RecordBatch> {
161 let mut index_builder = Int64Builder::new();
162 let mut name_builder = StringBuilder::new();
163 let mut value_builder = BinaryBuilder::new();
164 let mut local_index: i64 = 0;
165 let mut nb_objects: usize = 0;
166
167 parse_block(stream_metadata, payload, |value| {
168 if let TransitValue::Object(obj) = &value {
169 let jsonb_val = transit_value_to_jsonb(&value);
170 let mut buf = Vec::new();
171 jsonb_val.write_to_vec(&mut buf);
172
173 index_builder.append_value(object_offset + local_index);
174 name_builder.append_value(obj.type_name.as_ref());
175 value_builder.append_value(&buf);
176 nb_objects += 1;
177 } else {
178 warn!(
179 "parse_block: skipping non-Object value at index {}",
180 object_offset + local_index
181 );
182 }
183 local_index += 1;
184
185 if let Some(lim) = early_limit {
186 Ok(nb_objects < lim)
187 } else {
188 Ok(true)
189 }
190 })?;
191
192 Ok(RecordBatch::try_new(
193 output_schema(),
194 vec![
195 Arc::new(index_builder.finish()),
196 Arc::new(name_builder.finish()),
197 Arc::new(value_builder.finish()),
198 ],
199 )?)
200}
201
202#[derive(Debug)]
205pub struct ParseBlockTableFunction {
206 lakehouse: Arc<LakehouseContext>,
207 view_factory: Arc<ViewFactory>,
208 part_provider: Arc<dyn QueryPartitionProvider>,
209 query_range: Option<TimeRange>,
210}
211
212impl ParseBlockTableFunction {
213 pub fn new(
214 lakehouse: Arc<LakehouseContext>,
215 view_factory: Arc<ViewFactory>,
216 part_provider: Arc<dyn QueryPartitionProvider>,
217 query_range: Option<TimeRange>,
218 ) -> Self {
219 Self {
220 lakehouse,
221 view_factory,
222 part_provider,
223 query_range,
224 }
225 }
226}
227
228impl TableFunctionImpl for ParseBlockTableFunction {
229 fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
230 let arg = exprs.first().map(exp_to_string);
231 let Some(Ok(block_id)) = arg else {
232 return plan_err!(
233 "First argument to parse_block must be a string (the block ID), given {:?}",
234 arg
235 );
236 };
237 Ok(Arc::new(ParseBlockProvider {
238 block_id,
239 lakehouse: self.lakehouse.clone(),
240 view_factory: self.view_factory.clone(),
241 part_provider: self.part_provider.clone(),
242 query_range: self.query_range,
243 }))
244 }
245}
246
247#[derive(Debug)]
248struct ParseBlockProvider {
249 block_id: String,
250 lakehouse: Arc<LakehouseContext>,
251 view_factory: Arc<ViewFactory>,
252 part_provider: Arc<dyn QueryPartitionProvider>,
253 query_range: Option<TimeRange>,
254}
255
256#[async_trait]
257impl TableProvider for ParseBlockProvider {
258 fn as_any(&self) -> &dyn Any {
259 self
260 }
261
262 fn schema(&self) -> SchemaRef {
263 output_schema()
264 }
265
266 fn table_type(&self) -> TableType {
267 TableType::Temporary
268 }
269
270 async fn scan(
271 &self,
272 _state: &dyn Session,
273 projection: Option<&Vec<usize>>,
274 filters: &[Expr],
275 limit: Option<usize>,
276 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
277 let block_id_str = &self.block_id;
278
279 let Some((block_id, object_offset, stream_metadata)) = fetch_block_metadata(
280 self.lakehouse.clone(),
281 self.part_provider.clone(),
282 self.query_range,
283 self.view_factory.clone(),
284 block_id_str,
285 )
286 .await
287 .map_err(|e| DataFusionError::External(e.into()))?
288 else {
289 let source = MemorySourceConfig::try_new(
290 &[vec![]],
291 self.schema(),
292 projection.map(|v| v.to_owned()),
293 )?;
294 return Ok(DataSourceExec::from_data_source(source));
295 };
296
297 let blob_storage = self.lakehouse.lake().blob_storage.clone();
299 let payload = fetch_block_payload(
300 blob_storage,
301 sqlx::types::Uuid::from_bytes(*stream_metadata.process_id.as_bytes()),
302 sqlx::types::Uuid::from_bytes(*stream_metadata.stream_id.as_bytes()),
303 sqlx::types::Uuid::from_bytes(*block_id.as_bytes()),
304 )
305 .await
306 .map_err(|e| DataFusionError::External(e.into()))?;
307
308 let early_limit = if filters.is_empty() { limit } else { None };
310 let rb = parse_block_objects(&stream_metadata, &payload, object_offset, early_limit)
311 .with_context(|| format!("parsing block {block_id_str}"))
312 .map_err(|e| DataFusionError::External(e.into()))?;
313
314 let source = MemorySourceConfig::try_new(
315 &[vec![rb]],
316 self.schema(),
317 projection.map(|v| v.to_owned()),
318 )?;
319 Ok(DataSourceExec::from_data_source(source))
320 }
321}