micromegas_analytics/
call_tree.rs

1use crate::metadata::{StreamMetadata, get_thread_name_from_stream_metadata};
2use crate::scope::ScopeDesc;
3use crate::scope::ScopeHashMap;
4use crate::thread_block_processor::ThreadBlockProcessor;
5use crate::thread_block_processor::parse_thread_block;
6use crate::time::ConvertTicks;
7use anyhow::Result;
8use micromegas_telemetry::blob_storage::BlobStorage;
9use micromegas_telemetry::types::block::BlockMetadata;
10use micromegas_tracing::prelude::*;
11use std::sync::Arc;
12
/// A node in a call tree, representing a single scope instance.
///
/// Nodes own their children directly, so a whole tree is one recursive value.
#[derive(Debug)]
pub struct CallTreeNode {
    /// The unique identifier of the scope instance.
    ///
    /// `CallTreeBuilder` stores `Some(event_id)` for nodes built from scope
    /// events and `None` for the synthetic root node it creates for the thread.
    pub id: Option<i64>,
    /// The hash of the scope description.
    ///
    /// This is the key under which the description is stored in
    /// `CallTree::scopes`.
    pub hash: u32,
    /// The start time of the scope instance, in absolute nanoseconds.
    ///
    /// When the builder only sees the end event of a scope, this is set to the
    /// start of the query range.
    pub begin: i64,
    /// The end time of the scope instance in nanoseconds.
    ///
    /// The builder initialises this to the end of the query range and
    /// overwrites it when the matching end event is processed.
    pub end: i64,
    /// The children of this node in the call tree.
    pub children: Vec<CallTreeNode>,
}
27
/// A call tree, representing the execution of a single thread.
#[derive(Debug)]
pub struct CallTree {
    /// A map from scope hash to scope description.
    ///
    /// Node hashes (`CallTreeNode::hash`) are keys into this map.
    pub scopes: ScopeHashMap,
    /// The root node of the call tree, or `None` when the builder recorded no
    /// node at all.
    ///
    /// The root node corresponds to the thread and has a span equal to the
    /// query range.
    pub call_tree_root: Option<CallTreeNode>,
}
37
/// A builder for creating a `CallTree` from a stream of thread events.
///
/// Events are fed in through the `ThreadBlockProcessor` implementation; call
/// `finish` once all blocks have been processed to obtain the tree.
pub struct CallTreeBuilder {
    /// Start of the query range, in nanoseconds; earlier events are skipped.
    begin_range_ns: i64,
    /// End of the query range, in nanoseconds; it is also the provisional end
    /// time of nodes that are still open.
    end_range_ns: i64,
    /// Optional cap on the number of spans accepted by the builder.
    limit: Option<i64>,
    /// Number of spans counted so far, compared against `limit`.
    nb_spans: i64,
    /// Stack of nodes that are currently open; the bottom entry ends up as the
    /// root of the tree.
    stack: Vec<CallTreeNode>,
    /// Scope descriptions recorded so far, keyed by scope hash.
    scopes: ScopeHashMap,
    /// Converts event tick counts to nanoseconds.
    convert_ticks: ConvertTicks,
    /// Hash of the synthetic scope that stands for the thread itself.
    root_hash: u32,
}
49
50impl CallTreeBuilder {
51    pub fn new(
52        begin_range_ns: i64,
53        end_range_ns: i64,
54        limit: Option<i64>,
55        convert_ticks: ConvertTicks,
56        thread_name: String,
57    ) -> Self {
58        let thread_scope_desc = ScopeDesc::new(
59            Arc::new(thread_name),
60            Arc::new("".to_owned()),
61            Arc::new("".to_owned()),
62            0,
63        );
64        let mut scopes = ScopeHashMap::new();
65        let root_hash = thread_scope_desc.hash;
66        scopes.insert(root_hash, thread_scope_desc);
67        Self {
68            begin_range_ns,
69            end_range_ns,
70            limit,
71            nb_spans: 0,
72            stack: Vec::new(),
73            scopes,
74            convert_ticks,
75            root_hash,
76        }
77    }
78
79    #[span_fn]
80    pub fn finish(mut self) -> CallTree {
81        if self.stack.is_empty() {
82            return CallTree {
83                scopes: self.scopes,
84                call_tree_root: None,
85            };
86        }
87        while self.stack.len() > 1 {
88            let top = self.stack.pop().unwrap();
89            let last_index = self.stack.len() - 1;
90            let parent = &mut self.stack[last_index];
91            parent.children.push(top);
92        }
93        assert_eq!(1, self.stack.len());
94        CallTree {
95            scopes: self.scopes,
96            call_tree_root: self.stack.pop(),
97        }
98    }
99
100    fn add_child_to_top(&mut self, node: CallTreeNode) {
101        if let Some(mut top) = self.stack.pop() {
102            top.children.push(node);
103            self.stack.push(top);
104        } else {
105            let new_root = CallTreeNode {
106                id: None,
107                hash: self.root_hash,
108                begin: self.begin_range_ns,
109                end: self.end_range_ns,
110                children: vec![node],
111            };
112            self.stack.push(new_root);
113            self.nb_spans += 1;
114        }
115    }
116
117    fn record_scope_desc(&mut self, scope_desc: ScopeDesc) {
118        self.scopes
119            .entry(scope_desc.hash)
120            .or_insert_with(|| scope_desc);
121    }
122}
123
124impl ThreadBlockProcessor for CallTreeBuilder {
125    fn on_begin_thread_scope(
126        &mut self,
127        _block_id: &str,
128        event_id: i64,
129        scope: ScopeDesc,
130        ts: i64,
131    ) -> Result<bool> {
132        if self.limit.is_some() && self.nb_spans >= self.limit.unwrap() {
133            return Ok(false);
134        }
135        let time = self.convert_ticks.ticks_to_nanoseconds(ts);
136        if time < self.begin_range_ns {
137            return Ok(true);
138        }
139        if time > self.end_range_ns {
140            return Ok(false);
141        }
142        let hash = scope.hash;
143        self.record_scope_desc(scope);
144        let node = CallTreeNode {
145            id: Some(event_id),
146            hash,
147            begin: time,
148            end: self.end_range_ns,
149            children: Vec::new(),
150        };
151        self.stack.push(node);
152        self.nb_spans += 1;
153        Ok(true) // continue even if we reached the limit to allow the opportunity to close than span
154    }
155
156    fn on_end_thread_scope(
157        &mut self,
158        _block_id: &str,
159        event_id: i64,
160        scope: ScopeDesc,
161        ts: i64,
162    ) -> Result<bool> {
163        let time = self.convert_ticks.ticks_to_nanoseconds(ts);
164        if time < self.begin_range_ns {
165            return Ok(true);
166        }
167        if time > self.end_range_ns {
168            return Ok(false);
169        }
170        let hash = scope.hash;
171        self.record_scope_desc(scope);
172        if let Some(mut old_top) = self.stack.pop() {
173            if old_top.hash == hash {
174                old_top.end = time;
175                self.add_child_to_top(old_top);
176            } else if old_top.hash == self.root_hash {
177                old_top.id = Some(event_id);
178                old_top.hash = hash;
179                old_top.end = time;
180                self.add_child_to_top(old_top);
181            } else {
182                let ending_name = self.scopes.get(&hash).map_or("?", |s| s.name.as_str());
183                let open_name = self
184                    .scopes
185                    .get(&old_top.hash)
186                    .map_or("?", |s| s.name.as_str());
187                anyhow::bail!(
188                    "top scope mismatch in block {_block_id}: closing '{ending_name}' but '{open_name}' is open"
189                );
190            }
191        } else {
192            if self.limit.is_some() && self.nb_spans >= self.limit.unwrap() {
193                return Ok(false);
194            }
195            let node = CallTreeNode {
196                id: Some(event_id),
197                hash,
198                begin: self.begin_range_ns,
199                end: time,
200                children: Vec::new(),
201            };
202            self.add_child_to_top(node);
203        }
204        Ok(true)
205    }
206}
207
208/// Creates a call tree from a set of thread event blocks.
209#[allow(clippy::cast_precision_loss)]
210#[span_fn]
211pub async fn make_call_tree(
212    blocks: &[BlockMetadata],
213    begin_range_ns: i64,
214    end_range_ns: i64,
215    limit: Option<i64>,
216    blob_storage: Arc<BlobStorage>,
217    convert_ticks: ConvertTicks,
218    stream: &StreamMetadata,
219) -> Result<CallTree> {
220    let mut builder = CallTreeBuilder::new(
221        begin_range_ns,
222        end_range_ns,
223        limit,
224        convert_ticks,
225        get_thread_name_from_stream_metadata(stream)?,
226    );
227    for block in blocks {
228        parse_thread_block(
229            blob_storage.clone(),
230            stream,
231            block.block_id,
232            block.object_offset,
233            &mut builder,
234        )
235        .await?;
236    }
237    Ok(builder.finish())
238}