micromegas_analytics/lakehouse/otel/
metrics_block_processor.rs

//! `BlockProcessor` for OTLP `ResourceMetrics` payloads → `measures` rows.
//!
//! Handles Sum and Gauge data points; logs and skips Histogram, ExponentialHistogram,
//! and Summary (deferred to v2 — see plan §"Histograms deferred"). Aggregation
//! temporality and `is_monotonic` ride along on per-row properties.
6
7use super::attrs::attrs_to_jsonb;
8use crate::lakehouse::{
9    block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
10    write_partition::PartitionRowSet,
11};
12use crate::metadata::ProcessMetadata;
13use crate::payload::fetch_block_payload;
14use crate::time::TimeRange;
15use anyhow::{Context, Result};
16use async_trait::async_trait;
17use chrono::DateTime;
18use datafusion::arrow::array::{
19    BinaryDictionaryBuilder, PrimitiveBuilder, StringDictionaryBuilder,
20};
21use datafusion::arrow::datatypes::{Float64Type, Int16Type, Int32Type, TimestampNanosecondType};
22use datafusion::arrow::record_batch::RecordBatch;
23use jsonb::Value as JsonbValue;
24use micromegas_telemetry::blob_storage::BlobStorage;
25use micromegas_tracing::prelude::*;
26use opentelemetry_proto::tonic::metrics::v1::{
27    NumberDataPoint, ResourceMetrics, metric::Data, number_data_point,
28};
29use prost::Message;
30use std::borrow::Cow;
31use std::sync::Arc;
32
/// Stateless block processor that turns OTLP metrics blocks into `measures` rows.
///
/// The type carries no data; all inputs arrive through the `BlockProcessor`
/// trait method.
#[derive(Debug)]
pub struct OtelMetricsBlockProcessor;
35
36#[async_trait]
37impl BlockProcessor for OtelMetricsBlockProcessor {
38    #[span_fn]
39    async fn process(
40        &self,
41        blob_storage: Arc<BlobStorage>,
42        src_block: Arc<PartitionSourceBlock>,
43    ) -> Result<Option<PartitionRowSet>> {
44        let payload = fetch_block_payload(
45            blob_storage,
46            sqlx::types::Uuid::from_bytes(*src_block.block.process_id.as_bytes()),
47            sqlx::types::Uuid::from_bytes(*src_block.block.stream_id.as_bytes()),
48            sqlx::types::Uuid::from_bytes(*src_block.block.block_id.as_bytes()),
49        )
50        .await
51        .with_context(|| "fetch_block_payload")?;
52
53        let resource_metrics = ResourceMetrics::decode(payload.objects.as_slice())
54            .with_context(|| "decoding ResourceMetrics proto")?;
55
56        let insert_time_nanos = src_block
57            .block
58            .insert_time
59            .timestamp_nanos_opt()
60            .with_context(|| "block.insert_time โ†’ nanos")?;
61        let mut builder = MeasuresRowBuilder::new(
62            src_block.process.process_id.to_string(),
63            src_block.block.stream_id.to_string(),
64            src_block.block.block_id.to_string(),
65            insert_time_nanos,
66            src_block.process.clone(),
67        );
68
69        for scope_metrics in &resource_metrics.scope_metrics {
70            let scope = scope_metrics.scope.as_ref();
71            let scope_name = scope.map(|s| s.name.clone()).unwrap_or_default();
72
73            for metric in &scope_metrics.metrics {
74                match metric.data.as_ref() {
75                    Some(Data::Sum(sum)) => {
76                        let extras = [
77                            (
78                                "otel.metric.aggregation_temporality".to_string(),
79                                JsonbValue::Number(jsonb::Number::Int64(
80                                    sum.aggregation_temporality as i64,
81                                )),
82                            ),
83                            (
84                                "otel.metric.is_monotonic".to_string(),
85                                JsonbValue::Bool(sum.is_monotonic),
86                            ),
87                            (
88                                "otel.metric.kind".to_string(),
89                                JsonbValue::String(Cow::Borrowed("sum")),
90                            ),
91                        ];
92                        for dp in &sum.data_points {
93                            builder.append(&scope_name, &metric.name, &metric.unit, dp, &extras);
94                        }
95                    }
96                    Some(Data::Gauge(gauge)) => {
97                        let extras = [(
98                            "otel.metric.kind".to_string(),
99                            JsonbValue::String(Cow::Borrowed("gauge")),
100                        )];
101                        for dp in &gauge.data_points {
102                            builder.append(&scope_name, &metric.name, &metric.unit, dp, &extras);
103                        }
104                    }
105                    Some(Data::Histogram(h)) => {
106                        debug!(
107                            "OTel histogram dropped (deferred to v2): name={} unit={} points={}",
108                            metric.name,
109                            metric.unit,
110                            h.data_points.len()
111                        );
112                    }
113                    Some(Data::ExponentialHistogram(h)) => {
114                        debug!(
115                            "OTel exponential_histogram dropped (deferred to v2): name={} unit={} points={}",
116                            metric.name,
117                            metric.unit,
118                            h.data_points.len()
119                        );
120                    }
121                    Some(Data::Summary(s)) => {
122                        debug!(
123                            "OTel summary dropped (deprecated in OTel): name={} unit={} points={}",
124                            metric.name,
125                            metric.unit,
126                            s.data_points.len()
127                        );
128                    }
129                    None => {}
130                }
131            }
132        }
133
134        builder.finish()
135    }
136}
137
138/// Per-block accumulator for `measures` rows: owns the column builders, time
139/// bounds, and per-block constants so `append` only takes per-data-point inputs.
140struct MeasuresRowBuilder {
141    process_ids: StringDictionaryBuilder<Int16Type>,
142    stream_ids: StringDictionaryBuilder<Int16Type>,
143    block_ids: StringDictionaryBuilder<Int16Type>,
144    insert_times: PrimitiveBuilder<TimestampNanosecondType>,
145    exes: StringDictionaryBuilder<Int16Type>,
146    usernames: StringDictionaryBuilder<Int16Type>,
147    computers: StringDictionaryBuilder<Int16Type>,
148    times: PrimitiveBuilder<TimestampNanosecondType>,
149    targets: StringDictionaryBuilder<Int16Type>,
150    names: StringDictionaryBuilder<Int16Type>,
151    units: StringDictionaryBuilder<Int16Type>,
152    values: PrimitiveBuilder<Float64Type>,
153    properties: BinaryDictionaryBuilder<Int32Type>,
154    process_properties: BinaryDictionaryBuilder<Int32Type>,
155    min_time: i64,
156    max_time: i64,
157    nb_appended: usize,
158    process_id_str: String,
159    stream_id_str: String,
160    block_id_str: String,
161    insert_time_nanos: i64,
162    process: Arc<ProcessMetadata>,
163}
164
165impl MeasuresRowBuilder {
166    fn new(
167        process_id_str: String,
168        stream_id_str: String,
169        block_id_str: String,
170        insert_time_nanos: i64,
171        process: Arc<ProcessMetadata>,
172    ) -> Self {
173        Self {
174            process_ids: StringDictionaryBuilder::new(),
175            stream_ids: StringDictionaryBuilder::new(),
176            block_ids: StringDictionaryBuilder::new(),
177            insert_times: PrimitiveBuilder::new(),
178            exes: StringDictionaryBuilder::new(),
179            usernames: StringDictionaryBuilder::new(),
180            computers: StringDictionaryBuilder::new(),
181            times: PrimitiveBuilder::new(),
182            targets: StringDictionaryBuilder::new(),
183            names: StringDictionaryBuilder::new(),
184            units: StringDictionaryBuilder::new(),
185            values: PrimitiveBuilder::new(),
186            properties: BinaryDictionaryBuilder::new(),
187            process_properties: BinaryDictionaryBuilder::new(),
188            min_time: i64::MAX,
189            max_time: i64::MIN,
190            nb_appended: 0,
191            process_id_str,
192            stream_id_str,
193            block_id_str,
194            insert_time_nanos,
195            process,
196        }
197    }
198
199    fn append(
200        &mut self,
201        scope_name: &str,
202        metric_name: &str,
203        unit: &str,
204        dp: &NumberDataPoint,
205        extras: &[(String, JsonbValue<'static>)],
206    ) {
207        let time_nanos = dp.time_unix_nano as i64;
208        if time_nanos == 0 {
209            debug!("OTel metric data point for {metric_name} dropped (time_unix_nano=0)");
210            return;
211        }
212
213        let value = match dp.value.as_ref() {
214            Some(number_data_point::Value::AsDouble(d)) => *d,
215            Some(number_data_point::Value::AsInt(i)) => *i as f64,
216            None => {
217                debug!("OTel data point for {metric_name} has no value, skipping");
218                return;
219            }
220        };
221
222        self.min_time = self.min_time.min(time_nanos);
223        self.max_time = self.max_time.max(time_nanos);
224
225        let props_jsonb = attrs_to_jsonb(&dp.attributes, extras);
226
227        self.process_ids.append_value(&self.process_id_str);
228        self.stream_ids.append_value(&self.stream_id_str);
229        self.block_ids.append_value(&self.block_id_str);
230        self.insert_times.append_value(self.insert_time_nanos);
231        self.exes.append_value(&self.process.exe);
232        self.usernames.append_value(&self.process.username);
233        self.computers.append_value(&self.process.computer);
234        self.times.append_value(time_nanos);
235        self.targets.append_value(scope_name);
236        self.names.append_value(metric_name);
237        self.units.append_value(unit);
238        self.values.append_value(value);
239        self.properties.append_value(&props_jsonb);
240        self.process_properties
241            .append_value(&**self.process.properties);
242
243        self.nb_appended += 1;
244    }
245
246    fn finish(mut self) -> Result<Option<PartitionRowSet>> {
247        if self.nb_appended == 0 {
248            return Ok(None);
249        }
250        let schema = Arc::new(crate::metrics_table::metrics_table_schema());
251        let batch = RecordBatch::try_new(
252            schema,
253            vec![
254                Arc::new(self.process_ids.finish()),
255                Arc::new(self.stream_ids.finish()),
256                Arc::new(self.block_ids.finish()),
257                Arc::new(self.insert_times.finish().with_timezone_utc()),
258                Arc::new(self.exes.finish()),
259                Arc::new(self.usernames.finish()),
260                Arc::new(self.computers.finish()),
261                Arc::new(self.times.finish().with_timezone_utc()),
262                Arc::new(self.targets.finish()),
263                Arc::new(self.names.finish()),
264                Arc::new(self.units.finish()),
265                Arc::new(self.values.finish()),
266                Arc::new(self.properties.finish()),
267                Arc::new(self.process_properties.finish()),
268            ],
269        )
270        .with_context(|| "building OTel measures batch")?;
271
272        Ok(Some(PartitionRowSet::new(
273            TimeRange::new(
274                DateTime::from_timestamp_nanos(self.min_time),
275                DateTime::from_timestamp_nanos(self.max_time),
276            ),
277            batch,
278        )))
279    }
280}