1use crate::metadata::StreamMetadata;
2use crate::net_block_processing::{NetBlockProcessor, parse_net_block};
3use crate::net_spans_table::{NetSpanRecord, NetSpanRecordBuilder};
4use crate::time::ConvertTicks;
5use anyhow::Result;
6use micromegas_telemetry::blob_storage::BlobStorage;
7use micromegas_telemetry::types::block::BlockMetadata;
8use micromegas_tracing::prelude::*;
9use std::sync::Arc;
10
11lazy_static::lazy_static! {
12 static ref KIND_CONNECTION: Arc<String> = Arc::new(String::from("connection"));
13 static ref KIND_OBJECT: Arc<String> = Arc::new(String::from("object"));
14 static ref KIND_PROPERTY: Arc<String> = Arc::new(String::from("property"));
15 static ref KIND_RPC: Arc<String> = Arc::new(String::from("rpc"));
16 static ref EMPTY_NAME: Arc<String> = Arc::new(String::new());
17}
18
19pub const ROOT_PARENT_SPAN_ID: i64 = -1;
23
24#[derive(Debug)]
25struct OpenSpan {
26 span_id: i64,
27 parent_span_id: i64,
28 depth: u32,
29 kind: Arc<String>,
30 name: Arc<String>,
31 connection_name: Arc<String>,
32 is_outgoing: bool,
33 begin_time_ns: i64,
34 child_bits_consumed: i64,
36}
37
38pub struct NetSpanTreeBuilder<'a> {
44 record_builder: &'a mut NetSpanRecordBuilder,
45 stack: Vec<OpenSpan>,
46 process_id: Arc<String>,
47 stream_id: Arc<String>,
48 convert_ticks: ConvertTicks,
49}
50
51impl<'a> NetSpanTreeBuilder<'a> {
52 pub fn new(
53 record_builder: &'a mut NetSpanRecordBuilder,
54 process_id: Arc<String>,
55 stream_id: Arc<String>,
56 convert_ticks: ConvertTicks,
57 ) -> Self {
58 Self {
59 record_builder,
60 stack: Vec::new(),
61 process_id,
62 stream_id,
63 convert_ticks,
64 }
65 }
66
67 fn connection_context(&self) -> (Arc<String>, bool) {
70 if let Some(root) = self.stack.first() {
71 (root.connection_name.clone(), root.is_outgoing)
72 } else {
73 (EMPTY_NAME.clone(), false)
74 }
75 }
76
77 fn parent_of_new_child(&self) -> (i64, u32, i64) {
78 if let Some(top) = self.stack.last() {
79 (top.span_id, top.depth + 1, top.child_bits_consumed)
80 } else {
81 (ROOT_PARENT_SPAN_ID, 0, 0)
83 }
84 }
85
86 fn close_span(
91 &mut self,
92 expected_kind: &Arc<String>,
93 event_time_ns: i64,
94 bit_size: i64,
95 ) -> Result<bool> {
96 match self.stack.last() {
97 None => {
98 debug!(
100 "net span end event with no matching begin (expected kind={})",
101 expected_kind
102 );
103 return Ok(true);
104 }
105 Some(top) if !Arc::ptr_eq(&top.kind, expected_kind) && *top.kind != **expected_kind => {
106 debug!(
110 "net span stack mismatch: expected {}, got {}; skipping end event",
111 expected_kind, top.kind
112 );
113 return Ok(true);
114 }
115 Some(_) => {}
116 }
117 let open = self.stack.pop().expect("peeked above");
118 let begin_bits = if let Some(parent) = self.stack.last() {
119 parent.child_bits_consumed
120 } else {
121 0
122 };
123 let end_bits = begin_bits + bit_size;
124 let connection_name = if self.stack.is_empty() {
125 open.connection_name.clone()
126 } else {
127 self.stack[0].connection_name.clone()
128 };
129 let is_outgoing = if self.stack.is_empty() {
130 open.is_outgoing
131 } else {
132 self.stack[0].is_outgoing
133 };
134 let record = NetSpanRecord {
135 process_id: self.process_id.clone(),
136 stream_id: self.stream_id.clone(),
137 span_id: open.span_id,
138 parent_span_id: open.parent_span_id,
139 depth: open.depth,
140 kind: open.kind.clone(),
141 name: open.name.clone(),
142 connection_name,
143 is_outgoing,
144 begin_bits,
145 end_bits,
146 bit_size,
147 begin_time: open.begin_time_ns,
148 end_time: event_time_ns,
149 };
150 self.record_builder.append(&record)?;
151 if let Some(parent) = self.stack.last_mut() {
152 parent.child_bits_consumed += bit_size;
153 }
154 Ok(true)
155 }
156
157 pub fn finish(self) {
160 if !self.stack.is_empty() {
161 debug!(
162 "net span tree finishing with {} unclosed span(s); dropping",
163 self.stack.len()
164 );
165 }
166 }
167}
168
169impl<'a> NetBlockProcessor for NetSpanTreeBuilder<'a> {
170 fn on_connection_begin(
171 &mut self,
172 event_id: i64,
173 time: i64,
174 connection_name: Arc<String>,
175 is_outgoing: bool,
176 ) -> Result<bool> {
177 let begin_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
178 self.stack.push(OpenSpan {
179 span_id: event_id,
180 parent_span_id: ROOT_PARENT_SPAN_ID,
181 depth: 0,
182 kind: KIND_CONNECTION.clone(),
183 name: connection_name.clone(),
184 connection_name,
185 is_outgoing,
186 begin_time_ns,
187 child_bits_consumed: 0,
188 });
189 Ok(true)
190 }
191
192 fn on_connection_end(&mut self, _event_id: i64, time: i64, bit_size: i64) -> Result<bool> {
193 let end_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
194 self.close_span(&KIND_CONNECTION, end_time_ns, bit_size)
195 }
196
197 fn on_object_begin(
198 &mut self,
199 event_id: i64,
200 time: i64,
201 object_name: Arc<String>,
202 ) -> Result<bool> {
203 let begin_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
204 let (parent_span_id, depth, _) = self.parent_of_new_child();
205 let (connection_name, is_outgoing) = self.connection_context();
206 self.stack.push(OpenSpan {
207 span_id: event_id,
208 parent_span_id,
209 depth,
210 kind: KIND_OBJECT.clone(),
211 name: object_name,
212 connection_name,
213 is_outgoing,
214 begin_time_ns,
215 child_bits_consumed: 0,
216 });
217 Ok(true)
218 }
219
220 fn on_object_end(&mut self, _event_id: i64, time: i64, bit_size: i64) -> Result<bool> {
221 let end_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
222 self.close_span(&KIND_OBJECT, end_time_ns, bit_size)
223 }
224
225 fn on_property(
226 &mut self,
227 event_id: i64,
228 time: i64,
229 property_name: Arc<String>,
230 bit_size: i64,
231 ) -> Result<bool> {
232 let event_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
233 let (parent_span_id, depth, begin_bits) = self.parent_of_new_child();
234 let end_bits = begin_bits + bit_size;
235 let (connection_name, is_outgoing) = self.connection_context();
236 let record = NetSpanRecord {
237 process_id: self.process_id.clone(),
238 stream_id: self.stream_id.clone(),
239 span_id: event_id,
240 parent_span_id,
241 depth,
242 kind: KIND_PROPERTY.clone(),
243 name: property_name,
244 connection_name,
245 is_outgoing,
246 begin_bits,
247 end_bits,
248 bit_size,
249 begin_time: event_time_ns,
250 end_time: event_time_ns,
251 };
252 self.record_builder.append(&record)?;
253 if let Some(parent) = self.stack.last_mut() {
254 parent.child_bits_consumed += bit_size;
255 }
256 Ok(true)
257 }
258
259 fn on_rpc_begin(
260 &mut self,
261 event_id: i64,
262 time: i64,
263 function_name: Arc<String>,
264 ) -> Result<bool> {
265 let begin_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
266 let (parent_span_id, depth, _) = self.parent_of_new_child();
267 let (connection_name, is_outgoing) = self.connection_context();
268 self.stack.push(OpenSpan {
269 span_id: event_id,
270 parent_span_id,
271 depth,
272 kind: KIND_RPC.clone(),
273 name: function_name,
274 connection_name,
275 is_outgoing,
276 begin_time_ns,
277 child_bits_consumed: 0,
278 });
279 Ok(true)
280 }
281
282 fn on_rpc_end(&mut self, _event_id: i64, time: i64, bit_size: i64) -> Result<bool> {
283 let end_time_ns = self.convert_ticks.ticks_to_nanoseconds(time);
284 self.close_span(&KIND_RPC, end_time_ns, bit_size)
285 }
286}
287
288#[span_fn]
291pub async fn make_net_span_tree(
292 blocks: &[BlockMetadata],
293 record_builder: &mut NetSpanRecordBuilder,
294 blob_storage: Arc<BlobStorage>,
295 stream: &StreamMetadata,
296 process_id: Arc<String>,
297 convert_ticks: ConvertTicks,
298) -> Result<()> {
299 let stream_id = Arc::new(stream.stream_id.to_string());
300 let mut builder = NetSpanTreeBuilder::new(record_builder, process_id, stream_id, convert_ticks);
301 for block in blocks {
302 parse_net_block(
303 blob_storage.clone(),
304 stream,
305 block.block_id,
306 block.object_offset,
307 &mut builder,
308 )
309 .await?;
310 }
311 builder.finish();
312 Ok(())
313}