micromegas_analytics/lakehouse/parse_block_table_function.rs

1use super::{
2    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3    session_configurator::NoOpSessionConfigurator, view_factory::ViewFactory,
4};
5use crate::{
6    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
7    metadata::StreamMetadata,
8    payload::{fetch_block_payload, parse_block},
9    time::TimeRange,
10};
11use anyhow::Context;
12use async_trait::async_trait;
13use datafusion::{
14    arrow::{
15        array::{BinaryBuilder, Int64Array, Int64Builder, RecordBatch, StringBuilder},
16        datatypes::{DataType, Field, Schema, SchemaRef},
17    },
18    catalog::{Session, TableFunctionImpl, TableProvider},
19    common::plan_err,
20    datasource::{
21        TableType,
22        memory::{DataSourceExec, MemorySourceConfig},
23    },
24    error::DataFusionError,
25    physical_plan::ExecutionPlan,
26    prelude::Expr,
27};
28use jsonb::Value as JsonbValue;
29use micromegas_ingestion::web_ingestion_service::FORMAT_TRANSIT;
30use micromegas_tracing::prelude::*;
31use micromegas_transit::{UserDefinedType, value::Value as TransitValue};
32use std::{any::Any, borrow::Cow, collections::BTreeMap, sync::Arc};
33use uuid::Uuid;
34
35use crate::dfext::expressions::exp_to_string;
36
37fn output_schema() -> SchemaRef {
38    Arc::new(Schema::new(vec![
39        Field::new("object_index", DataType::Int64, false),
40        Field::new("type_name", DataType::Utf8, false),
41        Field::new("value", DataType::Binary, false),
42    ]))
43}
44
45/// Converts a `transit::Value` to a `jsonb::Value`.
46pub fn transit_value_to_jsonb(value: &TransitValue) -> JsonbValue<'_> {
47    match value {
48        TransitValue::String(s) => JsonbValue::String(Cow::Borrowed(s.as_str())),
49        TransitValue::Object(obj) => {
50            let mut map = BTreeMap::new();
51            map.insert(
52                "__type".to_string(),
53                JsonbValue::String(Cow::Borrowed(obj.type_name.as_str())),
54            );
55            for (name, val) in &obj.members {
56                map.insert(name.as_ref().clone(), transit_value_to_jsonb(val));
57            }
58            JsonbValue::Object(map)
59        }
60        TransitValue::U8(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
61        TransitValue::U32(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
62        TransitValue::U64(v) => JsonbValue::Number(jsonb::Number::UInt64(*v)),
63        TransitValue::I64(v) => JsonbValue::Number(jsonb::Number::Int64(*v)),
64        TransitValue::F64(v) => JsonbValue::Number(jsonb::Number::Float64(*v)),
65        TransitValue::None => JsonbValue::Null,
66    }
67}
68
69/// Queries the global blocks view for a block's metadata and constructs a `StreamMetadata`.
70/// Returns `None` if the block is not found.
71async fn fetch_block_metadata(
72    lakehouse: Arc<LakehouseContext>,
73    part_provider: Arc<dyn QueryPartitionProvider>,
74    query_range: Option<TimeRange>,
75    view_factory: Arc<ViewFactory>,
76    block_id_str: &str,
77) -> anyhow::Result<Option<(Uuid, i64, StreamMetadata)>> {
78    let ctx = super::query::make_session_context(
79        lakehouse,
80        part_provider,
81        query_range,
82        view_factory,
83        Arc::new(NoOpSessionConfigurator),
84    )
85    .await?;
86
87    let sql = format!(
88        "SELECT block_id, stream_id, process_id, object_offset,
89                \"streams.dependencies_metadata\", \"streams.objects_metadata\", \"streams.format\"
90         FROM blocks
91         WHERE block_id = '{block_id_str}'"
92    );
93    let df = ctx.sql(&sql).await?;
94    let batches = df.collect().await?;
95
96    if batches.is_empty() || batches[0].num_rows() == 0 {
97        return Ok(None);
98    }
99
100    let batch = &batches[0];
101
102    let block_id_col = string_column_by_name(batch, "block_id")?;
103    let stream_id_col = string_column_by_name(batch, "stream_id")?;
104    let process_id_col = string_column_by_name(batch, "process_id")?;
105    let object_offset_col: &Int64Array = typed_column_by_name(batch, "object_offset")?;
106    let format_col = string_column_by_name(batch, "streams.format")?;
107    let format = format_col.value(0)?;
108    if format != FORMAT_TRANSIT {
109        anyhow::bail!(
110            "parse_block does not support format={format} (only {FORMAT_TRANSIT}). \
111             Query `log_entries`/`measures`/`otel_spans` instead for OTel data."
112        );
113    }
114
115    let block_id = Uuid::parse_str(block_id_col.value(0)?)?;
116    let stream_id = Uuid::parse_str(stream_id_col.value(0)?)?;
117    let process_id = Uuid::parse_str(process_id_col.value(0)?)?;
118    let object_offset = object_offset_col.value(0);
119
120    let deps_col = batch
121        .column_by_name("streams.dependencies_metadata")
122        .context("streams.dependencies_metadata column not found")?;
123    let deps_binary: &datafusion::arrow::array::BinaryArray = deps_col
124        .as_any()
125        .downcast_ref()
126        .context("failed to cast dependencies_metadata to BinaryArray")?;
127    let deps_bytes = deps_binary.value(0);
128    let dependencies_metadata: Vec<UserDefinedType> =
129        ciborium::from_reader(deps_bytes).context("decoding dependencies_metadata")?;
130
131    let objs_col = batch
132        .column_by_name("streams.objects_metadata")
133        .context("streams.objects_metadata column not found")?;
134    let objs_binary: &datafusion::arrow::array::BinaryArray = objs_col
135        .as_any()
136        .downcast_ref()
137        .context("failed to cast objects_metadata to BinaryArray")?;
138    let objs_bytes = objs_binary.value(0);
139    let objects_metadata: Vec<UserDefinedType> =
140        ciborium::from_reader(objs_bytes).context("decoding objects_metadata")?;
141
142    let stream_metadata = StreamMetadata {
143        process_id,
144        stream_id,
145        dependencies_metadata,
146        objects_metadata,
147        tags: vec![],
148        properties: Arc::new(vec![]),
149    };
150
151    Ok(Some((block_id, object_offset, stream_metadata)))
152}
153
154/// Parses transit objects from a block payload and returns them as a RecordBatch.
155fn parse_block_objects(
156    stream_metadata: &StreamMetadata,
157    payload: &micromegas_telemetry::block_wire_format::BlockPayload,
158    object_offset: i64,
159    early_limit: Option<usize>,
160) -> anyhow::Result<RecordBatch> {
161    let mut index_builder = Int64Builder::new();
162    let mut name_builder = StringBuilder::new();
163    let mut value_builder = BinaryBuilder::new();
164    let mut local_index: i64 = 0;
165    let mut nb_objects: usize = 0;
166
167    parse_block(stream_metadata, payload, |value| {
168        if let TransitValue::Object(obj) = &value {
169            let jsonb_val = transit_value_to_jsonb(&value);
170            let mut buf = Vec::new();
171            jsonb_val.write_to_vec(&mut buf);
172
173            index_builder.append_value(object_offset + local_index);
174            name_builder.append_value(obj.type_name.as_ref());
175            value_builder.append_value(&buf);
176            nb_objects += 1;
177        } else {
178            warn!(
179                "parse_block: skipping non-Object value at index {}",
180                object_offset + local_index
181            );
182        }
183        local_index += 1;
184
185        if let Some(lim) = early_limit {
186            Ok(nb_objects < lim)
187        } else {
188            Ok(true)
189        }
190    })?;
191
192    Ok(RecordBatch::try_new(
193        output_schema(),
194        vec![
195            Arc::new(index_builder.finish()),
196            Arc::new(name_builder.finish()),
197            Arc::new(value_builder.finish()),
198        ],
199    )?)
200}
201
202/// A DataFusion `TableFunctionImpl` that parses a block's transit-serialized objects
203/// and returns each object as a row with its type name and full content as JSONB.
204#[derive(Debug)]
205pub struct ParseBlockTableFunction {
206    lakehouse: Arc<LakehouseContext>,
207    view_factory: Arc<ViewFactory>,
208    part_provider: Arc<dyn QueryPartitionProvider>,
209    query_range: Option<TimeRange>,
210}
211
212impl ParseBlockTableFunction {
213    pub fn new(
214        lakehouse: Arc<LakehouseContext>,
215        view_factory: Arc<ViewFactory>,
216        part_provider: Arc<dyn QueryPartitionProvider>,
217        query_range: Option<TimeRange>,
218    ) -> Self {
219        Self {
220            lakehouse,
221            view_factory,
222            part_provider,
223            query_range,
224        }
225    }
226}
227
228impl TableFunctionImpl for ParseBlockTableFunction {
229    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
230        let arg = exprs.first().map(exp_to_string);
231        let Some(Ok(block_id)) = arg else {
232            return plan_err!(
233                "First argument to parse_block must be a string (the block ID), given {:?}",
234                arg
235            );
236        };
237        Ok(Arc::new(ParseBlockProvider {
238            block_id,
239            lakehouse: self.lakehouse.clone(),
240            view_factory: self.view_factory.clone(),
241            part_provider: self.part_provider.clone(),
242            query_range: self.query_range,
243        }))
244    }
245}
246
247#[derive(Debug)]
248struct ParseBlockProvider {
249    block_id: String,
250    lakehouse: Arc<LakehouseContext>,
251    view_factory: Arc<ViewFactory>,
252    part_provider: Arc<dyn QueryPartitionProvider>,
253    query_range: Option<TimeRange>,
254}
255
256#[async_trait]
257impl TableProvider for ParseBlockProvider {
258    fn as_any(&self) -> &dyn Any {
259        self
260    }
261
262    fn schema(&self) -> SchemaRef {
263        output_schema()
264    }
265
266    fn table_type(&self) -> TableType {
267        TableType::Temporary
268    }
269
270    async fn scan(
271        &self,
272        _state: &dyn Session,
273        projection: Option<&Vec<usize>>,
274        filters: &[Expr],
275        limit: Option<usize>,
276    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
277        let block_id_str = &self.block_id;
278
279        let Some((block_id, object_offset, stream_metadata)) = fetch_block_metadata(
280            self.lakehouse.clone(),
281            self.part_provider.clone(),
282            self.query_range,
283            self.view_factory.clone(),
284            block_id_str,
285        )
286        .await
287        .map_err(|e| DataFusionError::External(e.into()))?
288        else {
289            let source = MemorySourceConfig::try_new(
290                &[vec![]],
291                self.schema(),
292                projection.map(|v| v.to_owned()),
293            )?;
294            return Ok(DataSourceExec::from_data_source(source));
295        };
296
297        // Fetch and parse the block payload
298        let blob_storage = self.lakehouse.lake().blob_storage.clone();
299        let payload = fetch_block_payload(
300            blob_storage,
301            sqlx::types::Uuid::from_bytes(*stream_metadata.process_id.as_bytes()),
302            sqlx::types::Uuid::from_bytes(*stream_metadata.stream_id.as_bytes()),
303            sqlx::types::Uuid::from_bytes(*block_id.as_bytes()),
304        )
305        .await
306        .map_err(|e| DataFusionError::External(e.into()))?;
307
308        // Parse transit objects and convert to JSONB
309        let early_limit = if filters.is_empty() { limit } else { None };
310        let rb = parse_block_objects(&stream_metadata, &payload, object_offset, early_limit)
311            .with_context(|| format!("parsing block {block_id_str}"))
312            .map_err(|e| DataFusionError::External(e.into()))?;
313
314        let source = MemorySourceConfig::try_new(
315            &[vec![rb]],
316            self.schema(),
317            projection.map(|v| v.to_owned()),
318        )?;
319        Ok(DataSourceExec::from_data_source(source))
320    }
321}