1use std::sync::Arc;
2
3use crate::call_tree::CallTree;
4use crate::call_tree::CallTreeNode;
5use anyhow::{Context, Result};
6use datafusion::arrow::array::ArrayBuilder;
7use datafusion::arrow::array::PrimitiveBuilder;
8use datafusion::arrow::array::StringDictionaryBuilder;
9use datafusion::arrow::datatypes::DataType;
10use datafusion::arrow::datatypes::Field;
11use datafusion::arrow::datatypes::Int16Type;
12use datafusion::arrow::datatypes::Int64Type;
13use datafusion::arrow::datatypes::Schema;
14use datafusion::arrow::datatypes::TimeUnit;
15use datafusion::arrow::datatypes::TimestampNanosecondType;
16use datafusion::arrow::datatypes::UInt32Type;
17use datafusion::arrow::record_batch::RecordBatch;
18
/// One flattened call-tree span, i.e. a single row of the spans table.
///
/// Rows are consumed by `SpanRecordBuilder::append`, which copies every field
/// into the matching column and derives the `duration` column as `end - begin`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpanRow {
    /// Identifier of the span (`append_call_tree` writes `-1` when the node has no id).
    pub id: i64,
    /// Identifier of the parent span (`append_call_tree` passes `0` for the root).
    pub parent: i64,
    /// Nesting level of the span in its tree; the root is at depth `0`.
    pub depth: u32,
    /// Start of the span; stored in a nanosecond-timestamp column.
    pub begin: i64,
    /// End of the span; stored in a nanosecond-timestamp column.
    pub end: i64,
    /// Hash used to look up the scope description of the span.
    pub hash: u32,
    /// Scope name (shared, so cloning a row does not copy the string).
    pub name: Arc<String>,
    /// Scope target.
    pub target: Arc<String>,
    /// Source file of the scope.
    pub filename: Arc<String>,
    /// Source line of the scope.
    pub line: u32,
}
33
34pub struct SpanRecordBuilder {
36 pub ids: PrimitiveBuilder<Int64Type>,
37 pub parents: PrimitiveBuilder<Int64Type>,
38 pub depths: PrimitiveBuilder<UInt32Type>,
39 pub hashes: PrimitiveBuilder<UInt32Type>,
40 pub begins: PrimitiveBuilder<TimestampNanosecondType>,
41 pub ends: PrimitiveBuilder<TimestampNanosecondType>,
42 pub durations: PrimitiveBuilder<Int64Type>,
43 pub names: StringDictionaryBuilder<Int16Type>,
44 pub targets: StringDictionaryBuilder<Int16Type>,
45 pub filenames: StringDictionaryBuilder<Int16Type>,
46 pub lines: PrimitiveBuilder<UInt32Type>,
47}
48
49pub fn get_spans_schema() -> Schema {
51 Schema::new(vec![
52 Field::new("id", DataType::Int64, false),
53 Field::new("parent", DataType::Int64, false),
54 Field::new("depth", DataType::UInt32, false),
55 Field::new("hash", DataType::UInt32, false),
56 Field::new(
57 "begin",
58 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
59 false,
60 ),
61 Field::new(
62 "end",
63 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
64 false,
65 ),
66 Field::new("duration", DataType::Int64, false), Field::new(
68 "name",
69 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
70 false,
71 ),
72 Field::new(
73 "target",
74 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
75 false,
76 ),
77 Field::new(
78 "filename",
79 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
80 false,
81 ),
82 Field::new("line", DataType::UInt32, false),
83 ])
84}
85
86impl SpanRecordBuilder {
87 pub fn with_capacity(capacity: usize) -> Self {
88 Self {
89 ids: PrimitiveBuilder::with_capacity(capacity),
90 parents: PrimitiveBuilder::with_capacity(capacity),
91 depths: PrimitiveBuilder::with_capacity(capacity),
92 hashes: PrimitiveBuilder::with_capacity(capacity),
93 begins: PrimitiveBuilder::with_capacity(capacity),
94 ends: PrimitiveBuilder::with_capacity(capacity),
95 durations: PrimitiveBuilder::with_capacity(capacity),
96 names: StringDictionaryBuilder::new(), targets: StringDictionaryBuilder::new(),
98 filenames: StringDictionaryBuilder::new(),
99 lines: PrimitiveBuilder::with_capacity(capacity),
100 }
101 }
102
103 pub fn len(&self) -> i64 {
104 self.ids.len() as i64
105 }
106
107 pub fn is_empty(&self) -> bool {
108 self.ids.len() == 0
109 }
110
111 pub fn append(&mut self, row: SpanRow) -> Result<()> {
112 self.ids.append_value(row.id);
113 self.parents.append_value(row.parent);
114 self.depths.append_value(row.depth);
115 self.hashes.append_value(row.hash);
116 self.begins.append_value(row.begin);
117 self.ends.append_value(row.end);
118 self.durations.append_value(row.end - row.begin);
119 self.names.append_value(&*row.name);
120 self.targets.append_value(&*row.target);
121 self.filenames.append_value(&*row.filename);
122 self.lines.append_value(row.line);
123 Ok(())
124 }
125
126 pub fn append_call_tree(&mut self, tree: &CallTree) -> Result<()> {
127 if let Some(root) = &tree.call_tree_root {
128 for_each_node_in_tree(root, 0, 0, &mut |node, parent, depth| {
129 let scope_desc = tree
130 .scopes
131 .get(&node.hash)
132 .with_context(|| "fetching scope_desc from hash")?;
133 self.append(SpanRow {
134 id: node.id.unwrap_or(-1),
135 parent,
136 depth,
137 begin: node.begin,
138 end: node.end,
139 hash: node.hash,
140 name: scope_desc.name.clone(),
141 target: scope_desc.target.clone(),
142 filename: scope_desc.filename.clone(),
143 line: scope_desc.line,
144 })
145 })?;
146 }
147 Ok(())
148 }
149
150 pub fn finish(mut self) -> Result<RecordBatch> {
151 let schema = get_spans_schema();
152 RecordBatch::try_new(
153 Arc::new(schema),
154 vec![
155 Arc::new(self.ids.finish()),
156 Arc::new(self.parents.finish()),
157 Arc::new(self.depths.finish()),
158 Arc::new(self.hashes.finish()),
159 Arc::new(self.begins.finish().with_timezone_utc()),
160 Arc::new(self.ends.finish().with_timezone_utc()),
161 Arc::new(self.durations.finish()),
162 Arc::new(self.names.finish()),
163 Arc::new(self.targets.finish()),
164 Arc::new(self.filenames.finish()),
165 Arc::new(self.lines.finish()),
166 ],
167 )
168 .with_context(|| "building record batch")
169 }
170}
171
172fn for_each_node_in_tree<NodeFun>(
173 node: &CallTreeNode,
174 parent: i64,
175 depth: u32,
176 process_node: &mut NodeFun,
177) -> Result<()>
178where
179 NodeFun: FnMut(&CallTreeNode, i64, u32) -> Result<()>,
180{
181 process_node(node, parent, depth)?;
182 let span_id = node.id.unwrap_or(-1);
183 for child in &node.children {
184 for_each_node_in_tree(child, span_id, depth + 1, process_node)?;
185 }
186 Ok(())
187}