micromegas_analytics/
net_span_tree.rs

1use crate::metadata::StreamMetadata;
2use crate::net_block_processing::{NetBlockProcessor, parse_net_block};
3use crate::net_spans_table::{NetSpanRecord, NetSpanRecordBuilder};
4use crate::time::ConvertTicks;
5use anyhow::Result;
6use micromegas_telemetry::blob_storage::BlobStorage;
7use micromegas_telemetry::types::block::BlockMetadata;
8use micromegas_tracing::prelude::*;
9use std::sync::Arc;
10
11lazy_static::lazy_static! {
12    static ref KIND_CONNECTION: Arc<String> = Arc::new(String::from("connection"));
13    static ref KIND_OBJECT: Arc<String> = Arc::new(String::from("object"));
14    static ref KIND_PROPERTY: Arc<String> = Arc::new(String::from("property"));
15    static ref KIND_RPC: Arc<String> = Arc::new(String::from("rpc"));
16    static ref EMPTY_NAME: Arc<String> = Arc::new(String::new());
17}
18
/// Sentinel `parent_span_id` used for Connection roots and for any span that is
/// opened (or any property that is recorded) while no parent span is open.
///
/// Set to `-1` so it can never collide with a real `span_id` (which is the
/// per-event `event_id`, a non-negative offset within the stream).
pub const ROOT_PARENT_SPAN_ID: i64 = -1;
23
/// A span whose Begin event has been seen but whose End event has not.
///
/// Instances live on the builder's stack; the values stored here are copied into
/// the emitted row when the span is closed.
#[derive(Debug)]
struct OpenSpan {
    /// `event_id` of the Begin event that opened this span.
    span_id: i64,
    /// `span_id` of the span that was on top of the stack when this one was
    /// opened, or `ROOT_PARENT_SPAN_ID` when there was none.
    parent_span_id: i64,
    /// Nesting level: 0 for roots, parent depth + 1 otherwise.
    depth: u32,
    /// Span kind (one of the `KIND_*` strings); checked against the kind an End
    /// event expects before the span is popped.
    kind: Arc<String>,
    /// Connection, object or function name taken from the Begin event.
    name: Arc<String>,
    /// Name of the connection this span is attributed to. A Connection span uses
    /// its own name; other spans copy it from the stack when they are opened.
    connection_name: Arc<String>,
    /// Direction flag of the connection this span is attributed to.
    is_outgoing: bool,
    /// Begin timestamp, already converted from ticks to nanoseconds.
    begin_time_ns: i64,
    /// Cumulative bits consumed by already-closed children of this open span.
    child_bits_consumed: i64,
}
37
/// Builds `net_spans` rows from a stream of net events.
///
/// The builder owns an open-span stack that persists across `parse_net_block` calls;
/// callers drive multiple blocks through the same builder to stitch spans across
/// block boundaries within a contiguous block group.
pub struct NetSpanTreeBuilder<'a> {
    /// Destination that every emitted row is appended to.
    record_builder: &'a mut NetSpanRecordBuilder,
    /// Currently open spans, outermost first; the last element is the innermost.
    stack: Vec<OpenSpan>,
    /// Process identifier copied onto every emitted row.
    process_id: Arc<String>,
    /// Stream identifier copied onto every emitted row.
    stream_id: Arc<String>,
    /// Converts event tick timestamps to nanoseconds.
    convert_ticks: ConvertTicks,
}
50
51impl<'a> NetSpanTreeBuilder<'a> {
52    pub fn new(
53        record_builder: &'a mut NetSpanRecordBuilder,
54        process_id: Arc<String>,
55        stream_id: Arc<String>,
56        convert_ticks: ConvertTicks,
57    ) -> Self {
58        Self {
59            record_builder,
60            stack: Vec::new(),
61            process_id,
62            stream_id,
63            convert_ticks,
64        }
65    }
66
67    /// Returns the connection context (name + direction) inherited from the stack root.
68    /// The stack root is the Connection span; descendants inherit its connection info.
69    fn connection_context(&self) -> (Arc<String>, bool) {
70        if let Some(root) = self.stack.first() {
71            (root.connection_name.clone(), root.is_outgoing)
72        } else {
73            (EMPTY_NAME.clone(), false)
74        }
75    }
76
77    fn parent_of_new_child(&self) -> (i64, u32, i64) {
78        if let Some(top) = self.stack.last() {
79            (top.span_id, top.depth + 1, top.child_bits_consumed)
80        } else {
81            // No parent on stack — use root sentinel and zero offsets.
82            (ROOT_PARENT_SPAN_ID, 0, 0)
83        }
84    }
85
86    /// Ends an open span (Connection / Object / RPC) at `event_time_ns` with `bit_size`.
87    /// Pops the matching stack top and emits a row. Silently skips if the stack is empty,
88    /// or if the top span's kind does not match `expected_kind` — popping a mismatched
89    /// span would emit a row with the wrong `bit_size` and strand the real parent.
90    fn close_span(
91        &mut self,
92        expected_kind: &Arc<String>,
93        event_time_ns: i64,
94        bit_size: i64,
95    ) -> Result<bool> {
96        match self.stack.last() {
97            None => {
98                // "End with no matching Begin" — bit attribution is unrecoverable; skip.
99                debug!(
100                    "net span end event with no matching begin (expected kind={})",
101                    expected_kind
102                );
103                return Ok(true);
104            }
105            Some(top) if !Arc::ptr_eq(&top.kind, expected_kind) && *top.kind != **expected_kind => {
106                // Stack top is a different kind than the End event expects. Popping would
107                // emit a row with the wrong bit_size and leave the real parent open.
108                // Skip instead — the matching End (if any) will close it correctly.
109                debug!(
110                    "net span stack mismatch: expected {}, got {}; skipping end event",
111                    expected_kind, top.kind
112                );
113                return Ok(true);
114            }
115            Some(_) => {}
116        }
117        let open = self.stack.pop().expect("peeked above");
118        let begin_bits = if let Some(parent) = self.stack.last() {
119            parent.child_bits_consumed
120        } else {
121            0
122        };
123        let end_bits = begin_bits + bit_size;
124        let connection_name = if self.stack.is_empty() {
125            open.connection_name.clone()
126        } else {
127            self.stack[0].connection_name.clone()
128        };
129        let is_outgoing = if self.stack.is_empty() {
130            open.is_outgoing
131        } else {
132            self.stack[0].is_outgoing
133        };
134        let record = NetSpanRecord {
135            process_id: self.process_id.clone(),
136            stream_id: self.stream_id.clone(),
137            span_id: open.span_id,
138            parent_span_id: open.parent_span_id,
139            depth: open.depth,
140            kind: open.kind.clone(),
141            name: open.name.clone(),
142            connection_name,
143            is_outgoing,
144            begin_bits,
145            end_bits,
146            bit_size,
147            begin_time: open.begin_time_ns,
148            end_time: event_time_ns,
149        };
150        self.record_builder.append(&record)?;
151        if let Some(parent) = self.stack.last_mut() {
152            parent.child_bits_consumed += bit_size;
153        }
154        Ok(true)
155    }
156
157    /// Discards any open spans without emitting synthetic rows. Bit attribution
158    /// for unclosed spans is unrecoverable; this is logged at debug level.
159    pub fn finish(self) {
160        if !self.stack.is_empty() {
161            debug!(
162                "net span tree finishing with {} unclosed span(s); dropping",
163                self.stack.len()
164            );
165        }
166    }
167}
168
169impl<'a> NetBlockProcessor for NetSpanTreeBuilder<'a> {
170    fn on_connection_begin(
171        &mut self,
172        event_id: i64,
173        time: i64,
174        connection_name: Arc<String>,
175        is_outgoing: bool,
176    ) -> Result<bool> {
177        let begin_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
178        self.stack.push(OpenSpan {
179            span_id: event_id,
180            parent_span_id: ROOT_PARENT_SPAN_ID,
181            depth: 0,
182            kind: KIND_CONNECTION.clone(),
183            name: connection_name.clone(),
184            connection_name,
185            is_outgoing,
186            begin_time_ns,
187            child_bits_consumed: 0,
188        });
189        Ok(true)
190    }
191
192    fn on_connection_end(&mut self, _event_id: i64, time: i64, bit_size: i64) -> Result<bool> {
193        let end_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
194        self.close_span(&KIND_CONNECTION, end_time_ns, bit_size)
195    }
196
197    fn on_object_begin(
198        &mut self,
199        event_id: i64,
200        time: i64,
201        object_name: Arc<String>,
202    ) -> Result<bool> {
203        let begin_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
204        let (parent_span_id, depth, _) = self.parent_of_new_child();
205        let (connection_name, is_outgoing) = self.connection_context();
206        self.stack.push(OpenSpan {
207            span_id: event_id,
208            parent_span_id,
209            depth,
210            kind: KIND_OBJECT.clone(),
211            name: object_name,
212            connection_name,
213            is_outgoing,
214            begin_time_ns,
215            child_bits_consumed: 0,
216        });
217        Ok(true)
218    }
219
220    fn on_object_end(&mut self, _event_id: i64, time: i64, bit_size: i64) -> Result<bool> {
221        let end_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
222        self.close_span(&KIND_OBJECT, end_time_ns, bit_size)
223    }
224
225    fn on_property(
226        &mut self,
227        event_id: i64,
228        time: i64,
229        property_name: Arc<String>,
230        bit_size: i64,
231    ) -> Result<bool> {
232        let event_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
233        let (parent_span_id, depth, begin_bits) = self.parent_of_new_child();
234        let end_bits = begin_bits + bit_size;
235        let (connection_name, is_outgoing) = self.connection_context();
236        let record = NetSpanRecord {
237            process_id: self.process_id.clone(),
238            stream_id: self.stream_id.clone(),
239            span_id: event_id,
240            parent_span_id,
241            depth,
242            kind: KIND_PROPERTY.clone(),
243            name: property_name,
244            connection_name,
245            is_outgoing,
246            begin_bits,
247            end_bits,
248            bit_size,
249            begin_time: event_time_ns,
250            end_time: event_time_ns,
251        };
252        self.record_builder.append(&record)?;
253        if let Some(parent) = self.stack.last_mut() {
254            parent.child_bits_consumed += bit_size;
255        }
256        Ok(true)
257    }
258
259    fn on_rpc_begin(
260        &mut self,
261        event_id: i64,
262        time: i64,
263        function_name: Arc<String>,
264    ) -> Result<bool> {
265        let begin_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
266        let (parent_span_id, depth, _) = self.parent_of_new_child();
267        let (connection_name, is_outgoing) = self.connection_context();
268        self.stack.push(OpenSpan {
269            span_id: event_id,
270            parent_span_id,
271            depth,
272            kind: KIND_RPC.clone(),
273            name: function_name,
274            connection_name,
275            is_outgoing,
276            begin_time_ns,
277            child_bits_consumed: 0,
278        });
279        Ok(true)
280    }
281
282    fn on_rpc_end(&mut self, _event_id: i64, time: i64, bit_size: i64) -> Result<bool> {
283        let end_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
284        self.close_span(&KIND_RPC, end_time_ns, bit_size)
285    }
286}
287
288/// Drives a `NetSpanTreeBuilder` across a contiguous group of net event blocks,
289/// stitching open spans across block boundaries.
290#[span_fn]
291pub async fn make_net_span_tree(
292    blocks: &[BlockMetadata],
293    record_builder: &mut NetSpanRecordBuilder,
294    blob_storage: Arc<BlobStorage>,
295    stream: &StreamMetadata,
296    process_id: Arc<String>,
297    convert_ticks: ConvertTicks,
298) -> Result<()> {
299    let stream_id = Arc::new(stream.stream_id.to_string());
300    let mut builder = NetSpanTreeBuilder::new(record_builder, process_id, stream_id, convert_ticks);
301    for block in blocks {
302        parse_net_block(
303            blob_storage.clone(),
304            stream,
305            block.block_id,
306            block.object_offset,
307            &mut builder,
308        )
309        .await?;
310    }
311    builder.finish();
312    Ok(())
313}