// micromegas_analytics/call_tree.rs
use crate::metadata::{StreamMetadata, get_thread_name_from_stream_metadata};
use crate::scope::ScopeDesc;
use crate::scope::ScopeHashMap;
use crate::thread_block_processor::ThreadBlockProcessor;
use crate::thread_block_processor::parse_thread_block;
use crate::time::ConvertTicks;
use anyhow::Result;
use micromegas_telemetry::blob_storage::BlobStorage;
use micromegas_telemetry::types::block::BlockMetadata;
use micromegas_tracing::prelude::*;
use std::sync::Arc;

/// One scope occurrence in a call tree, together with the scopes nested inside it.
#[derive(Debug)]
pub struct CallTreeNode {
    /// Identifier of the event that produced this node.
    /// `None` for the synthetic root that `CallTreeBuilder` inserts itself.
    pub id: Option<i64>,
    /// Hash of the scope description; the key under which the description is
    /// stored in the tree's scope map.
    pub hash: u32,
    /// Start of the scope, in nanoseconds.
    pub begin: i64,
    /// End of the scope, in nanoseconds.
    pub end: i64,
    /// Scopes nested directly under this one, in the order they were attached.
    pub children: Vec<CallTreeNode>,
}

28#[derive(Debug)]
30pub struct CallTree {
31 pub scopes: ScopeHashMap,
33 pub call_tree_root: Option<CallTreeNode>,
36}
37
38pub struct CallTreeBuilder {
40 begin_range_ns: i64,
41 end_range_ns: i64,
42 limit: Option<i64>,
43 nb_spans: i64,
44 stack: Vec<CallTreeNode>,
45 scopes: ScopeHashMap,
46 convert_ticks: ConvertTicks,
47 root_hash: u32,
48}
49
50impl CallTreeBuilder {
51 pub fn new(
52 begin_range_ns: i64,
53 end_range_ns: i64,
54 limit: Option<i64>,
55 convert_ticks: ConvertTicks,
56 thread_name: String,
57 ) -> Self {
58 let thread_scope_desc = ScopeDesc::new(
59 Arc::new(thread_name),
60 Arc::new("".to_owned()),
61 Arc::new("".to_owned()),
62 0,
63 );
64 let mut scopes = ScopeHashMap::new();
65 let root_hash = thread_scope_desc.hash;
66 scopes.insert(root_hash, thread_scope_desc);
67 Self {
68 begin_range_ns,
69 end_range_ns,
70 limit,
71 nb_spans: 0,
72 stack: Vec::new(),
73 scopes,
74 convert_ticks,
75 root_hash,
76 }
77 }
78
79 #[span_fn]
80 pub fn finish(mut self) -> CallTree {
81 if self.stack.is_empty() {
82 return CallTree {
83 scopes: self.scopes,
84 call_tree_root: None,
85 };
86 }
87 while self.stack.len() > 1 {
88 let top = self.stack.pop().unwrap();
89 let last_index = self.stack.len() - 1;
90 let parent = &mut self.stack[last_index];
91 parent.children.push(top);
92 }
93 assert_eq!(1, self.stack.len());
94 CallTree {
95 scopes: self.scopes,
96 call_tree_root: self.stack.pop(),
97 }
98 }
99
100 fn add_child_to_top(&mut self, node: CallTreeNode) {
101 if let Some(mut top) = self.stack.pop() {
102 top.children.push(node);
103 self.stack.push(top);
104 } else {
105 let new_root = CallTreeNode {
106 id: None,
107 hash: self.root_hash,
108 begin: self.begin_range_ns,
109 end: self.end_range_ns,
110 children: vec![node],
111 };
112 self.stack.push(new_root);
113 self.nb_spans += 1;
114 }
115 }
116
117 fn record_scope_desc(&mut self, scope_desc: ScopeDesc) {
118 self.scopes
119 .entry(scope_desc.hash)
120 .or_insert_with(|| scope_desc);
121 }
122}
123
124impl ThreadBlockProcessor for CallTreeBuilder {
125 fn on_begin_thread_scope(
126 &mut self,
127 _block_id: &str,
128 event_id: i64,
129 scope: ScopeDesc,
130 ts: i64,
131 ) -> Result<bool> {
132 if self.limit.is_some() && self.nb_spans >= self.limit.unwrap() {
133 return Ok(false);
134 }
135 let time = self.convert_ticks.ticks_to_nanoseconds(ts);
136 if time < self.begin_range_ns {
137 return Ok(true);
138 }
139 if time > self.end_range_ns {
140 return Ok(false);
141 }
142 let hash = scope.hash;
143 self.record_scope_desc(scope);
144 let node = CallTreeNode {
145 id: Some(event_id),
146 hash,
147 begin: time,
148 end: self.end_range_ns,
149 children: Vec::new(),
150 };
151 self.stack.push(node);
152 self.nb_spans += 1;
153 Ok(true) }
155
156 fn on_end_thread_scope(
157 &mut self,
158 _block_id: &str,
159 event_id: i64,
160 scope: ScopeDesc,
161 ts: i64,
162 ) -> Result<bool> {
163 let time = self.convert_ticks.ticks_to_nanoseconds(ts);
164 if time < self.begin_range_ns {
165 return Ok(true);
166 }
167 if time > self.end_range_ns {
168 return Ok(false);
169 }
170 let hash = scope.hash;
171 self.record_scope_desc(scope);
172 if let Some(mut old_top) = self.stack.pop() {
173 if old_top.hash == hash {
174 old_top.end = time;
175 self.add_child_to_top(old_top);
176 } else if old_top.hash == self.root_hash {
177 old_top.id = Some(event_id);
178 old_top.hash = hash;
179 old_top.end = time;
180 self.add_child_to_top(old_top);
181 } else {
182 anyhow::bail!("top scope mismatch parsing thread block");
183 }
184 } else {
185 if self.limit.is_some() && self.nb_spans >= self.limit.unwrap() {
186 return Ok(false);
187 }
188 let node = CallTreeNode {
189 id: Some(event_id),
190 hash,
191 begin: self.begin_range_ns,
192 end: time,
193 children: Vec::new(),
194 };
195 self.add_child_to_top(node);
196 }
197 Ok(true)
198 }
199}
200
201#[allow(clippy::cast_precision_loss)]
203#[span_fn]
204pub async fn make_call_tree(
205 blocks: &[BlockMetadata],
206 begin_range_ns: i64,
207 end_range_ns: i64,
208 limit: Option<i64>,
209 blob_storage: Arc<BlobStorage>,
210 convert_ticks: ConvertTicks,
211 stream: &StreamMetadata,
212) -> Result<CallTree> {
213 let mut builder = CallTreeBuilder::new(
214 begin_range_ns,
215 end_range_ns,
216 limit,
217 convert_ticks,
218 get_thread_name_from_stream_metadata(stream)?,
219 );
220 for block in blocks {
221 parse_thread_block(
222 blob_storage.clone(),
223 stream,
224 block.block_id,
225 block.object_offset,
226 &mut builder,
227 )
228 .await?;
229 }
230 Ok(builder.finish())
231}