1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::DateTime;
5use datafusion::arrow::array::ArrayBuilder;
6use datafusion::arrow::array::BooleanBuilder;
7use datafusion::arrow::array::PrimitiveBuilder;
8use datafusion::arrow::array::StringDictionaryBuilder;
9use datafusion::arrow::datatypes::DataType;
10use datafusion::arrow::datatypes::Field;
11use datafusion::arrow::datatypes::Int16Type;
12use datafusion::arrow::datatypes::Int64Type;
13use datafusion::arrow::datatypes::Schema;
14use datafusion::arrow::datatypes::TimeUnit;
15use datafusion::arrow::datatypes::TimestampNanosecondType;
16use datafusion::arrow::datatypes::UInt32Type;
17use datafusion::arrow::record_batch::RecordBatch;
18
19use crate::time::TimeRange;
20
/// One row of the net spans table, in the shape consumed by
/// `NetSpanRecordBuilder::append`.
///
/// The string fields are reference-counted so the same value can be shared by
/// many records without copying the underlying text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetSpanRecord {
    /// Value written to the dictionary-encoded `process_id` column.
    pub process_id: Arc<String>,
    /// Value written to the dictionary-encoded `stream_id` column.
    pub stream_id: Arc<String>,
    /// Value written to the `span_id` column.
    pub span_id: i64,
    /// Value written to the `parent_span_id` column.
    // NOTE(review): the value used for "no parent" is not visible here — confirm with the producer.
    pub parent_span_id: i64,
    /// Value written to the `depth` column (presumably the nesting level — confirm).
    pub depth: u32,
    /// Value written to the dictionary-encoded `kind` column.
    pub kind: Arc<String>,
    /// Value written to the dictionary-encoded `name` column.
    pub name: Arc<String>,
    /// Value written to the dictionary-encoded `connection_name` column.
    pub connection_name: Arc<String>,
    /// Value written to the boolean `is_outgoing` column.
    pub is_outgoing: bool,
    /// Value written to the `begin_bits` column.
    // NOTE(review): presumably a bit offset within the stream — confirm units.
    pub begin_bits: i64,
    /// Value written to the `end_bits` column.
    pub end_bits: i64,
    /// Value written to the `bit_size` column.
    pub bit_size: i64,
    /// Begin timestamp; the builder stores it in a nanosecond timestamp column
    /// and feeds it to `DateTime::from_timestamp_nanos`, i.e. nanoseconds since
    /// the Unix epoch.
    pub begin_time: i64,
    /// End timestamp, in the same nanosecond representation as `begin_time`.
    pub end_time: i64,
}
39
40pub fn net_spans_table_schema() -> Schema {
42 Schema::new(vec![
43 Field::new(
44 "process_id",
45 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
46 false,
47 ),
48 Field::new(
49 "stream_id",
50 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
51 false,
52 ),
53 Field::new("span_id", DataType::Int64, false),
54 Field::new("parent_span_id", DataType::Int64, false),
55 Field::new("depth", DataType::UInt32, false),
56 Field::new(
57 "kind",
58 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
59 false,
60 ),
61 Field::new(
62 "name",
63 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
64 false,
65 ),
66 Field::new(
67 "connection_name",
68 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
69 false,
70 ),
71 Field::new("is_outgoing", DataType::Boolean, false),
72 Field::new("begin_bits", DataType::Int64, false),
73 Field::new("end_bits", DataType::Int64, false),
74 Field::new("bit_size", DataType::Int64, false),
75 Field::new(
76 "begin_time",
77 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
78 false,
79 ),
80 Field::new(
81 "end_time",
82 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
83 false,
84 ),
85 ])
86}
87
88pub struct NetSpanRecordBuilder {
90 process_ids: StringDictionaryBuilder<Int16Type>,
91 stream_ids: StringDictionaryBuilder<Int16Type>,
92 span_ids: PrimitiveBuilder<Int64Type>,
93 parent_span_ids: PrimitiveBuilder<Int64Type>,
94 depths: PrimitiveBuilder<UInt32Type>,
95 kinds: StringDictionaryBuilder<Int16Type>,
96 names: StringDictionaryBuilder<Int16Type>,
97 connection_names: StringDictionaryBuilder<Int16Type>,
98 is_outgoings: BooleanBuilder,
99 begin_bits: PrimitiveBuilder<Int64Type>,
100 end_bits: PrimitiveBuilder<Int64Type>,
101 bit_sizes: PrimitiveBuilder<Int64Type>,
102 begin_times: PrimitiveBuilder<TimestampNanosecondType>,
103 end_times: PrimitiveBuilder<TimestampNanosecondType>,
104 min_time: Option<i64>,
105 max_time: Option<i64>,
106}
107
108impl NetSpanRecordBuilder {
109 pub fn with_capacity(capacity: usize) -> Self {
110 Self {
111 process_ids: StringDictionaryBuilder::new(),
112 stream_ids: StringDictionaryBuilder::new(),
113 span_ids: PrimitiveBuilder::with_capacity(capacity),
114 parent_span_ids: PrimitiveBuilder::with_capacity(capacity),
115 depths: PrimitiveBuilder::with_capacity(capacity),
116 kinds: StringDictionaryBuilder::new(),
117 names: StringDictionaryBuilder::new(),
118 connection_names: StringDictionaryBuilder::new(),
119 is_outgoings: BooleanBuilder::with_capacity(capacity),
120 begin_bits: PrimitiveBuilder::with_capacity(capacity),
121 end_bits: PrimitiveBuilder::with_capacity(capacity),
122 bit_sizes: PrimitiveBuilder::with_capacity(capacity),
123 begin_times: PrimitiveBuilder::with_capacity(capacity),
124 end_times: PrimitiveBuilder::with_capacity(capacity),
125 min_time: None,
126 max_time: None,
127 }
128 }
129
130 pub fn len(&self) -> i64 {
131 self.span_ids.len() as i64
132 }
133
134 pub fn is_empty(&self) -> bool {
135 self.span_ids.len() == 0
136 }
137
138 pub fn get_time_range(&self) -> Option<TimeRange> {
140 match (self.min_time, self.max_time) {
141 (Some(min_ns), Some(max_ns)) => Some(TimeRange::new(
142 DateTime::from_timestamp_nanos(min_ns),
143 DateTime::from_timestamp_nanos(max_ns),
144 )),
145 _ => None,
146 }
147 }
148
149 pub fn append(&mut self, record: &NetSpanRecord) -> Result<()> {
150 self.process_ids.append_value(&*record.process_id);
151 self.stream_ids.append_value(&*record.stream_id);
152 self.span_ids.append_value(record.span_id);
153 self.parent_span_ids.append_value(record.parent_span_id);
154 self.depths.append_value(record.depth);
155 self.kinds.append_value(&*record.kind);
156 self.names.append_value(&*record.name);
157 self.connection_names.append_value(&*record.connection_name);
158 self.is_outgoings.append_value(record.is_outgoing);
159 self.begin_bits.append_value(record.begin_bits);
160 self.end_bits.append_value(record.end_bits);
161 self.bit_sizes.append_value(record.bit_size);
162 self.begin_times.append_value(record.begin_time);
163 self.end_times.append_value(record.end_time);
164 self.min_time = Some(
165 self.min_time
166 .map(|m| m.min(record.begin_time))
167 .unwrap_or(record.begin_time),
168 );
169 self.max_time = Some(
170 self.max_time
171 .map(|m| m.max(record.end_time))
172 .unwrap_or(record.end_time),
173 );
174 Ok(())
175 }
176
177 pub fn finish(mut self) -> Result<RecordBatch> {
178 RecordBatch::try_new(
179 Arc::new(net_spans_table_schema()),
180 vec![
181 Arc::new(self.process_ids.finish()),
182 Arc::new(self.stream_ids.finish()),
183 Arc::new(self.span_ids.finish()),
184 Arc::new(self.parent_span_ids.finish()),
185 Arc::new(self.depths.finish()),
186 Arc::new(self.kinds.finish()),
187 Arc::new(self.names.finish()),
188 Arc::new(self.connection_names.finish()),
189 Arc::new(self.is_outgoings.finish()),
190 Arc::new(self.begin_bits.finish()),
191 Arc::new(self.end_bits.finish()),
192 Arc::new(self.bit_sizes.finish()),
193 Arc::new(self.begin_times.finish().with_timezone_utc()),
194 Arc::new(self.end_times.finish().with_timezone_utc()),
195 ],
196 )
197 .with_context(|| "building net spans record batch")
198 }
199}