micromegas_analytics/lakehouse/otel/
logs_block_processor.rs1use super::attrs::{any_value_to_string, attrs_to_jsonb, scope_extras, severity_number_to_level};
8use crate::lakehouse::{
9 block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
10 write_partition::PartitionRowSet,
11};
12use crate::payload::fetch_block_payload;
13use crate::time::TimeRange;
14use anyhow::{Context, Result};
15use async_trait::async_trait;
16use chrono::DateTime;
17use datafusion::arrow::array::{
18 BinaryDictionaryBuilder, PrimitiveBuilder, StringBuilder, StringDictionaryBuilder,
19};
20use datafusion::arrow::datatypes::{Int16Type, Int32Type, TimestampNanosecondType};
21use datafusion::arrow::record_batch::RecordBatch;
22use jsonb::Value as JsonbValue;
23use micromegas_telemetry::blob_storage::BlobStorage;
24use micromegas_tracing::prelude::*;
25use opentelemetry_proto::tonic::logs::v1::ResourceLogs;
26use prost::Message;
27use std::borrow::Cow;
28use std::sync::Arc;
29
/// Stateless [`BlockProcessor`] that turns blocks holding a protobuf-encoded
/// OpenTelemetry `ResourceLogs` message into `log_entries` rows.
///
/// Declared as a unit struct; `OtelLogsBlockProcessor {}` remains a valid way
/// to construct or match it.
#[derive(Debug)]
pub struct OtelLogsBlockProcessor;
32
33#[async_trait]
34impl BlockProcessor for OtelLogsBlockProcessor {
35 #[span_fn]
36 async fn process(
37 &self,
38 blob_storage: Arc<BlobStorage>,
39 src_block: Arc<PartitionSourceBlock>,
40 ) -> Result<Option<PartitionRowSet>> {
41 let payload = fetch_block_payload(
42 blob_storage,
43 sqlx::types::Uuid::from_bytes(*src_block.block.process_id.as_bytes()),
44 sqlx::types::Uuid::from_bytes(*src_block.block.stream_id.as_bytes()),
45 sqlx::types::Uuid::from_bytes(*src_block.block.block_id.as_bytes()),
46 )
47 .await
48 .with_context(|| "fetch_block_payload")?;
49
50 let resource_logs = ResourceLogs::decode(payload.objects.as_slice())
53 .with_context(|| "decoding ResourceLogs proto")?;
54
55 let process = &src_block.process;
56 let stream_id_str = src_block.block.stream_id.to_string();
57 let block_id_str = src_block.block.block_id.to_string();
58 let process_id_str = process.process_id.to_string();
59 let insert_time_nanos = src_block
60 .block
61 .insert_time
62 .timestamp_nanos_opt()
63 .with_context(|| "block.insert_time → nanos")?;
64
65 let row_count: usize = resource_logs
67 .scope_logs
68 .iter()
69 .map(|s| s.log_records.len())
70 .sum();
71 if row_count == 0 {
72 return Ok(None);
73 }
74
75 let mut process_ids = StringDictionaryBuilder::<Int16Type>::new();
76 let mut stream_ids = StringDictionaryBuilder::<Int16Type>::new();
77 let mut block_ids = StringDictionaryBuilder::<Int16Type>::new();
78 let mut insert_times =
79 PrimitiveBuilder::<TimestampNanosecondType>::with_capacity(row_count);
80 let mut exes = StringDictionaryBuilder::<Int16Type>::new();
81 let mut usernames = StringDictionaryBuilder::<Int16Type>::new();
82 let mut computers = StringDictionaryBuilder::<Int16Type>::new();
83 let mut times = PrimitiveBuilder::<TimestampNanosecondType>::with_capacity(row_count);
84 let mut targets = StringDictionaryBuilder::<Int16Type>::new();
85 let mut levels = PrimitiveBuilder::<Int32Type>::with_capacity(row_count);
86 let mut msgs = StringBuilder::new();
87 let mut properties = BinaryDictionaryBuilder::<Int32Type>::new();
88 let mut process_properties = BinaryDictionaryBuilder::<Int32Type>::new();
89
90 let mut min_time = i64::MAX;
91 let mut max_time = i64::MIN;
92 let mut nb_appended = 0usize;
93 let mut nb_dropped_no_timestamp = 0usize;
94 let mut nb_severity_out_of_range = 0usize;
95
96 for scope_logs in &resource_logs.scope_logs {
97 let scope = scope_logs.scope.as_ref();
98 let scope_name = scope.map(|s| s.name.clone()).unwrap_or_default();
100
101 for record in &scope_logs.log_records {
102 let time_nanos = if record.time_unix_nano != 0 {
104 record.time_unix_nano as i64
105 } else if record.observed_time_unix_nano != 0 {
106 record.observed_time_unix_nano as i64
107 } else {
108 nb_dropped_no_timestamp += 1;
111 continue;
112 };
113 min_time = min_time.min(time_nanos);
114 max_time = max_time.max(time_nanos);
115
116 let level = severity_number_to_level(record.severity_number);
117 if !(0..=24).contains(&record.severity_number) {
118 nb_severity_out_of_range += 1;
119 }
120
121 let msg = record
124 .body
125 .as_ref()
126 .map(any_value_to_string)
127 .unwrap_or_default();
128
129 let mut extras = scope_extras(scope, &scope_logs.schema_url);
134 if !record.trace_id.is_empty() {
139 if record.trace_id.len() == 16 {
140 let hex = hex::encode(&record.trace_id);
141 extras.push((
142 "otel.trace_id".to_string(),
143 JsonbValue::String(Cow::Owned(hex)),
144 ));
145 } else {
146 debug!(
147 "OTel log record with bad trace_id ({}b), dropping property",
148 record.trace_id.len()
149 );
150 }
151 }
152 if !record.span_id.is_empty() {
153 if record.span_id.len() == 8 {
154 let hex = hex::encode(&record.span_id);
155 extras.push((
156 "otel.span_id".to_string(),
157 JsonbValue::String(Cow::Owned(hex)),
158 ));
159 } else {
160 debug!(
161 "OTel log record with bad span_id ({}b), dropping property",
162 record.span_id.len()
163 );
164 }
165 }
166 if !record.severity_text.is_empty() {
167 extras.push((
168 "otel.severity_text".to_string(),
169 JsonbValue::String(Cow::Owned(record.severity_text.clone())),
170 ));
171 }
172 if !record.event_name.is_empty() {
173 extras.push((
174 "otel.event_name".to_string(),
175 JsonbValue::String(Cow::Owned(record.event_name.clone())),
176 ));
177 }
178
179 let props_jsonb = attrs_to_jsonb(&record.attributes, &extras);
180
181 process_ids.append_value(&process_id_str);
182 stream_ids.append_value(&stream_id_str);
183 block_ids.append_value(&block_id_str);
184 insert_times.append_value(insert_time_nanos);
185 exes.append_value(&process.exe);
186 usernames.append_value(&process.username);
187 computers.append_value(&process.computer);
188 times.append_value(time_nanos);
189 targets.append_value(&scope_name);
190 levels.append_value(level);
191 msgs.append_value(&msg);
192 properties.append_value(&props_jsonb);
193 process_properties.append_value(&**process.properties);
194
195 nb_appended += 1;
196 }
197 }
198
199 if nb_dropped_no_timestamp > 0 {
200 debug!(
201 "OTel log records without timestamp dropped (block_id={block_id_str}, count={nb_dropped_no_timestamp})"
202 );
203 }
204 if nb_severity_out_of_range > 0 {
205 debug!(
206 "OTel log records with out-of-range severity_number treated as Info (block_id={block_id_str}, count={nb_severity_out_of_range})"
207 );
208 }
209
210 if nb_appended == 0 {
211 return Ok(None);
212 }
213
214 let schema = Arc::new(crate::log_entries_table::log_table_schema());
215 let batch = RecordBatch::try_new(
216 schema,
217 vec![
218 Arc::new(process_ids.finish()),
219 Arc::new(stream_ids.finish()),
220 Arc::new(block_ids.finish()),
221 Arc::new(insert_times.finish().with_timezone_utc()),
222 Arc::new(exes.finish()),
223 Arc::new(usernames.finish()),
224 Arc::new(computers.finish()),
225 Arc::new(times.finish().with_timezone_utc()),
226 Arc::new(targets.finish()),
227 Arc::new(levels.finish()),
228 Arc::new(msgs.finish()),
229 Arc::new(properties.finish()),
230 Arc::new(process_properties.finish()),
231 ],
232 )
233 .with_context(|| "building OTel log_entries batch")?;
234
235 Ok(Some(PartitionRowSet::new(
236 TimeRange::new(
237 DateTime::from_timestamp_nanos(min_time),
238 DateTime::from_timestamp_nanos(max_time),
239 ),
240 batch,
241 )))
242 }
243}