micromegas_analytics/
call_tree.rs

1use crate::metadata::{StreamMetadata, get_thread_name_from_stream_metadata};
2use crate::scope::ScopeDesc;
3use crate::scope::ScopeHashMap;
4use crate::thread_block_processor::ThreadBlockProcessor;
5use crate::thread_block_processor::parse_thread_block;
6use crate::time::ConvertTicks;
7use anyhow::Result;
8use micromegas_telemetry::blob_storage::BlobStorage;
9use micromegas_telemetry::types::block::BlockMetadata;
10use micromegas_tracing::prelude::*;
11use std::sync::Arc;
12
13/// A node in a call tree, representing a single scope instance.
14#[derive(Debug)]
15pub struct CallTreeNode {
16    /// The unique identifier of the scope instance.
17    pub id: Option<i64>,
18    /// The hash of the scope description.
19    pub hash: u32,
20    /// The start time of the scope instance in nanoseconds.
21    pub begin: i64, //absolute nanoseconds
22    /// The end time of the scope instance in nanoseconds.
23    pub end: i64,
24    /// The children of this node in the call tree.
25    pub children: Vec<CallTreeNode>,
26}
27
28/// A call tree, representing the execution of a single thread.
29#[derive(Debug)]
30pub struct CallTree {
31    /// A map from scope hash to scope description.
32    pub scopes: ScopeHashMap,
33    /// The root node of the call tree.
34    // the root node corresponds to the thread and has a span equal to the query range
35    pub call_tree_root: Option<CallTreeNode>,
36}
37
38/// A builder for creating a `CallTree` from a stream of thread events.
39pub struct CallTreeBuilder {
40    begin_range_ns: i64,
41    end_range_ns: i64,
42    limit: Option<i64>,
43    nb_spans: i64,
44    stack: Vec<CallTreeNode>,
45    scopes: ScopeHashMap,
46    convert_ticks: ConvertTicks,
47    root_hash: u32,
48}
49
50impl CallTreeBuilder {
51    pub fn new(
52        begin_range_ns: i64,
53        end_range_ns: i64,
54        limit: Option<i64>,
55        convert_ticks: ConvertTicks,
56        thread_name: String,
57    ) -> Self {
58        let thread_scope_desc = ScopeDesc::new(
59            Arc::new(thread_name),
60            Arc::new("".to_owned()),
61            Arc::new("".to_owned()),
62            0,
63        );
64        let mut scopes = ScopeHashMap::new();
65        let root_hash = thread_scope_desc.hash;
66        scopes.insert(root_hash, thread_scope_desc);
67        Self {
68            begin_range_ns,
69            end_range_ns,
70            limit,
71            nb_spans: 0,
72            stack: Vec::new(),
73            scopes,
74            convert_ticks,
75            root_hash,
76        }
77    }
78
79    #[span_fn]
80    pub fn finish(mut self) -> CallTree {
81        if self.stack.is_empty() {
82            return CallTree {
83                scopes: self.scopes,
84                call_tree_root: None,
85            };
86        }
87        while self.stack.len() > 1 {
88            let top = self.stack.pop().unwrap();
89            let last_index = self.stack.len() - 1;
90            let parent = &mut self.stack[last_index];
91            parent.children.push(top);
92        }
93        assert_eq!(1, self.stack.len());
94        CallTree {
95            scopes: self.scopes,
96            call_tree_root: self.stack.pop(),
97        }
98    }
99
100    fn add_child_to_top(&mut self, node: CallTreeNode) {
101        if let Some(mut top) = self.stack.pop() {
102            top.children.push(node);
103            self.stack.push(top);
104        } else {
105            let new_root = CallTreeNode {
106                id: None,
107                hash: self.root_hash,
108                begin: self.begin_range_ns,
109                end: self.end_range_ns,
110                children: vec![node],
111            };
112            self.stack.push(new_root);
113            self.nb_spans += 1;
114        }
115    }
116
117    fn record_scope_desc(&mut self, scope_desc: ScopeDesc) {
118        self.scopes
119            .entry(scope_desc.hash)
120            .or_insert_with(|| scope_desc);
121    }
122}
123
124impl ThreadBlockProcessor for CallTreeBuilder {
125    fn on_begin_thread_scope(
126        &mut self,
127        _block_id: &str,
128        event_id: i64,
129        scope: ScopeDesc,
130        ts: i64,
131    ) -> Result<bool> {
132        if self.limit.is_some() && self.nb_spans >= self.limit.unwrap() {
133            return Ok(false);
134        }
135        let time = self.convert_ticks.ticks_to_nanoseconds(ts);
136        if time < self.begin_range_ns {
137            return Ok(true);
138        }
139        if time > self.end_range_ns {
140            return Ok(false);
141        }
142        let hash = scope.hash;
143        self.record_scope_desc(scope);
144        let node = CallTreeNode {
145            id: Some(event_id),
146            hash,
147            begin: time,
148            end: self.end_range_ns,
149            children: Vec::new(),
150        };
151        self.stack.push(node);
152        self.nb_spans += 1;
153        Ok(true) // continue even if we reached the limit to allow the opportunity to close than span
154    }
155
156    fn on_end_thread_scope(
157        &mut self,
158        _block_id: &str,
159        event_id: i64,
160        scope: ScopeDesc,
161        ts: i64,
162    ) -> Result<bool> {
163        let time = self.convert_ticks.ticks_to_nanoseconds(ts);
164        if time < self.begin_range_ns {
165            return Ok(true);
166        }
167        if time > self.end_range_ns {
168            return Ok(false);
169        }
170        let hash = scope.hash;
171        self.record_scope_desc(scope);
172        if let Some(mut old_top) = self.stack.pop() {
173            if old_top.hash == hash {
174                old_top.end = time;
175                self.add_child_to_top(old_top);
176            } else if old_top.hash == self.root_hash {
177                old_top.id = Some(event_id);
178                old_top.hash = hash;
179                old_top.end = time;
180                self.add_child_to_top(old_top);
181            } else {
182                anyhow::bail!("top scope mismatch parsing thread block");
183            }
184        } else {
185            if self.limit.is_some() && self.nb_spans >= self.limit.unwrap() {
186                return Ok(false);
187            }
188            let node = CallTreeNode {
189                id: Some(event_id),
190                hash,
191                begin: self.begin_range_ns,
192                end: time,
193                children: Vec::new(),
194            };
195            self.add_child_to_top(node);
196        }
197        Ok(true)
198    }
199}
200
201/// Creates a call tree from a set of thread event blocks.
202#[allow(clippy::cast_precision_loss)]
203#[span_fn]
204pub async fn make_call_tree(
205    blocks: &[BlockMetadata],
206    begin_range_ns: i64,
207    end_range_ns: i64,
208    limit: Option<i64>,
209    blob_storage: Arc<BlobStorage>,
210    convert_ticks: ConvertTicks,
211    stream: &StreamMetadata,
212) -> Result<CallTree> {
213    let mut builder = CallTreeBuilder::new(
214        begin_range_ns,
215        end_range_ns,
216        limit,
217        convert_ticks,
218        get_thread_name_from_stream_metadata(stream)?,
219    );
220    for block in blocks {
221        parse_thread_block(
222            blob_storage.clone(),
223            stream,
224            block.block_id,
225            block.object_offset,
226            &mut builder,
227        )
228        .await?;
229    }
230    Ok(builder.finish())
231}