1use super::attrs::{any_value_to_jsonb, attrs_to_jsonb, scope_extras, to_jsonb_bytes};
8use crate::lakehouse::{
9 block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
10 write_partition::PartitionRowSet,
11};
12use crate::metadata::ProcessMetadata;
13use crate::payload::fetch_block_payload;
14use crate::time::TimeRange;
15use anyhow::{Context, Result};
16use async_trait::async_trait;
17use chrono::DateTime;
18use datafusion::arrow::array::{
19 BinaryBuilder, BinaryDictionaryBuilder, FixedSizeBinaryBuilder, PrimitiveBuilder,
20 StringBuilder, StringDictionaryBuilder,
21};
22use datafusion::arrow::datatypes::{Int32Type, Int64Type, TimestampNanosecondType};
23use datafusion::arrow::record_batch::RecordBatch;
24use jsonb::Value as JsonbValue;
25use micromegas_telemetry::blob_storage::BlobStorage;
26use micromegas_tracing::prelude::*;
27use opentelemetry_proto::tonic::common::v1::InstrumentationScope;
28use opentelemetry_proto::tonic::trace::v1::{
29 ResourceSpans, Span, span as span_proto, status::StatusCode as ProtoStatusCode,
30};
31use prost::Message;
32use std::borrow::Cow;
33use std::collections::BTreeMap;
34use std::sync::Arc;
35
/// Block processor that turns a stored OpenTelemetry `ResourceSpans` payload into
/// rows of the OTel spans table.
///
/// The type carries no state; all inputs arrive through `BlockProcessor::process`.
#[derive(Debug)]
pub struct OtelSpansBlockProcessor {}
38
39#[async_trait]
40impl BlockProcessor for OtelSpansBlockProcessor {
41 #[span_fn]
42 async fn process(
43 &self,
44 blob_storage: Arc<BlobStorage>,
45 src_block: Arc<PartitionSourceBlock>,
46 ) -> Result<Option<PartitionRowSet>> {
47 let payload = fetch_block_payload(
48 blob_storage,
49 sqlx::types::Uuid::from_bytes(*src_block.block.process_id.as_bytes()),
50 sqlx::types::Uuid::from_bytes(*src_block.block.stream_id.as_bytes()),
51 sqlx::types::Uuid::from_bytes(*src_block.block.block_id.as_bytes()),
52 )
53 .await
54 .with_context(|| "fetch_block_payload")?;
55
56 let resource_spans = ResourceSpans::decode(payload.objects.as_slice())
57 .with_context(|| "decoding ResourceSpans proto")?;
58
59 let insert_time_nanos = src_block
60 .block
61 .insert_time
62 .timestamp_nanos_opt()
63 .with_context(|| "block.insert_time โ nanos")?;
64 let mut builder = OtelSpansRowBuilder::new(
65 src_block.process.process_id.to_string(),
66 src_block.block.stream_id.to_string(),
67 src_block.block.block_id.to_string(),
68 insert_time_nanos,
69 src_block.process.clone(),
70 );
71
72 for scope_spans in &resource_spans.scope_spans {
73 let scope = scope_spans.scope.as_ref();
74 for span in &scope_spans.spans {
75 builder.append(span, scope, &scope_spans.schema_url)?;
76 }
77 }
78
79 builder.finish()
80 }
81}
82
83struct OtelSpansRowBuilder {
86 process_ids: StringDictionaryBuilder<Int32Type>,
87 stream_ids: StringDictionaryBuilder<Int32Type>,
88 block_ids: StringDictionaryBuilder<Int32Type>,
89 insert_times: PrimitiveBuilder<TimestampNanosecondType>,
90 exes: StringBuilder,
91 usernames: StringBuilder,
92 computers: StringBuilder,
93 process_properties: BinaryDictionaryBuilder<Int32Type>,
94 trace_ids: FixedSizeBinaryBuilder,
95 span_ids: FixedSizeBinaryBuilder,
96 parent_span_ids: FixedSizeBinaryBuilder,
97 start_times: PrimitiveBuilder<TimestampNanosecondType>,
98 end_times: PrimitiveBuilder<TimestampNanosecondType>,
99 durations: PrimitiveBuilder<Int64Type>,
100 names: StringDictionaryBuilder<Int32Type>,
101 kinds: StringDictionaryBuilder<Int32Type>,
102 statuses: StringDictionaryBuilder<Int32Type>,
103 status_messages: StringBuilder,
104 properties: BinaryDictionaryBuilder<Int32Type>,
105 events: BinaryBuilder,
106 links: BinaryBuilder,
107 min_time: i64,
108 max_time: i64,
109 nb_appended: usize,
110 process_id_str: String,
111 stream_id_str: String,
112 block_id_str: String,
113 insert_time_nanos: i64,
114 process: Arc<ProcessMetadata>,
115}
116
117impl OtelSpansRowBuilder {
118 fn new(
119 process_id_str: String,
120 stream_id_str: String,
121 block_id_str: String,
122 insert_time_nanos: i64,
123 process: Arc<ProcessMetadata>,
124 ) -> Self {
125 Self {
126 process_ids: StringDictionaryBuilder::new(),
127 stream_ids: StringDictionaryBuilder::new(),
128 block_ids: StringDictionaryBuilder::new(),
129 insert_times: PrimitiveBuilder::new(),
130 exes: StringBuilder::new(),
131 usernames: StringBuilder::new(),
132 computers: StringBuilder::new(),
133 process_properties: BinaryDictionaryBuilder::new(),
134 trace_ids: FixedSizeBinaryBuilder::new(16),
135 span_ids: FixedSizeBinaryBuilder::new(8),
136 parent_span_ids: FixedSizeBinaryBuilder::new(8),
137 start_times: PrimitiveBuilder::new(),
138 end_times: PrimitiveBuilder::new(),
139 durations: PrimitiveBuilder::new(),
140 names: StringDictionaryBuilder::new(),
141 kinds: StringDictionaryBuilder::new(),
142 statuses: StringDictionaryBuilder::new(),
143 status_messages: StringBuilder::new(),
144 properties: BinaryDictionaryBuilder::new(),
145 events: BinaryBuilder::new(),
146 links: BinaryBuilder::new(),
147 min_time: i64::MAX,
148 max_time: i64::MIN,
149 nb_appended: 0,
150 process_id_str,
151 stream_id_str,
152 block_id_str,
153 insert_time_nanos,
154 process,
155 }
156 }
157
158 fn append(
159 &mut self,
160 span: &Span,
161 scope: Option<&InstrumentationScope>,
162 schema_url: &str,
163 ) -> Result<()> {
164 let start_nanos = span.start_time_unix_nano as i64;
165 let end_nanos = span.end_time_unix_nano as i64;
166 if start_nanos == 0 || end_nanos == 0 {
167 debug!(
168 "OTel span without start/end time, skipping (block={})",
169 self.block_id_str
170 );
171 return Ok(());
172 }
173 if span.trace_id.len() != 16 || span.span_id.len() != 8 {
174 debug!(
175 "OTel span with bad trace_id ({}b) / span_id ({}b), skipping",
176 span.trace_id.len(),
177 span.span_id.len(),
178 );
179 return Ok(());
180 }
181 self.min_time = self.min_time.min(start_nanos);
182 self.max_time = self.max_time.max(end_nanos);
183
184 self.trace_ids
185 .append_value(&span.trace_id)
186 .with_context(|| "appending trace_id")?;
187 self.span_ids
188 .append_value(&span.span_id)
189 .with_context(|| "appending span_id")?;
190 if span.parent_span_id.len() == 8 {
191 self.parent_span_ids
192 .append_value(&span.parent_span_id)
193 .with_context(|| "appending parent_span_id")?;
194 } else {
195 self.parent_span_ids.append_null();
196 }
197
198 self.process_ids.append_value(&self.process_id_str);
199 self.stream_ids.append_value(&self.stream_id_str);
200 self.block_ids.append_value(&self.block_id_str);
201 self.insert_times.append_value(self.insert_time_nanos);
202 self.exes.append_value(&self.process.exe);
203 self.usernames.append_value(&self.process.username);
204 self.computers.append_value(&self.process.computer);
205 self.process_properties
206 .append_value(&**self.process.properties);
207
208 self.start_times.append_value(start_nanos);
209 self.end_times.append_value(end_nanos);
210 self.durations.append_value(end_nanos - start_nanos);
211 self.names.append_value(&span.name);
212 self.kinds.append_value(span_kind_str(span.kind));
213 let (status_code, status_message) = match span.status.as_ref() {
214 Some(s) => (proto_status_code_str(s.code), Some(s.message.clone())),
215 None => ("UNSET", None),
216 };
217 self.statuses.append_value(status_code);
218 match status_message.as_deref() {
219 Some(msg) if !msg.is_empty() => self.status_messages.append_value(msg),
220 _ => self.status_messages.append_null(),
221 }
222
223 let extras = scope_extras(scope, schema_url);
224 let props_jsonb = attrs_to_jsonb(&span.attributes, &extras);
225 self.properties.append_value(&props_jsonb);
226
227 let events_array: Vec<JsonbValue<'static>> = span
229 .events
230 .iter()
231 .map(|ev| {
232 let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
233 map.insert(
234 "time".into(),
235 JsonbValue::Number(jsonb::Number::Int64(ev.time_unix_nano as i64)),
236 );
237 map.insert(
238 "name".into(),
239 JsonbValue::String(Cow::Owned(ev.name.clone())),
240 );
241 let mut attrs_map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
242 for kv in &ev.attributes {
243 if let Some(v) = kv.value.as_ref() {
244 attrs_map.insert(kv.key.clone(), any_value_to_jsonb(v));
245 }
246 }
247 map.insert("attributes".into(), JsonbValue::Object(attrs_map));
248 JsonbValue::Object(map)
249 })
250 .collect();
251 self.events
252 .append_value(to_jsonb_bytes(JsonbValue::Array(events_array)));
253
254 let links_array: Vec<JsonbValue<'static>> = span
256 .links
257 .iter()
258 .map(|link| {
259 let mut map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
260 map.insert(
261 "trace_id".into(),
262 JsonbValue::String(Cow::Owned(hex::encode(&link.trace_id))),
263 );
264 map.insert(
265 "span_id".into(),
266 JsonbValue::String(Cow::Owned(hex::encode(&link.span_id))),
267 );
268 let mut attrs_map: BTreeMap<String, JsonbValue<'static>> = BTreeMap::new();
269 for kv in &link.attributes {
270 if let Some(v) = kv.value.as_ref() {
271 attrs_map.insert(kv.key.clone(), any_value_to_jsonb(v));
272 }
273 }
274 map.insert("attributes".into(), JsonbValue::Object(attrs_map));
275 JsonbValue::Object(map)
276 })
277 .collect();
278 self.links
279 .append_value(to_jsonb_bytes(JsonbValue::Array(links_array)));
280
281 self.nb_appended += 1;
282 Ok(())
283 }
284
285 fn finish(mut self) -> Result<Option<PartitionRowSet>> {
286 if self.nb_appended == 0 {
287 return Ok(None);
288 }
289 let schema = Arc::new(crate::lakehouse::otel::spans_table::otel_spans_table_schema());
290 let batch = RecordBatch::try_new(
291 schema,
292 vec![
293 Arc::new(self.process_ids.finish()),
294 Arc::new(self.stream_ids.finish()),
295 Arc::new(self.block_ids.finish()),
296 Arc::new(self.insert_times.finish().with_timezone_utc()),
297 Arc::new(self.exes.finish()),
298 Arc::new(self.usernames.finish()),
299 Arc::new(self.computers.finish()),
300 Arc::new(self.process_properties.finish()),
301 Arc::new(self.trace_ids.finish()),
302 Arc::new(self.span_ids.finish()),
303 Arc::new(self.parent_span_ids.finish()),
304 Arc::new(self.start_times.finish().with_timezone_utc()),
305 Arc::new(self.end_times.finish().with_timezone_utc()),
306 Arc::new(self.durations.finish()),
307 Arc::new(self.names.finish()),
308 Arc::new(self.kinds.finish()),
309 Arc::new(self.statuses.finish()),
310 Arc::new(self.status_messages.finish()),
311 Arc::new(self.properties.finish()),
312 Arc::new(self.events.finish()),
313 Arc::new(self.links.finish()),
314 ],
315 )
316 .with_context(|| "building otel_spans batch")?;
317
318 Ok(Some(PartitionRowSet::new(
319 TimeRange::new(
320 DateTime::from_timestamp_nanos(self.min_time),
321 DateTime::from_timestamp_nanos(self.max_time),
322 ),
323 batch,
324 )))
325 }
326}
327
328fn span_kind_str(kind: i32) -> &'static str {
329 match span_proto::SpanKind::try_from(kind).unwrap_or(span_proto::SpanKind::Unspecified) {
330 span_proto::SpanKind::Unspecified => "UNSPECIFIED",
331 span_proto::SpanKind::Internal => "INTERNAL",
332 span_proto::SpanKind::Server => "SERVER",
333 span_proto::SpanKind::Client => "CLIENT",
334 span_proto::SpanKind::Producer => "PRODUCER",
335 span_proto::SpanKind::Consumer => "CONSUMER",
336 }
337}
338
339fn proto_status_code_str(code: i32) -> &'static str {
340 match ProtoStatusCode::try_from(code).unwrap_or(ProtoStatusCode::Unset) {
341 ProtoStatusCode::Unset => "UNSET",
342 ProtoStatusCode::Ok => "OK",
343 ProtoStatusCode::Error => "ERROR",
344 }
345}