micromegas_analytics/lakehouse/otel/
logs_block_processor.rs

1//! `BlockProcessor` for OTLP `ResourceLogs` payloads → `log_entries` rows.
2//!
3//! The block payload carries a single `ResourceLogs` message (one resource per block —
4//! see Plan §"One block per Resource"). We prost-decode it, walk each scope and log
5//! record, emit one row per `LogRecord`.
6
7use super::attrs::{any_value_to_string, attrs_to_jsonb, scope_extras, severity_number_to_level};
8use crate::lakehouse::{
9    block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
10    write_partition::PartitionRowSet,
11};
12use crate::payload::fetch_block_payload;
13use crate::time::TimeRange;
14use anyhow::{Context, Result};
15use async_trait::async_trait;
16use chrono::DateTime;
17use datafusion::arrow::array::{
18    BinaryDictionaryBuilder, PrimitiveBuilder, StringBuilder, StringDictionaryBuilder,
19};
20use datafusion::arrow::datatypes::{Int16Type, Int32Type, TimestampNanosecondType};
21use datafusion::arrow::record_batch::RecordBatch;
22use jsonb::Value as JsonbValue;
23use micromegas_telemetry::blob_storage::BlobStorage;
24use micromegas_tracing::prelude::*;
25use opentelemetry_proto::tonic::logs::v1::ResourceLogs;
26use prost::Message;
27use std::borrow::Cow;
28use std::sync::Arc;
29
/// Block processor that turns OTLP `ResourceLogs` block payloads into `log_entries` rows.
///
/// The type holds no state: everything it needs is passed to `BlockProcessor::process`.
#[derive(Debug)]
pub struct OtelLogsBlockProcessor;
32
33#[async_trait]
34impl BlockProcessor for OtelLogsBlockProcessor {
35    #[span_fn]
36    async fn process(
37        &self,
38        blob_storage: Arc<BlobStorage>,
39        src_block: Arc<PartitionSourceBlock>,
40    ) -> Result<Option<PartitionRowSet>> {
41        let payload = fetch_block_payload(
42            blob_storage,
43            sqlx::types::Uuid::from_bytes(*src_block.block.process_id.as_bytes()),
44            sqlx::types::Uuid::from_bytes(*src_block.block.stream_id.as_bytes()),
45            sqlx::types::Uuid::from_bytes(*src_block.block.block_id.as_bytes()),
46        )
47        .await
48        .with_context(|| "fetch_block_payload")?;
49
50        // Block payload format: BlockPayload { dependencies: [], objects: <ResourceLogs proto> }.
51        // (`fetch_block_payload` already CBOR-decodes the envelope; we get the proto bytes.)
52        let resource_logs = ResourceLogs::decode(payload.objects.as_slice())
53            .with_context(|| "decoding ResourceLogs proto")?;
54
55        let process = &src_block.process;
56        let stream_id_str = src_block.block.stream_id.to_string();
57        let block_id_str = src_block.block.block_id.to_string();
58        let process_id_str = process.process_id.to_string();
59        let insert_time_nanos = src_block
60            .block
61            .insert_time
62            .timestamp_nanos_opt()
63            .with_context(|| "block.insert_time → nanos")?;
64
65        // Pre-count rows so dictionary builders can size their backing storage.
66        let row_count: usize = resource_logs
67            .scope_logs
68            .iter()
69            .map(|s| s.log_records.len())
70            .sum();
71        if row_count == 0 {
72            return Ok(None);
73        }
74
75        let mut process_ids = StringDictionaryBuilder::<Int16Type>::new();
76        let mut stream_ids = StringDictionaryBuilder::<Int16Type>::new();
77        let mut block_ids = StringDictionaryBuilder::<Int16Type>::new();
78        let mut insert_times =
79            PrimitiveBuilder::<TimestampNanosecondType>::with_capacity(row_count);
80        let mut exes = StringDictionaryBuilder::<Int16Type>::new();
81        let mut usernames = StringDictionaryBuilder::<Int16Type>::new();
82        let mut computers = StringDictionaryBuilder::<Int16Type>::new();
83        let mut times = PrimitiveBuilder::<TimestampNanosecondType>::with_capacity(row_count);
84        let mut targets = StringDictionaryBuilder::<Int16Type>::new();
85        let mut levels = PrimitiveBuilder::<Int32Type>::with_capacity(row_count);
86        let mut msgs = StringBuilder::new();
87        let mut properties = BinaryDictionaryBuilder::<Int32Type>::new();
88        let mut process_properties = BinaryDictionaryBuilder::<Int32Type>::new();
89
90        let mut min_time = i64::MAX;
91        let mut max_time = i64::MIN;
92        let mut nb_appended = 0usize;
93        let mut nb_dropped_no_timestamp = 0usize;
94        let mut nb_severity_out_of_range = 0usize;
95
96        for scope_logs in &resource_logs.scope_logs {
97            let scope = scope_logs.scope.as_ref();
98            // `target` mirrors the existing native semantics (logger name).
99            let scope_name = scope.map(|s| s.name.clone()).unwrap_or_default();
100
101            for record in &scope_logs.log_records {
102                // time_unix_nano is optional — fall back to observed_time per OTLP spec.
103                let time_nanos = if record.time_unix_nano != 0 {
104                    record.time_unix_nano as i64
105                } else if record.observed_time_unix_nano != 0 {
106                    record.observed_time_unix_nano as i64
107                } else {
108                    // No timestamp at all — skip so it doesn't anchor the partition
109                    // at 1970-01-01. Aggregated below to one log line per block.
110                    nb_dropped_no_timestamp += 1;
111                    continue;
112                };
113                min_time = min_time.min(time_nanos);
114                max_time = max_time.max(time_nanos);
115
116                let level = severity_number_to_level(record.severity_number);
117                if !(0..=24).contains(&record.severity_number) {
118                    nb_severity_out_of_range += 1;
119                }
120
121                // Body → msg. String body lands directly; structured body gets stringified
122                // (deferred parsing — see the plan's "Logs → log_entries" mapping table).
123                let msg = record
124                    .body
125                    .as_ref()
126                    .map(any_value_to_string)
127                    .unwrap_or_default();
128
129                // Properties: log record attributes + scope info (otel.scope.*) +
130                // optional trace correlation + raw severity_text. Built per-row because
131                // OTel attributes vary record-to-record; dictionary dedup happens inside
132                // BinaryDictionaryBuilder by content hash.
133                let mut extras = scope_extras(scope, &scope_logs.schema_url);
134                // W3C Trace Context: trace_id is 16 bytes, span_id is 8 bytes.
135                // `otel_spans` enforces these lengths and skips bad rows; we mirror
136                // that here so a buggy SDK can't write half-size hex strings that
137                // silently fail correlation joins against the spans view.
138                if !record.trace_id.is_empty() {
139                    if record.trace_id.len() == 16 {
140                        let hex = hex::encode(&record.trace_id);
141                        extras.push((
142                            "otel.trace_id".to_string(),
143                            JsonbValue::String(Cow::Owned(hex)),
144                        ));
145                    } else {
146                        debug!(
147                            "OTel log record with bad trace_id ({}b), dropping property",
148                            record.trace_id.len()
149                        );
150                    }
151                }
152                if !record.span_id.is_empty() {
153                    if record.span_id.len() == 8 {
154                        let hex = hex::encode(&record.span_id);
155                        extras.push((
156                            "otel.span_id".to_string(),
157                            JsonbValue::String(Cow::Owned(hex)),
158                        ));
159                    } else {
160                        debug!(
161                            "OTel log record with bad span_id ({}b), dropping property",
162                            record.span_id.len()
163                        );
164                    }
165                }
166                if !record.severity_text.is_empty() {
167                    extras.push((
168                        "otel.severity_text".to_string(),
169                        JsonbValue::String(Cow::Owned(record.severity_text.clone())),
170                    ));
171                }
172                if !record.event_name.is_empty() {
173                    extras.push((
174                        "otel.event_name".to_string(),
175                        JsonbValue::String(Cow::Owned(record.event_name.clone())),
176                    ));
177                }
178
179                let props_jsonb = attrs_to_jsonb(&record.attributes, &extras);
180
181                process_ids.append_value(&process_id_str);
182                stream_ids.append_value(&stream_id_str);
183                block_ids.append_value(&block_id_str);
184                insert_times.append_value(insert_time_nanos);
185                exes.append_value(&process.exe);
186                usernames.append_value(&process.username);
187                computers.append_value(&process.computer);
188                times.append_value(time_nanos);
189                targets.append_value(&scope_name);
190                levels.append_value(level);
191                msgs.append_value(&msg);
192                properties.append_value(&props_jsonb);
193                process_properties.append_value(&**process.properties);
194
195                nb_appended += 1;
196            }
197        }
198
199        if nb_dropped_no_timestamp > 0 {
200            debug!(
201                "OTel log records without timestamp dropped (block_id={block_id_str}, count={nb_dropped_no_timestamp})"
202            );
203        }
204        if nb_severity_out_of_range > 0 {
205            debug!(
206                "OTel log records with out-of-range severity_number treated as Info (block_id={block_id_str}, count={nb_severity_out_of_range})"
207            );
208        }
209
210        if nb_appended == 0 {
211            return Ok(None);
212        }
213
214        let schema = Arc::new(crate::log_entries_table::log_table_schema());
215        let batch = RecordBatch::try_new(
216            schema,
217            vec![
218                Arc::new(process_ids.finish()),
219                Arc::new(stream_ids.finish()),
220                Arc::new(block_ids.finish()),
221                Arc::new(insert_times.finish().with_timezone_utc()),
222                Arc::new(exes.finish()),
223                Arc::new(usernames.finish()),
224                Arc::new(computers.finish()),
225                Arc::new(times.finish().with_timezone_utc()),
226                Arc::new(targets.finish()),
227                Arc::new(levels.finish()),
228                Arc::new(msgs.finish()),
229                Arc::new(properties.finish()),
230                Arc::new(process_properties.finish()),
231            ],
232        )
233        .with_context(|| "building OTel log_entries batch")?;
234
235        Ok(Some(PartitionRowSet::new(
236            TimeRange::new(
237                DateTime::from_timestamp_nanos(min_time),
238                DateTime::from_timestamp_nanos(max_time),
239            ),
240            batch,
241        )))
242    }
243}