micromegas_analytics/lakehouse/otel/
metrics_block_processor.rs1use super::attrs::attrs_to_jsonb;
8use crate::lakehouse::{
9 block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
10 write_partition::PartitionRowSet,
11};
12use crate::metadata::ProcessMetadata;
13use crate::payload::fetch_block_payload;
14use crate::time::TimeRange;
15use anyhow::{Context, Result};
16use async_trait::async_trait;
17use chrono::DateTime;
18use datafusion::arrow::array::{
19 BinaryDictionaryBuilder, PrimitiveBuilder, StringDictionaryBuilder,
20};
21use datafusion::arrow::datatypes::{Float64Type, Int16Type, Int32Type, TimestampNanosecondType};
22use datafusion::arrow::record_batch::RecordBatch;
23use jsonb::Value as JsonbValue;
24use micromegas_telemetry::blob_storage::BlobStorage;
25use micromegas_tracing::prelude::*;
26use opentelemetry_proto::tonic::metrics::v1::{
27 NumberDataPoint, ResourceMetrics, metric::Data, number_data_point,
28};
29use prost::Message;
30use std::borrow::Cow;
31use std::sync::Arc;
32
/// Block processor that converts blocks of OTLP metrics into rows of the
/// measures (metrics) table.
///
/// The processor holds no state of its own; everything it needs is handed to
/// it through [`BlockProcessor::process`].
#[derive(Debug)]
pub struct OtelMetricsBlockProcessor {}
35
36#[async_trait]
37impl BlockProcessor for OtelMetricsBlockProcessor {
38 #[span_fn]
39 async fn process(
40 &self,
41 blob_storage: Arc<BlobStorage>,
42 src_block: Arc<PartitionSourceBlock>,
43 ) -> Result<Option<PartitionRowSet>> {
44 let payload = fetch_block_payload(
45 blob_storage,
46 sqlx::types::Uuid::from_bytes(*src_block.block.process_id.as_bytes()),
47 sqlx::types::Uuid::from_bytes(*src_block.block.stream_id.as_bytes()),
48 sqlx::types::Uuid::from_bytes(*src_block.block.block_id.as_bytes()),
49 )
50 .await
51 .with_context(|| "fetch_block_payload")?;
52
53 let resource_metrics = ResourceMetrics::decode(payload.objects.as_slice())
54 .with_context(|| "decoding ResourceMetrics proto")?;
55
56 let insert_time_nanos = src_block
57 .block
58 .insert_time
59 .timestamp_nanos_opt()
60 .with_context(|| "block.insert_time โ nanos")?;
61 let mut builder = MeasuresRowBuilder::new(
62 src_block.process.process_id.to_string(),
63 src_block.block.stream_id.to_string(),
64 src_block.block.block_id.to_string(),
65 insert_time_nanos,
66 src_block.process.clone(),
67 );
68
69 for scope_metrics in &resource_metrics.scope_metrics {
70 let scope = scope_metrics.scope.as_ref();
71 let scope_name = scope.map(|s| s.name.clone()).unwrap_or_default();
72
73 for metric in &scope_metrics.metrics {
74 match metric.data.as_ref() {
75 Some(Data::Sum(sum)) => {
76 let extras = [
77 (
78 "otel.metric.aggregation_temporality".to_string(),
79 JsonbValue::Number(jsonb::Number::Int64(
80 sum.aggregation_temporality as i64,
81 )),
82 ),
83 (
84 "otel.metric.is_monotonic".to_string(),
85 JsonbValue::Bool(sum.is_monotonic),
86 ),
87 (
88 "otel.metric.kind".to_string(),
89 JsonbValue::String(Cow::Borrowed("sum")),
90 ),
91 ];
92 for dp in &sum.data_points {
93 builder.append(&scope_name, &metric.name, &metric.unit, dp, &extras);
94 }
95 }
96 Some(Data::Gauge(gauge)) => {
97 let extras = [(
98 "otel.metric.kind".to_string(),
99 JsonbValue::String(Cow::Borrowed("gauge")),
100 )];
101 for dp in &gauge.data_points {
102 builder.append(&scope_name, &metric.name, &metric.unit, dp, &extras);
103 }
104 }
105 Some(Data::Histogram(h)) => {
106 debug!(
107 "OTel histogram dropped (deferred to v2): name={} unit={} points={}",
108 metric.name,
109 metric.unit,
110 h.data_points.len()
111 );
112 }
113 Some(Data::ExponentialHistogram(h)) => {
114 debug!(
115 "OTel exponential_histogram dropped (deferred to v2): name={} unit={} points={}",
116 metric.name,
117 metric.unit,
118 h.data_points.len()
119 );
120 }
121 Some(Data::Summary(s)) => {
122 debug!(
123 "OTel summary dropped (deprecated in OTel): name={} unit={} points={}",
124 metric.name,
125 metric.unit,
126 s.data_points.len()
127 );
128 }
129 None => {}
130 }
131 }
132 }
133
134 builder.finish()
135 }
136}
137
138struct MeasuresRowBuilder {
141 process_ids: StringDictionaryBuilder<Int16Type>,
142 stream_ids: StringDictionaryBuilder<Int16Type>,
143 block_ids: StringDictionaryBuilder<Int16Type>,
144 insert_times: PrimitiveBuilder<TimestampNanosecondType>,
145 exes: StringDictionaryBuilder<Int16Type>,
146 usernames: StringDictionaryBuilder<Int16Type>,
147 computers: StringDictionaryBuilder<Int16Type>,
148 times: PrimitiveBuilder<TimestampNanosecondType>,
149 targets: StringDictionaryBuilder<Int16Type>,
150 names: StringDictionaryBuilder<Int16Type>,
151 units: StringDictionaryBuilder<Int16Type>,
152 values: PrimitiveBuilder<Float64Type>,
153 properties: BinaryDictionaryBuilder<Int32Type>,
154 process_properties: BinaryDictionaryBuilder<Int32Type>,
155 min_time: i64,
156 max_time: i64,
157 nb_appended: usize,
158 process_id_str: String,
159 stream_id_str: String,
160 block_id_str: String,
161 insert_time_nanos: i64,
162 process: Arc<ProcessMetadata>,
163}
164
165impl MeasuresRowBuilder {
166 fn new(
167 process_id_str: String,
168 stream_id_str: String,
169 block_id_str: String,
170 insert_time_nanos: i64,
171 process: Arc<ProcessMetadata>,
172 ) -> Self {
173 Self {
174 process_ids: StringDictionaryBuilder::new(),
175 stream_ids: StringDictionaryBuilder::new(),
176 block_ids: StringDictionaryBuilder::new(),
177 insert_times: PrimitiveBuilder::new(),
178 exes: StringDictionaryBuilder::new(),
179 usernames: StringDictionaryBuilder::new(),
180 computers: StringDictionaryBuilder::new(),
181 times: PrimitiveBuilder::new(),
182 targets: StringDictionaryBuilder::new(),
183 names: StringDictionaryBuilder::new(),
184 units: StringDictionaryBuilder::new(),
185 values: PrimitiveBuilder::new(),
186 properties: BinaryDictionaryBuilder::new(),
187 process_properties: BinaryDictionaryBuilder::new(),
188 min_time: i64::MAX,
189 max_time: i64::MIN,
190 nb_appended: 0,
191 process_id_str,
192 stream_id_str,
193 block_id_str,
194 insert_time_nanos,
195 process,
196 }
197 }
198
199 fn append(
200 &mut self,
201 scope_name: &str,
202 metric_name: &str,
203 unit: &str,
204 dp: &NumberDataPoint,
205 extras: &[(String, JsonbValue<'static>)],
206 ) {
207 let time_nanos = dp.time_unix_nano as i64;
208 if time_nanos == 0 {
209 debug!("OTel metric data point for {metric_name} dropped (time_unix_nano=0)");
210 return;
211 }
212
213 let value = match dp.value.as_ref() {
214 Some(number_data_point::Value::AsDouble(d)) => *d,
215 Some(number_data_point::Value::AsInt(i)) => *i as f64,
216 None => {
217 debug!("OTel data point for {metric_name} has no value, skipping");
218 return;
219 }
220 };
221
222 self.min_time = self.min_time.min(time_nanos);
223 self.max_time = self.max_time.max(time_nanos);
224
225 let props_jsonb = attrs_to_jsonb(&dp.attributes, extras);
226
227 self.process_ids.append_value(&self.process_id_str);
228 self.stream_ids.append_value(&self.stream_id_str);
229 self.block_ids.append_value(&self.block_id_str);
230 self.insert_times.append_value(self.insert_time_nanos);
231 self.exes.append_value(&self.process.exe);
232 self.usernames.append_value(&self.process.username);
233 self.computers.append_value(&self.process.computer);
234 self.times.append_value(time_nanos);
235 self.targets.append_value(scope_name);
236 self.names.append_value(metric_name);
237 self.units.append_value(unit);
238 self.values.append_value(value);
239 self.properties.append_value(&props_jsonb);
240 self.process_properties
241 .append_value(&**self.process.properties);
242
243 self.nb_appended += 1;
244 }
245
246 fn finish(mut self) -> Result<Option<PartitionRowSet>> {
247 if self.nb_appended == 0 {
248 return Ok(None);
249 }
250 let schema = Arc::new(crate::metrics_table::metrics_table_schema());
251 let batch = RecordBatch::try_new(
252 schema,
253 vec![
254 Arc::new(self.process_ids.finish()),
255 Arc::new(self.stream_ids.finish()),
256 Arc::new(self.block_ids.finish()),
257 Arc::new(self.insert_times.finish().with_timezone_utc()),
258 Arc::new(self.exes.finish()),
259 Arc::new(self.usernames.finish()),
260 Arc::new(self.computers.finish()),
261 Arc::new(self.times.finish().with_timezone_utc()),
262 Arc::new(self.targets.finish()),
263 Arc::new(self.names.finish()),
264 Arc::new(self.units.finish()),
265 Arc::new(self.values.finish()),
266 Arc::new(self.properties.finish()),
267 Arc::new(self.process_properties.finish()),
268 ],
269 )
270 .with_context(|| "building OTel measures batch")?;
271
272 Ok(Some(PartitionRowSet::new(
273 TimeRange::new(
274 DateTime::from_timestamp_nanos(self.min_time),
275 DateTime::from_timestamp_nanos(self.max_time),
276 ),
277 batch,
278 )))
279 }
280}