micromegas_analytics/lakehouse/parse_block_table_function.rs

use super::{
    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
    session_configurator::NoOpSessionConfigurator, view_factory::ViewFactory,
};
use crate::{
    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
    metadata::StreamMetadata,
    payload::{fetch_block_payload, parse_block},
    time::TimeRange,
};
use anyhow::Context;
use async_trait::async_trait;
use datafusion::{
    arrow::{
        array::{BinaryBuilder, Int64Array, Int64Builder, RecordBatch, StringBuilder},
        datatypes::{DataType, Field, Schema, SchemaRef},
    },
    catalog::{Session, TableFunctionImpl, TableProvider},
    common::plan_err,
    datasource::{
        TableType,
        memory::{DataSourceExec, MemorySourceConfig},
    },
    error::DataFusionError,
    physical_plan::ExecutionPlan,
    prelude::Expr,
};
use jsonb::Value as JsonbValue;
use micromegas_tracing::prelude::*;
use micromegas_transit::{UserDefinedType, value::Value as TransitValue};
use std::{any::Any, borrow::Cow, collections::BTreeMap, sync::Arc};
use uuid::Uuid;

use crate::dfext::expressions::exp_to_string;

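/// Schema of the rows produced by `parse_block`: one row per parsed object,
/// carrying its absolute index, its transit type name, and its JSONB-encoded
/// payload.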
fn output_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("object_index", DataType::Int64, false),
        Field::new("type_name", DataType::Utf8, false),
        Field::new("value", DataType::Binary, false),
    ]))
}

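/// Converts a transit value into its JSONB representation.
///
/// Objects become JSONB maps with a reserved `__type` entry holding the
/// transit type name, so the original object type can be recovered from the
/// encoded bytes. A minimal encoding sketch (marked `ignore` since it relies
/// on crate-internal types):
///
/// ```ignore
/// let value = TransitValue::U32(42);
/// let mut buf = Vec::new();
/// transit_value_to_jsonb(&value).write_to_vec(&mut buf);
/// // `buf` now holds the raw JSONB bytes for the number 42.
/// ```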
pub fn transit_value_to_jsonb(value: &TransitValue) -> JsonbValue<'_> {
    match value {
        TransitValue::String(s) => JsonbValue::String(Cow::Borrowed(s.as_str())),
        TransitValue::Object(obj) => {
            let mut map = BTreeMap::new();
            map.insert(
                "__type".to_string(),
                JsonbValue::String(Cow::Borrowed(obj.type_name.as_str())),
            );
            for (name, val) in &obj.members {
                map.insert(name.as_ref().clone(), transit_value_to_jsonb(val));
            }
            JsonbValue::Object(map)
        }
        TransitValue::U8(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
        TransitValue::U32(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
        TransitValue::U64(v) => JsonbValue::Number(jsonb::Number::UInt64(*v)),
        TransitValue::I64(v) => JsonbValue::Number(jsonb::Number::Int64(*v)),
        TransitValue::F64(v) => JsonbValue::Number(jsonb::Number::Float64(*v)),
        TransitValue::None => JsonbValue::Null,
    }
}

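/// Looks up a single block in the `blocks` view and reconstructs the
/// [`StreamMetadata`] needed to decode its payload.
///
/// Returns `Ok(None)` when no row matches `block_id_str`; otherwise returns
/// the block id, the block's object offset, and the stream metadata.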
async fn fetch_block_metadata(
    lakehouse: Arc<LakehouseContext>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
    view_factory: Arc<ViewFactory>,
    block_id_str: &str,
) -> anyhow::Result<Option<(Uuid, i64, StreamMetadata)>> {
    let ctx = super::query::make_session_context(
        lakehouse,
        part_provider,
        query_range,
        view_factory,
        Arc::new(NoOpSessionConfigurator),
    )
    .await?;

    // Query the blocks view for the block row joined with its stream's
    // metadata columns.
    let sql = format!(
        "SELECT block_id, stream_id, process_id, object_offset,
                \"streams.dependencies_metadata\", \"streams.objects_metadata\"
         FROM blocks
         WHERE block_id = '{block_id_str}'"
    );
    let df = ctx.sql(&sql).await?;
    let batches = df.collect().await?;

    if batches.is_empty() || batches[0].num_rows() == 0 {
        return Ok(None);
    }

    let batch = &batches[0];

    let block_id_col = string_column_by_name(batch, "block_id")?;
    let stream_id_col = string_column_by_name(batch, "stream_id")?;
    let process_id_col = string_column_by_name(batch, "process_id")?;
    let object_offset_col: &Int64Array = typed_column_by_name(batch, "object_offset")?;

    let block_id = Uuid::parse_str(block_id_col.value(0)?)?;
    let stream_id = Uuid::parse_str(stream_id_col.value(0)?)?;
    let process_id = Uuid::parse_str(process_id_col.value(0)?)?;
    let object_offset = object_offset_col.value(0);

    // Both metadata columns hold CBOR-encoded type descriptions.
    let deps_col = batch
        .column_by_name("streams.dependencies_metadata")
        .context("streams.dependencies_metadata column not found")?;
    let deps_binary: &datafusion::arrow::array::BinaryArray = deps_col
        .as_any()
        .downcast_ref()
        .context("failed to cast dependencies_metadata to BinaryArray")?;
    let deps_bytes = deps_binary.value(0);
    let dependencies_metadata: Vec<UserDefinedType> =
        ciborium::from_reader(deps_bytes).context("decoding dependencies_metadata")?;

    let objs_col = batch
        .column_by_name("streams.objects_metadata")
        .context("streams.objects_metadata column not found")?;
    let objs_binary: &datafusion::arrow::array::BinaryArray = objs_col
        .as_any()
        .downcast_ref()
        .context("failed to cast objects_metadata to BinaryArray")?;
    let objs_bytes = objs_binary.value(0);
    let objects_metadata: Vec<UserDefinedType> =
        ciborium::from_reader(objs_bytes).context("decoding objects_metadata")?;

    // Tags and properties are not needed to decode the payload, so they are
    // left empty.
    let stream_metadata = StreamMetadata {
        process_id,
        stream_id,
        dependencies_metadata,
        objects_metadata,
        tags: vec![],
        properties: Arc::new(vec![]),
    };

    Ok(Some((block_id, object_offset, stream_metadata)))
}

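/// Parses every object in a block payload into a [`RecordBatch`] matching
/// [`output_schema`].
///
/// `object_offset` is the absolute index of the block's first object within
/// its stream, and `early_limit`, when set, stops parsing once that many
/// objects have been appended.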
fn parse_block_objects(
    stream_metadata: &StreamMetadata,
    payload: &micromegas_telemetry::block_wire_format::BlockPayload,
    object_offset: i64,
    early_limit: Option<usize>,
) -> anyhow::Result<RecordBatch> {
    let mut index_builder = Int64Builder::new();
    let mut name_builder = StringBuilder::new();
    let mut value_builder = BinaryBuilder::new();
    let mut local_index: i64 = 0;
    let mut nb_objects: usize = 0;

    parse_block(stream_metadata, payload, |value| {
        if let TransitValue::Object(obj) = &value {
            let jsonb_val = transit_value_to_jsonb(&value);
            let mut buf = Vec::new();
            jsonb_val.write_to_vec(&mut buf);

            index_builder.append_value(object_offset + local_index);
            name_builder.append_value(obj.type_name.as_ref());
            value_builder.append_value(&buf);
            nb_objects += 1;
        } else {
            warn!(
                "parse_block: skipping non-Object value at index {}",
                object_offset + local_index
            );
        }
        local_index += 1;

        // Returning false stops the parse early once the limit is reached.
        if let Some(lim) = early_limit {
            Ok(nb_objects < lim)
        } else {
            Ok(true)
        }
    })?;

    Ok(RecordBatch::try_new(
        output_schema(),
        vec![
            Arc::new(index_builder.finish()),
            Arc::new(name_builder.finish()),
            Arc::new(value_builder.finish()),
        ],
    )?)
}

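/// Table function that parses a single telemetry block into rows of
/// `(object_index, type_name, value)`, where `value` holds the JSONB-encoded
/// object.
///
/// Hypothetical invocation from SQL (the block ID is a placeholder):
///
/// ```sql
/// SELECT object_index, type_name
/// FROM parse_block('a1b2c3d4-0000-0000-0000-000000000000')
/// LIMIT 10;
/// ```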
#[derive(Debug)]
pub struct ParseBlockTableFunction {
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
}

impl ParseBlockTableFunction {
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        view_factory: Arc<ViewFactory>,
        part_provider: Arc<dyn QueryPartitionProvider>,
        query_range: Option<TimeRange>,
    ) -> Self {
        Self {
            lakehouse,
            view_factory,
            part_provider,
            query_range,
        }
    }
}

impl TableFunctionImpl for ParseBlockTableFunction {
    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
        let arg = exprs.first().map(exp_to_string);
        let Some(Ok(block_id)) = arg else {
            return plan_err!(
                "First argument to parse_block must be a string (the block ID), given {:?}",
                arg
            );
        };
        Ok(Arc::new(ParseBlockProvider {
            block_id,
            lakehouse: self.lakehouse.clone(),
            view_factory: self.view_factory.clone(),
            part_provider: self.part_provider.clone(),
            query_range: self.query_range,
        }))
    }
}

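/// [`TableProvider`] returned by [`ParseBlockTableFunction::call`]; fetches,
/// decodes, and materializes one block when scanned.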
#[derive(Debug)]
struct ParseBlockProvider {
    block_id: String,
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    query_range: Option<TimeRange>,
}

#[async_trait]
impl TableProvider for ParseBlockProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        output_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        let block_id_str = &self.block_id;

        let Some((block_id, object_offset, stream_metadata)) = fetch_block_metadata(
            self.lakehouse.clone(),
            self.part_provider.clone(),
            self.query_range,
            self.view_factory.clone(),
            block_id_str,
        )
        .await
        .map_err(|e| DataFusionError::External(e.into()))?
        else {
            // Unknown block: return an empty result set with the right schema.
            let source = MemorySourceConfig::try_new(
                &[vec![]],
                self.schema(),
                projection.map(|v| v.to_owned()),
            )?;
            return Ok(DataSourceExec::from_data_source(source));
        };

        // Fetch the raw block payload from blob storage.
        let blob_storage = self.lakehouse.lake().blob_storage.clone();
        let payload = fetch_block_payload(
            blob_storage,
            sqlx::types::Uuid::from_bytes(*stream_metadata.process_id.as_bytes()),
            sqlx::types::Uuid::from_bytes(*stream_metadata.stream_id.as_bytes()),
            sqlx::types::Uuid::from_bytes(*block_id.as_bytes()),
        )
        .await
        .map_err(|e| DataFusionError::External(e.into()))?;

        // Only honor LIMIT when there are no filters: filters are applied
        // downstream, so stopping early here could drop matching rows.
        let early_limit = if filters.is_empty() { limit } else { None };
        let rb = parse_block_objects(&stream_metadata, &payload, object_offset, early_limit)
            .with_context(|| format!("parsing block {block_id_str}"))
            .map_err(|e| DataFusionError::External(e.into()))?;

        let source = MemorySourceConfig::try_new(
            &[vec![rb]],
            self.schema(),
            projection.map(|v| v.to_owned()),
        )?;
        Ok(DataSourceExec::from_data_source(source))
    }
}
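
// Usage sketch (assumptions noted): to make `parse_block` callable from SQL,
// the table function must be registered with a DataFusion `SessionContext`.
// The constructor arguments below are placeholders for values produced
// elsewhere in the crate; `register_udtf` is DataFusion's registration entry
// point for table functions.
//
// ctx.register_udtf(
//     "parse_block",
//     Arc::new(ParseBlockTableFunction::new(
//         lakehouse,
//         view_factory,
//         part_provider,
//         None, // query_range
//     )),
// );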