micromegas_analytics/lakehouse/otel/
attrs.rs

1//! Converts OTel `KeyValue` arrays + scalar `AnyValue` instances to JSONB bytes.
2//!
3//! Mapping follows the plan's "Attribute value encoding" table:
4//!  - string → JSON string
5//!  - int / double / bool → JSON number / bool
6//!  - bytes → base64-encoded string (existing properties consumers expect text)
7//!  - array / kvlist → recursive JSON
8//!
9//! The output is a JSONB-encoded `{key → value}` blob suitable for the
10//! `properties` columns across `log_entries`, `measures`, and `otel_spans`.
11
12use base64::Engine;
13use jsonb::{Number as JsonbNumber, Value as JsonbValue};
14use opentelemetry_proto::tonic::common::v1::{
15    AnyValue, InstrumentationScope, KeyValue, any_value::Value as Av,
16};
17use std::borrow::Cow;
18use std::collections::BTreeMap;
19
20/// Converts an `AnyValue` to a `jsonb::Value`. Recursively handles arrays and kvlists.
21pub fn any_value_to_jsonb(v: &AnyValue) -> JsonbValue<'static> {
22    match v.value.as_ref() {
23        Some(Av::StringValue(s)) => JsonbValue::String(Cow::Owned(s.clone())),
24        Some(Av::BoolValue(b)) => JsonbValue::Bool(*b),
25        Some(Av::IntValue(i)) => JsonbValue::Number(JsonbNumber::Int64(*i)),
26        Some(Av::DoubleValue(d)) => JsonbValue::Number(JsonbNumber::Float64(*d)),
27        Some(Av::BytesValue(b)) => {
28            // existing JSONB readers (`jsonb_extract_path`, etc.) expect strings,
29            // so we base64-encode bytes rather than emitting a JSON binary type.
30            let encoded = base64::engine::general_purpose::STANDARD.encode(b);
31            JsonbValue::String(Cow::Owned(encoded))
32        }
33        Some(Av::ArrayValue(arr)) => {
34            JsonbValue::Array(arr.values.iter().map(any_value_to_jsonb).collect())
35        }
36        Some(Av::KvlistValue(kvs)) => {
37            let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
38            for kv in &kvs.values {
39                let value = kv
40                    .value
41                    .as_ref()
42                    .map(any_value_to_jsonb)
43                    .unwrap_or(JsonbValue::Null);
44                map.insert(kv.key.clone(), value);
45            }
46            JsonbValue::Object(map)
47        }
48        None => JsonbValue::Null,
49    }
50}
51
52/// Encodes a `JsonbValue` to its on-wire JSONB bytes.
53pub fn to_jsonb_bytes(value: JsonbValue<'_>) -> Vec<u8> {
54    let mut bytes = Vec::new();
55    value.write_to_vec(&mut bytes);
56    bytes
57}
58
59/// Serializes a flat `(key → value)` map (with optional extra entries layered on top)
60/// to JSONB bytes. Output ordering is alphabetical, matching `serialize_properties_to_jsonb`.
61pub fn attrs_to_jsonb(attrs: &[KeyValue], extras: &[(String, JsonbValue<'static>)]) -> Vec<u8> {
62    let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
63    for kv in attrs {
64        let value = kv
65            .value
66            .as_ref()
67            .map(any_value_to_jsonb)
68            .unwrap_or(JsonbValue::Null);
69        map.insert(kv.key.clone(), value);
70    }
71    for (k, v) in extras {
72        map.insert(k.clone(), v.clone());
73    }
74    to_jsonb_bytes(JsonbValue::Object(map))
75}
76
77/// Renders `AnyValue` to a flat string for fields that need a textual form
78/// (e.g., the `msg` column when an OTel log body is structured).
79pub fn any_value_to_string(v: &AnyValue) -> String {
80    match v.value.as_ref() {
81        Some(Av::StringValue(s)) => s.clone(),
82        Some(Av::IntValue(i)) => i.to_string(),
83        Some(Av::DoubleValue(d)) => d.to_string(),
84        Some(Av::BoolValue(b)) => b.to_string(),
85        Some(Av::BytesValue(b)) => base64::engine::general_purpose::STANDARD.encode(b),
86        Some(Av::ArrayValue(arr)) => {
87            // Render via JSONB to keep round-trippable representations.
88            let bytes = to_jsonb_bytes(JsonbValue::Array(
89                arr.values.iter().map(any_value_to_jsonb).collect(),
90            ));
91            jsonb::RawJsonb::new(&bytes).to_string()
92        }
93        Some(Av::KvlistValue(kvs)) => {
94            let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
95            for kv in &kvs.values {
96                let value = kv
97                    .value
98                    .as_ref()
99                    .map(any_value_to_jsonb)
100                    .unwrap_or(JsonbValue::Null);
101                map.insert(kv.key.clone(), value);
102            }
103            let bytes = to_jsonb_bytes(JsonbValue::Object(map));
104            jsonb::RawJsonb::new(&bytes).to_string()
105        }
106        None => String::new(),
107    }
108}
109
/// Converts an OTel `severity_number` (1–24) into a micromegas `Level` (1–6).
///
/// OTel groups severities into six bands of four, and each band maps to one level:
///
/// | OTel band | range | level |
/// |-----------|-------|-------|
/// | TRACE     | 1–4   | 6     |
/// | DEBUG     | 5–8   | 5     |
/// | INFO      | 9–12  | 4     |
/// | WARN      | 13–16 | 3     |
/// | ERROR     | 17–20 | 2     |
/// | FATAL     | 21–24 | 1     |
///
/// Anything outside 1–24 — `0` (UNSPECIFIED), negatives, or values above 24 —
/// maps to 4 (Info). That keeps such rows visible under the default
/// `WHERE level <= 4` filter, and avoids promoting an unknown severity to Fatal,
/// which would pollute alerting when a buggy SDK is off-by-one on the FATAL range.
pub fn severity_number_to_level(sev: i32) -> i32 {
    const INFO_LEVEL: i32 = 4;
    const BAND_WIDTH: i32 = 4;
    const MOST_VERBOSE_LEVEL: i32 = 6; // TRACE

    if !(1..=24).contains(&sev) {
        // UNSPECIFIED (0) or out-of-range → Info (don't fake-Fatal-alert).
        return INFO_LEVEL;
    }

    // Zero-based band index: 0 = TRACE … 5 = FATAL. Each step up in band
    // lowers the level by one.
    let band = (sev - 1) / BAND_WIDTH;
    MOST_VERBOSE_LEVEL - band
}
136
137/// Builds the per-row `otel.scope.*` properties (`name`, `version`, `attr.*`,
138/// `schema_url`) that ride alongside row attributes in the JSONB `properties`
139/// column. Skips empty fields so absent scopes don't pollute the output.
140pub fn scope_extras(
141    scope: Option<&InstrumentationScope>,
142    schema_url: &str,
143) -> Vec<(String, JsonbValue<'static>)> {
144    let mut extras: Vec<(String, JsonbValue<'static>)> = Vec::new();
145    if let Some(s) = scope {
146        if !s.name.is_empty() {
147            extras.push((
148                "otel.scope.name".to_string(),
149                JsonbValue::String(Cow::Owned(s.name.clone())),
150            ));
151        }
152        if !s.version.is_empty() {
153            extras.push((
154                "otel.scope.version".to_string(),
155                JsonbValue::String(Cow::Owned(s.version.clone())),
156            ));
157        }
158        for kv in &s.attributes {
159            if let Some(v) = kv.value.as_ref() {
160                extras.push((format!("otel.scope.attr.{}", kv.key), any_value_to_jsonb(v)));
161            }
162        }
163    }
164    if !schema_url.is_empty() {
165        extras.push((
166            "otel.scope.schema_url".to_string(),
167            JsonbValue::String(Cow::Owned(schema_url.to_string())),
168        ));
169    }
170    extras
171}