// micromegas_analytics/lakehouse/parse_block_table_function.rs

1use super::{
2    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3    session_configurator::NoOpSessionConfigurator, view_factory::ViewFactory,
4};
5use crate::{
6    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
7    metadata::StreamMetadata,
8    payload::{fetch_block_payload, parse_block},
9    time::TimeRange,
10};
11use anyhow::Context;
12use async_trait::async_trait;
13use datafusion::{
14    arrow::{
15        array::{BinaryBuilder, Int64Array, Int64Builder, RecordBatch, StringBuilder},
16        datatypes::{DataType, Field, Schema, SchemaRef},
17    },
18    catalog::{Session, TableFunctionImpl, TableProvider},
19    common::plan_err,
20    datasource::{
21        TableType,
22        memory::{DataSourceExec, MemorySourceConfig},
23    },
24    error::DataFusionError,
25    physical_plan::ExecutionPlan,
26    prelude::Expr,
27};
28use jsonb::Value as JsonbValue;
29use micromegas_tracing::prelude::*;
30use micromegas_transit::{UserDefinedType, value::Value as TransitValue};
31use std::{any::Any, borrow::Cow, collections::BTreeMap, sync::Arc};
32use uuid::Uuid;
33
34use crate::dfext::expressions::exp_to_string;
35
36fn output_schema() -> SchemaRef {
37    Arc::new(Schema::new(vec![
38        Field::new("object_index", DataType::Int64, false),
39        Field::new("type_name", DataType::Utf8, false),
40        Field::new("value", DataType::Binary, false),
41    ]))
42}
43
44/// Converts a `transit::Value` to a `jsonb::Value`.
45pub fn transit_value_to_jsonb(value: &TransitValue) -> JsonbValue<'_> {
46    match value {
47        TransitValue::String(s) => JsonbValue::String(Cow::Borrowed(s.as_str())),
48        TransitValue::Object(obj) => {
49            let mut map = BTreeMap::new();
50            map.insert(
51                "__type".to_string(),
52                JsonbValue::String(Cow::Borrowed(obj.type_name.as_str())),
53            );
54            for (name, val) in &obj.members {
55                map.insert(name.as_ref().clone(), transit_value_to_jsonb(val));
56            }
57            JsonbValue::Object(map)
58        }
59        TransitValue::U8(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
60        TransitValue::U32(v) => JsonbValue::Number(jsonb::Number::UInt64(u64::from(*v))),
61        TransitValue::U64(v) => JsonbValue::Number(jsonb::Number::UInt64(*v)),
62        TransitValue::I64(v) => JsonbValue::Number(jsonb::Number::Int64(*v)),
63        TransitValue::F64(v) => JsonbValue::Number(jsonb::Number::Float64(*v)),
64        TransitValue::None => JsonbValue::Null,
65    }
66}
67
68/// Queries the global blocks view for a block's metadata and constructs a `StreamMetadata`.
69/// Returns `None` if the block is not found.
70async fn fetch_block_metadata(
71    lakehouse: Arc<LakehouseContext>,
72    part_provider: Arc<dyn QueryPartitionProvider>,
73    query_range: Option<TimeRange>,
74    view_factory: Arc<ViewFactory>,
75    block_id_str: &str,
76) -> anyhow::Result<Option<(Uuid, i64, StreamMetadata)>> {
77    let ctx = super::query::make_session_context(
78        lakehouse,
79        part_provider,
80        query_range,
81        view_factory,
82        Arc::new(NoOpSessionConfigurator),
83    )
84    .await?;
85
86    let sql = format!(
87        "SELECT block_id, stream_id, process_id, object_offset,
88                \"streams.dependencies_metadata\", \"streams.objects_metadata\"
89         FROM blocks
90         WHERE block_id = '{block_id_str}'"
91    );
92    let df = ctx.sql(&sql).await?;
93    let batches = df.collect().await?;
94
95    if batches.is_empty() || batches[0].num_rows() == 0 {
96        return Ok(None);
97    }
98
99    let batch = &batches[0];
100
101    let block_id_col = string_column_by_name(batch, "block_id")?;
102    let stream_id_col = string_column_by_name(batch, "stream_id")?;
103    let process_id_col = string_column_by_name(batch, "process_id")?;
104    let object_offset_col: &Int64Array = typed_column_by_name(batch, "object_offset")?;
105
106    let block_id = Uuid::parse_str(block_id_col.value(0)?)?;
107    let stream_id = Uuid::parse_str(stream_id_col.value(0)?)?;
108    let process_id = Uuid::parse_str(process_id_col.value(0)?)?;
109    let object_offset = object_offset_col.value(0);
110
111    let deps_col = batch
112        .column_by_name("streams.dependencies_metadata")
113        .context("streams.dependencies_metadata column not found")?;
114    let deps_binary: &datafusion::arrow::array::BinaryArray = deps_col
115        .as_any()
116        .downcast_ref()
117        .context("failed to cast dependencies_metadata to BinaryArray")?;
118    let deps_bytes = deps_binary.value(0);
119    let dependencies_metadata: Vec<UserDefinedType> =
120        ciborium::from_reader(deps_bytes).context("decoding dependencies_metadata")?;
121
122    let objs_col = batch
123        .column_by_name("streams.objects_metadata")
124        .context("streams.objects_metadata column not found")?;
125    let objs_binary: &datafusion::arrow::array::BinaryArray = objs_col
126        .as_any()
127        .downcast_ref()
128        .context("failed to cast objects_metadata to BinaryArray")?;
129    let objs_bytes = objs_binary.value(0);
130    let objects_metadata: Vec<UserDefinedType> =
131        ciborium::from_reader(objs_bytes).context("decoding objects_metadata")?;
132
133    let stream_metadata = StreamMetadata {
134        process_id,
135        stream_id,
136        dependencies_metadata,
137        objects_metadata,
138        tags: vec![],
139        properties: Arc::new(vec![]),
140    };
141
142    Ok(Some((block_id, object_offset, stream_metadata)))
143}
144
145/// Parses transit objects from a block payload and returns them as a RecordBatch.
146fn parse_block_objects(
147    stream_metadata: &StreamMetadata,
148    payload: &micromegas_telemetry::block_wire_format::BlockPayload,
149    object_offset: i64,
150    early_limit: Option<usize>,
151) -> anyhow::Result<RecordBatch> {
152    let mut index_builder = Int64Builder::new();
153    let mut name_builder = StringBuilder::new();
154    let mut value_builder = BinaryBuilder::new();
155    let mut local_index: i64 = 0;
156    let mut nb_objects: usize = 0;
157
158    parse_block(stream_metadata, payload, |value| {
159        if let TransitValue::Object(obj) = &value {
160            let jsonb_val = transit_value_to_jsonb(&value);
161            let mut buf = Vec::new();
162            jsonb_val.write_to_vec(&mut buf);
163
164            index_builder.append_value(object_offset + local_index);
165            name_builder.append_value(obj.type_name.as_ref());
166            value_builder.append_value(&buf);
167            nb_objects += 1;
168        } else {
169            warn!(
170                "parse_block: skipping non-Object value at index {}",
171                object_offset + local_index
172            );
173        }
174        local_index += 1;
175
176        if let Some(lim) = early_limit {
177            Ok(nb_objects < lim)
178        } else {
179            Ok(true)
180        }
181    })?;
182
183    Ok(RecordBatch::try_new(
184        output_schema(),
185        vec![
186            Arc::new(index_builder.finish()),
187            Arc::new(name_builder.finish()),
188            Arc::new(value_builder.finish()),
189        ],
190    )?)
191}
192
/// A DataFusion `TableFunctionImpl` that parses a block's transit-serialized objects
/// and returns each object as a row with its type name and full content as JSONB.
#[derive(Debug)]
pub struct ParseBlockTableFunction {
    // Lakehouse context used to build the session for the block-metadata query.
    lakehouse: Arc<LakehouseContext>,
    // View factory forwarded to the metadata query session.
    view_factory: Arc<ViewFactory>,
    // Partition provider forwarded to the metadata query session.
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Optional time range forwarded to the metadata query session.
    query_range: Option<TimeRange>,
}
202
203impl ParseBlockTableFunction {
204    pub fn new(
205        lakehouse: Arc<LakehouseContext>,
206        view_factory: Arc<ViewFactory>,
207        part_provider: Arc<dyn QueryPartitionProvider>,
208        query_range: Option<TimeRange>,
209    ) -> Self {
210        Self {
211            lakehouse,
212            view_factory,
213            part_provider,
214            query_range,
215        }
216    }
217}
218
219impl TableFunctionImpl for ParseBlockTableFunction {
220    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
221        let arg = exprs.first().map(exp_to_string);
222        let Some(Ok(block_id)) = arg else {
223            return plan_err!(
224                "First argument to parse_block must be a string (the block ID), given {:?}",
225                arg
226            );
227        };
228        Ok(Arc::new(ParseBlockProvider {
229            block_id,
230            lakehouse: self.lakehouse.clone(),
231            view_factory: self.view_factory.clone(),
232            part_provider: self.part_provider.clone(),
233            query_range: self.query_range,
234        }))
235    }
236}
237
// Table provider produced by `ParseBlockTableFunction::call`; scanning it
// fetches, parses and materializes the objects of a single block.
#[derive(Debug)]
struct ParseBlockProvider {
    // Block id exactly as passed to the table function (not yet validated).
    block_id: String,
    // Lakehouse context used for the metadata query and blob storage access.
    lakehouse: Arc<LakehouseContext>,
    // View factory forwarded to the metadata query session.
    view_factory: Arc<ViewFactory>,
    // Partition provider forwarded to the metadata query session.
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Optional time range forwarded to the metadata query session.
    query_range: Option<TimeRange>,
}
246
247#[async_trait]
248impl TableProvider for ParseBlockProvider {
249    fn as_any(&self) -> &dyn Any {
250        self
251    }
252
253    fn schema(&self) -> SchemaRef {
254        output_schema()
255    }
256
257    fn table_type(&self) -> TableType {
258        TableType::Temporary
259    }
260
261    async fn scan(
262        &self,
263        _state: &dyn Session,
264        projection: Option<&Vec<usize>>,
265        filters: &[Expr],
266        limit: Option<usize>,
267    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
268        let block_id_str = &self.block_id;
269
270        let Some((block_id, object_offset, stream_metadata)) = fetch_block_metadata(
271            self.lakehouse.clone(),
272            self.part_provider.clone(),
273            self.query_range,
274            self.view_factory.clone(),
275            block_id_str,
276        )
277        .await
278        .map_err(|e| DataFusionError::External(e.into()))?
279        else {
280            let source = MemorySourceConfig::try_new(
281                &[vec![]],
282                self.schema(),
283                projection.map(|v| v.to_owned()),
284            )?;
285            return Ok(DataSourceExec::from_data_source(source));
286        };
287
288        // Fetch and parse the block payload
289        let blob_storage = self.lakehouse.lake().blob_storage.clone();
290        let payload = fetch_block_payload(
291            blob_storage,
292            sqlx::types::Uuid::from_bytes(*stream_metadata.process_id.as_bytes()),
293            sqlx::types::Uuid::from_bytes(*stream_metadata.stream_id.as_bytes()),
294            sqlx::types::Uuid::from_bytes(*block_id.as_bytes()),
295        )
296        .await
297        .map_err(|e| DataFusionError::External(e.into()))?;
298
299        // Parse transit objects and convert to JSONB
300        let early_limit = if filters.is_empty() { limit } else { None };
301        let rb = parse_block_objects(&stream_metadata, &payload, object_offset, early_limit)
302            .with_context(|| format!("parsing block {block_id_str}"))
303            .map_err(|e| DataFusionError::External(e.into()))?;
304
305        let source = MemorySourceConfig::try_new(
306            &[vec![rb]],
307            self.schema(),
308            projection.map(|v| v.to_owned()),
309        )?;
310        Ok(DataSourceExec::from_data_source(source))
311    }
312}