micromegas_analytics/lakehouse/otel/
spans_block_processor.rs

//! `BlockProcessor` for OTLP `ResourceSpans` payloads → `otel_spans` rows.
//!
//! Materializes one row per span. `events` and `links` go in plain `Binary` columns
//! carrying JSONB bytes — see plan §"Span events and links as `List<Struct>` vs JSONB"
//! for the rationale.
6
7use super::attrs::{any_value_to_jsonb, attrs_to_jsonb, scope_extras, to_jsonb_bytes};
8use crate::lakehouse::{
9    block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
10    write_partition::PartitionRowSet,
11};
12use crate::metadata::ProcessMetadata;
13use crate::payload::fetch_block_payload;
14use crate::time::TimeRange;
15use anyhow::{Context, Result};
16use async_trait::async_trait;
17use chrono::DateTime;
18use datafusion::arrow::array::{
19    BinaryBuilder, BinaryDictionaryBuilder, FixedSizeBinaryBuilder, PrimitiveBuilder,
20    StringBuilder, StringDictionaryBuilder,
21};
22use datafusion::arrow::datatypes::{Int32Type, Int64Type, TimestampNanosecondType};
23use datafusion::arrow::record_batch::RecordBatch;
24use jsonb::Value as JsonbValue;
25use micromegas_telemetry::blob_storage::BlobStorage;
26use micromegas_tracing::prelude::*;
27use opentelemetry_proto::tonic::common::v1::InstrumentationScope;
28use opentelemetry_proto::tonic::trace::v1::{
29    ResourceSpans, Span, span as span_proto, status::StatusCode as ProtoStatusCode,
30};
31use prost::Message;
32use std::borrow::Cow;
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
/// Block processor that converts OTLP span blocks into `otel_spans` rows.
///
/// The type holds no state; all of the work happens in its `BlockProcessor`
/// implementation.
#[derive(Debug)]
pub struct OtelSpansBlockProcessor {}
38
39#[async_trait]
40impl BlockProcessor for OtelSpansBlockProcessor {
41    #[span_fn]
42    async fn process(
43        &self,
44        blob_storage: Arc<BlobStorage>,
45        src_block: Arc<PartitionSourceBlock>,
46    ) -> Result<Option<PartitionRowSet>> {
47        let payload = fetch_block_payload(
48            blob_storage,
49            sqlx::types::Uuid::from_bytes(*src_block.block.process_id.as_bytes()),
50            sqlx::types::Uuid::from_bytes(*src_block.block.stream_id.as_bytes()),
51            sqlx::types::Uuid::from_bytes(*src_block.block.block_id.as_bytes()),
52        )
53        .await
54        .with_context(|| "fetch_block_payload")?;
55
56        let resource_spans = ResourceSpans::decode(payload.objects.as_slice())
57            .with_context(|| "decoding ResourceSpans proto")?;
58
59        let insert_time_nanos = src_block
60            .block
61            .insert_time
62            .timestamp_nanos_opt()
63            .with_context(|| "block.insert_time โ†’ nanos")?;
64        let mut builder = OtelSpansRowBuilder::new(
65            src_block.process.process_id.to_string(),
66            src_block.block.stream_id.to_string(),
67            src_block.block.block_id.to_string(),
68            insert_time_nanos,
69            src_block.process.clone(),
70        );
71
72        for scope_spans in &resource_spans.scope_spans {
73            let scope = scope_spans.scope.as_ref();
74            for span in &scope_spans.spans {
75                builder.append(span, scope, &scope_spans.schema_url)?;
76            }
77        }
78
79        builder.finish()
80    }
81}
82
83/// Per-block accumulator for `otel_spans` rows: owns the column builders, time
84/// bounds, and per-block constants so `append` only takes per-span inputs.
85struct OtelSpansRowBuilder {
86    process_ids: StringDictionaryBuilder<Int32Type>,
87    stream_ids: StringDictionaryBuilder<Int32Type>,
88    block_ids: StringDictionaryBuilder<Int32Type>,
89    insert_times: PrimitiveBuilder<TimestampNanosecondType>,
90    exes: StringBuilder,
91    usernames: StringBuilder,
92    computers: StringBuilder,
93    process_properties: BinaryDictionaryBuilder<Int32Type>,
94    trace_ids: FixedSizeBinaryBuilder,
95    span_ids: FixedSizeBinaryBuilder,
96    parent_span_ids: FixedSizeBinaryBuilder,
97    start_times: PrimitiveBuilder<TimestampNanosecondType>,
98    end_times: PrimitiveBuilder<TimestampNanosecondType>,
99    durations: PrimitiveBuilder<Int64Type>,
100    names: StringDictionaryBuilder<Int32Type>,
101    kinds: StringDictionaryBuilder<Int32Type>,
102    statuses: StringDictionaryBuilder<Int32Type>,
103    status_messages: StringBuilder,
104    properties: BinaryDictionaryBuilder<Int32Type>,
105    events: BinaryBuilder,
106    links: BinaryBuilder,
107    min_time: i64,
108    max_time: i64,
109    nb_appended: usize,
110    process_id_str: String,
111    stream_id_str: String,
112    block_id_str: String,
113    insert_time_nanos: i64,
114    process: Arc<ProcessMetadata>,
115}
116
117impl OtelSpansRowBuilder {
118    fn new(
119        process_id_str: String,
120        stream_id_str: String,
121        block_id_str: String,
122        insert_time_nanos: i64,
123        process: Arc<ProcessMetadata>,
124    ) -> Self {
125        Self {
126            process_ids: StringDictionaryBuilder::new(),
127            stream_ids: StringDictionaryBuilder::new(),
128            block_ids: StringDictionaryBuilder::new(),
129            insert_times: PrimitiveBuilder::new(),
130            exes: StringBuilder::new(),
131            usernames: StringBuilder::new(),
132            computers: StringBuilder::new(),
133            process_properties: BinaryDictionaryBuilder::new(),
134            trace_ids: FixedSizeBinaryBuilder::new(16),
135            span_ids: FixedSizeBinaryBuilder::new(8),
136            parent_span_ids: FixedSizeBinaryBuilder::new(8),
137            start_times: PrimitiveBuilder::new(),
138            end_times: PrimitiveBuilder::new(),
139            durations: PrimitiveBuilder::new(),
140            names: StringDictionaryBuilder::new(),
141            kinds: StringDictionaryBuilder::new(),
142            statuses: StringDictionaryBuilder::new(),
143            status_messages: StringBuilder::new(),
144            properties: BinaryDictionaryBuilder::new(),
145            events: BinaryBuilder::new(),
146            links: BinaryBuilder::new(),
147            min_time: i64::MAX,
148            max_time: i64::MIN,
149            nb_appended: 0,
150            process_id_str,
151            stream_id_str,
152            block_id_str,
153            insert_time_nanos,
154            process,
155        }
156    }
157
158    fn append(
159        &mut self,
160        span: &Span,
161        scope: Option<&InstrumentationScope>,
162        schema_url: &str,
163    ) -> Result<()> {
164        let start_nanos = span.start_time_unix_nano as i64;
165        let end_nanos = span.end_time_unix_nano as i64;
166        if start_nanos == 0 || end_nanos == 0 {
167            debug!(
168                "OTel span without start/end time, skipping (block={})",
169                self.block_id_str
170            );
171            return Ok(());
172        }
173        if span.trace_id.len() != 16 || span.span_id.len() != 8 {
174            debug!(
175                "OTel span with bad trace_id ({}b) / span_id ({}b), skipping",
176                span.trace_id.len(),
177                span.span_id.len(),
178            );
179            return Ok(());
180        }
181        self.min_time = self.min_time.min(start_nanos);
182        self.max_time = self.max_time.max(end_nanos);
183
184        self.trace_ids
185            .append_value(&span.trace_id)
186            .with_context(|| "appending trace_id")?;
187        self.span_ids
188            .append_value(&span.span_id)
189            .with_context(|| "appending span_id")?;
190        if span.parent_span_id.len() == 8 {
191            self.parent_span_ids
192                .append_value(&span.parent_span_id)
193                .with_context(|| "appending parent_span_id")?;
194        } else {
195            self.parent_span_ids.append_null();
196        }
197
198        self.process_ids.append_value(&self.process_id_str);
199        self.stream_ids.append_value(&self.stream_id_str);
200        self.block_ids.append_value(&self.block_id_str);
201        self.insert_times.append_value(self.insert_time_nanos);
202        self.exes.append_value(&self.process.exe);
203        self.usernames.append_value(&self.process.username);
204        self.computers.append_value(&self.process.computer);
205        self.process_properties
206            .append_value(&**self.process.properties);
207
208        self.start_times.append_value(start_nanos);
209        self.end_times.append_value(end_nanos);
210        self.durations.append_value(end_nanos - start_nanos);
211        self.names.append_value(&span.name);
212        self.kinds.append_value(span_kind_str(span.kind));
213        let (status_code, status_message) = match span.status.as_ref() {
214            Some(s) => (proto_status_code_str(s.code), Some(s.message.clone())),
215            None => ("UNSET", None),
216        };
217        self.statuses.append_value(status_code);
218        match status_message.as_deref() {
219            Some(msg) if !msg.is_empty() => self.status_messages.append_value(msg),
220            _ => self.status_messages.append_null(),
221        }
222
223        let extras = scope_extras(scope, schema_url);
224        let props_jsonb = attrs_to_jsonb(&span.attributes, &extras);
225        self.properties.append_value(&props_jsonb);
226
227        // Events as JSONB array.
228        let events_array: Vec<JsonbValue<'static>> = span
229            .events
230            .iter()
231            .map(|ev| {
232                let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
233                map.insert(
234                    "time".into(),
235                    JsonbValue::Number(jsonb::Number::Int64(ev.time_unix_nano as i64)),
236                );
237                map.insert(
238                    "name".into(),
239                    JsonbValue::String(Cow::Owned(ev.name.clone())),
240                );
241                let mut attrs_map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
242                for kv in &ev.attributes {
243                    if let Some(v) = kv.value.as_ref() {
244                        attrs_map.insert(kv.key.clone(), any_value_to_jsonb(v));
245                    }
246                }
247                map.insert("attributes".into(), JsonbValue::Object(attrs_map));
248                JsonbValue::Object(map)
249            })
250            .collect();
251        self.events
252            .append_value(to_jsonb_bytes(JsonbValue::Array(events_array)));
253
254        // Links as JSONB array.
255        let links_array: Vec<JsonbValue<'static>> = span
256            .links
257            .iter()
258            .map(|link| {
259                let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
260                map.insert(
261                    "trace_id".into(),
262                    JsonbValue::String(Cow::Owned(hex::encode(&link.trace_id))),
263                );
264                map.insert(
265                    "span_id".into(),
266                    JsonbValue::String(Cow::Owned(hex::encode(&link.span_id))),
267                );
268                let mut attrs_map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
269                for kv in &link.attributes {
270                    if let Some(v) = kv.value.as_ref() {
271                        attrs_map.insert(kv.key.clone(), any_value_to_jsonb(v));
272                    }
273                }
274                map.insert("attributes".into(), JsonbValue::Object(attrs_map));
275                JsonbValue::Object(map)
276            })
277            .collect();
278        self.links
279            .append_value(to_jsonb_bytes(JsonbValue::Array(links_array)));
280
281        self.nb_appended += 1;
282        Ok(())
283    }
284
285    fn finish(mut self) -> Result<Option<PartitionRowSet>> {
286        if self.nb_appended == 0 {
287            return Ok(None);
288        }
289        let schema = Arc::new(crate::lakehouse::otel::spans_table::otel_spans_table_schema());
290        let batch = RecordBatch::try_new(
291            schema,
292            vec![
293                Arc::new(self.process_ids.finish()),
294                Arc::new(self.stream_ids.finish()),
295                Arc::new(self.block_ids.finish()),
296                Arc::new(self.insert_times.finish().with_timezone_utc()),
297                Arc::new(self.exes.finish()),
298                Arc::new(self.usernames.finish()),
299                Arc::new(self.computers.finish()),
300                Arc::new(self.process_properties.finish()),
301                Arc::new(self.trace_ids.finish()),
302                Arc::new(self.span_ids.finish()),
303                Arc::new(self.parent_span_ids.finish()),
304                Arc::new(self.start_times.finish().with_timezone_utc()),
305                Arc::new(self.end_times.finish().with_timezone_utc()),
306                Arc::new(self.durations.finish()),
307                Arc::new(self.names.finish()),
308                Arc::new(self.kinds.finish()),
309                Arc::new(self.statuses.finish()),
310                Arc::new(self.status_messages.finish()),
311                Arc::new(self.properties.finish()),
312                Arc::new(self.events.finish()),
313                Arc::new(self.links.finish()),
314            ],
315        )
316        .with_context(|| "building otel_spans batch")?;
317
318        Ok(Some(PartitionRowSet::new(
319            TimeRange::new(
320                DateTime::from_timestamp_nanos(self.min_time),
321                DateTime::from_timestamp_nanos(self.max_time),
322            ),
323            batch,
324        )))
325    }
326}
327
328fn span_kind_str(kind: i32) -> &'static str {
329    match span_proto::SpanKind::try_from(kind).unwrap_or(span_proto::SpanKind::Unspecified) {
330        span_proto::SpanKind::Unspecified => "UNSPECIFIED",
331        span_proto::SpanKind::Internal => "INTERNAL",
332        span_proto::SpanKind::Server => "SERVER",
333        span_proto::SpanKind::Client => "CLIENT",
334        span_proto::SpanKind::Producer => "PRODUCER",
335        span_proto::SpanKind::Consumer => "CONSUMER",
336    }
337}
338
339fn proto_status_code_str(code: i32) -> &'static str {
340    match ProtoStatusCode::try_from(code).unwrap_or(ProtoStatusCode::Unset) {
341        ProtoStatusCode::Unset => "UNSET",
342        ProtoStatusCode::Ok => "OK",
343        ProtoStatusCode::Error => "ERROR",
344    }
345}