micromegas_analytics/
span_table.rs

1use std::sync::Arc;
2
3use crate::call_tree::CallTree;
4use crate::call_tree::CallTreeNode;
5use anyhow::{Context, Result};
6use datafusion::arrow::array::ArrayBuilder;
7use datafusion::arrow::array::PrimitiveBuilder;
8use datafusion::arrow::array::StringDictionaryBuilder;
9use datafusion::arrow::datatypes::DataType;
10use datafusion::arrow::datatypes::Field;
11use datafusion::arrow::datatypes::Int16Type;
12use datafusion::arrow::datatypes::Int64Type;
13use datafusion::arrow::datatypes::Schema;
14use datafusion::arrow::datatypes::TimeUnit;
15use datafusion::arrow::datatypes::TimestampNanosecondType;
16use datafusion::arrow::datatypes::UInt32Type;
17use datafusion::arrow::record_batch::RecordBatch;
18
/// A single span in a call tree.
///
/// One `SpanRow` is appended per call-tree node when flattening a
/// `CallTree` into the spans table (see `SpanRecordBuilder::append_call_tree`).
#[derive(Debug)]
pub struct SpanRow {
    /// Span id; -1 when the originating call-tree node has no id.
    pub id: i64,
    /// Id of the parent span; root spans are appended with parent 0,
    /// and children of an id-less node inherit -1.
    pub parent: i64,
    /// Depth in the call tree; roots are appended at depth 0.
    pub depth: u32,
    /// Span start time, stored in the `begin` timestamp column (nanoseconds, UTC).
    pub begin: i64,
    /// Span end time (nanoseconds, UTC); `end - begin` becomes the `duration` column.
    pub end: i64,
    /// Hash identifying the scope descriptor in `CallTree::scopes`.
    pub hash: u32,
    /// Scope name, shared (`Arc`) with the scope descriptor.
    pub name: Arc<String>,
    /// Scope target, shared with the scope descriptor.
    pub target: Arc<String>,
    /// Source file of the scope, shared with the scope descriptor.
    pub filename: Arc<String>,
    /// Source line of the scope.
    pub line: u32,
}
33
/// A builder for creating a `RecordBatch` of spans.
///
/// Each field is a columnar builder matching one field of `get_spans_schema`,
/// in the same order as the columns emitted by `finish`.
pub struct SpanRecordBuilder {
    /// `id` column (Int64).
    pub ids: PrimitiveBuilder<Int64Type>,
    /// `parent` column (Int64).
    pub parents: PrimitiveBuilder<Int64Type>,
    /// `depth` column (UInt32).
    pub depths: PrimitiveBuilder<UInt32Type>,
    /// `hash` column (UInt32).
    pub hashes: PrimitiveBuilder<UInt32Type>,
    /// `begin` column; finished with a UTC timezone (see `finish`).
    pub begins: PrimitiveBuilder<TimestampNanosecondType>,
    /// `end` column; finished with a UTC timezone (see `finish`).
    pub ends: PrimitiveBuilder<TimestampNanosecondType>,
    /// `duration` column, computed as `end - begin` on append (Int64).
    pub durations: PrimitiveBuilder<Int64Type>,
    /// `name` column, dictionary-encoded with Int16 keys.
    pub names: StringDictionaryBuilder<Int16Type>,
    /// `target` column, dictionary-encoded with Int16 keys.
    pub targets: StringDictionaryBuilder<Int16Type>,
    /// `filename` column, dictionary-encoded with Int16 keys.
    pub filenames: StringDictionaryBuilder<Int16Type>,
    /// `line` column (UInt32).
    pub lines: PrimitiveBuilder<UInt32Type>,
}
48
49/// Returns the schema for the spans table.
50pub fn get_spans_schema() -> Schema {
51    Schema::new(vec![
52        Field::new("id", DataType::Int64, false),
53        Field::new("parent", DataType::Int64, false),
54        Field::new("depth", DataType::UInt32, false),
55        Field::new("hash", DataType::UInt32, false),
56        Field::new(
57            "begin",
58            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
59            false,
60        ),
61        Field::new(
62            "end",
63            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
64            false,
65        ),
66        Field::new("duration", DataType::Int64, false), //DataType::Duration not supported by parquet
67        Field::new(
68            "name",
69            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
70            false,
71        ),
72        Field::new(
73            "target",
74            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
75            false,
76        ),
77        Field::new(
78            "filename",
79            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
80            false,
81        ),
82        Field::new("line", DataType::UInt32, false),
83    ])
84}
85
86impl SpanRecordBuilder {
87    pub fn with_capacity(capacity: usize) -> Self {
88        Self {
89            ids: PrimitiveBuilder::with_capacity(capacity),
90            parents: PrimitiveBuilder::with_capacity(capacity),
91            depths: PrimitiveBuilder::with_capacity(capacity),
92            hashes: PrimitiveBuilder::with_capacity(capacity),
93            begins: PrimitiveBuilder::with_capacity(capacity),
94            ends: PrimitiveBuilder::with_capacity(capacity),
95            durations: PrimitiveBuilder::with_capacity(capacity),
96            names: StringDictionaryBuilder::new(), //we could estimate the number of different names and their size
97            targets: StringDictionaryBuilder::new(),
98            filenames: StringDictionaryBuilder::new(),
99            lines: PrimitiveBuilder::with_capacity(capacity),
100        }
101    }
102
103    pub fn len(&self) -> i64 {
104        self.ids.len() as i64
105    }
106
107    pub fn is_empty(&self) -> bool {
108        self.ids.len() == 0
109    }
110
111    pub fn append(&mut self, row: SpanRow) -> Result<()> {
112        self.ids.append_value(row.id);
113        self.parents.append_value(row.parent);
114        self.depths.append_value(row.depth);
115        self.hashes.append_value(row.hash);
116        self.begins.append_value(row.begin);
117        self.ends.append_value(row.end);
118        self.durations.append_value(row.end - row.begin);
119        self.names.append_value(&*row.name);
120        self.targets.append_value(&*row.target);
121        self.filenames.append_value(&*row.filename);
122        self.lines.append_value(row.line);
123        Ok(())
124    }
125
126    pub fn append_call_tree(&mut self, tree: &CallTree) -> Result<()> {
127        if let Some(root) = &tree.call_tree_root {
128            for_each_node_in_tree(root, 0, 0, &mut |node, parent, depth| {
129                let scope_desc = tree
130                    .scopes
131                    .get(&node.hash)
132                    .with_context(|| "fetching scope_desc from hash")?;
133                self.append(SpanRow {
134                    id: node.id.unwrap_or(-1),
135                    parent,
136                    depth,
137                    begin: node.begin,
138                    end: node.end,
139                    hash: node.hash,
140                    name: scope_desc.name.clone(),
141                    target: scope_desc.target.clone(),
142                    filename: scope_desc.filename.clone(),
143                    line: scope_desc.line,
144                })
145            })?;
146        }
147        Ok(())
148    }
149
150    pub fn finish(mut self) -> Result<RecordBatch> {
151        let schema = get_spans_schema();
152        RecordBatch::try_new(
153            Arc::new(schema),
154            vec![
155                Arc::new(self.ids.finish()),
156                Arc::new(self.parents.finish()),
157                Arc::new(self.depths.finish()),
158                Arc::new(self.hashes.finish()),
159                Arc::new(self.begins.finish().with_timezone_utc()),
160                Arc::new(self.ends.finish().with_timezone_utc()),
161                Arc::new(self.durations.finish()),
162                Arc::new(self.names.finish()),
163                Arc::new(self.targets.finish()),
164                Arc::new(self.filenames.finish()),
165                Arc::new(self.lines.finish()),
166            ],
167        )
168        .with_context(|| "building record batch")
169    }
170}
171
172fn for_each_node_in_tree<NodeFun>(
173    node: &CallTreeNode,
174    parent: i64,
175    depth: u32,
176    process_node: &mut NodeFun,
177) -> Result<()>
178where
179    NodeFun: FnMut(&CallTreeNode, i64, u32) -> Result<()>,
180{
181    process_node(node, parent, depth)?;
182    let span_id = node.id.unwrap_or(-1);
183    for child in &node.children {
184        for_each_node_in_tree(child, span_id, depth + 1, process_node)?;
185    }
186    Ok(())
187}