1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::DateTime;
5use datafusion::arrow::array::ArrayBuilder;
6use datafusion::arrow::array::PrimitiveBuilder;
7use datafusion::arrow::array::StringDictionaryBuilder;
8use datafusion::arrow::datatypes::DataType;
9use datafusion::arrow::datatypes::Field;
10use datafusion::arrow::datatypes::Int16Type;
11use datafusion::arrow::datatypes::Int64Type;
12use datafusion::arrow::datatypes::Schema;
13use datafusion::arrow::datatypes::TimeUnit;
14use datafusion::arrow::datatypes::TimestampNanosecondType;
15use datafusion::arrow::datatypes::UInt32Type;
16use datafusion::arrow::record_batch::RecordBatch;
17
18use crate::time::TimeRange;
19
/// One async span event, denormalized for columnar storage.
///
/// `Arc<String>` fields are shared, repeated values (stream/block ids,
/// callsite metadata); they are dictionary-encoded when written out by
/// [`AsyncEventRecordBuilder`].
#[derive(Debug, Clone)]
pub struct AsyncEventRecord {
    // Identifier of the stream this event belongs to.
    pub stream_id: Arc<String>,
    // Identifier of the block within the stream.
    pub block_id: Arc<String>,
    // Event timestamp in nanoseconds (stored as a UTC timestamp column).
    pub time: i64,
    // Kind of event (e.g. begin/end) — NOTE(review): exact variants not
    // visible from this file; confirm against the producer.
    pub event_type: Arc<String>,
    // Id of the span this event belongs to.
    pub span_id: i64,
    // Id of the parent span.
    pub parent_span_id: i64,
    // Nesting depth of the span.
    pub depth: u32,
    // Hash associated with the event — presumably a callsite hash; verify
    // against the producer.
    pub hash: u32,
    // Callsite name.
    pub name: Arc<String>,
    // Source file of the callsite.
    pub filename: Arc<String>,
    // Tracing target of the callsite.
    pub target: Arc<String>,
    // Source line of the callsite.
    pub line: u32,
}
37
38pub fn async_events_table_schema() -> Schema {
41 Schema::new(vec![
42 Field::new(
43 "stream_id",
44 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
45 false,
46 ),
47 Field::new(
48 "block_id",
49 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
50 false,
51 ),
52 Field::new(
53 "time",
54 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
55 false,
56 ),
57 Field::new(
58 "event_type",
59 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
60 false,
61 ),
62 Field::new("span_id", DataType::Int64, false),
63 Field::new("parent_span_id", DataType::Int64, false),
64 Field::new("depth", DataType::UInt32, false),
65 Field::new("hash", DataType::UInt32, false),
66 Field::new(
67 "name",
68 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
69 false,
70 ),
71 Field::new(
72 "filename",
73 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
74 false,
75 ),
76 Field::new(
77 "target",
78 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
79 false,
80 ),
81 Field::new("line", DataType::UInt32, false),
82 ])
83}
84
/// Columnar builder that accumulates [`AsyncEventRecord`]s and produces a
/// [`RecordBatch`] matching [`async_events_table_schema`].
///
/// Field order mirrors the schema's column order; `finish` relies on this
/// correspondence when assembling the batch.
pub struct AsyncEventRecordBuilder {
    stream_ids: StringDictionaryBuilder<Int16Type>,
    block_ids: StringDictionaryBuilder<Int16Type>,
    times: PrimitiveBuilder<TimestampNanosecondType>,
    event_types: StringDictionaryBuilder<Int16Type>,
    span_ids: PrimitiveBuilder<Int64Type>,
    parent_span_ids: PrimitiveBuilder<Int64Type>,
    depths: PrimitiveBuilder<UInt32Type>,
    hashes: PrimitiveBuilder<UInt32Type>,
    names: StringDictionaryBuilder<Int16Type>,
    filenames: StringDictionaryBuilder<Int16Type>,
    targets: StringDictionaryBuilder<Int16Type>,
    lines: PrimitiveBuilder<UInt32Type>,
}
100
101impl AsyncEventRecordBuilder {
102 pub fn with_capacity(capacity: usize) -> Self {
103 Self {
104 stream_ids: StringDictionaryBuilder::new(),
105 block_ids: StringDictionaryBuilder::new(),
106 times: PrimitiveBuilder::with_capacity(capacity),
107 event_types: StringDictionaryBuilder::new(),
108 span_ids: PrimitiveBuilder::with_capacity(capacity),
109 parent_span_ids: PrimitiveBuilder::with_capacity(capacity),
110 depths: PrimitiveBuilder::with_capacity(capacity),
111 hashes: PrimitiveBuilder::with_capacity(capacity),
112 names: StringDictionaryBuilder::new(),
113 filenames: StringDictionaryBuilder::new(),
114 targets: StringDictionaryBuilder::new(),
115 lines: PrimitiveBuilder::with_capacity(capacity),
116 }
117 }
118
119 pub fn get_time_range(&self) -> Option<TimeRange> {
120 if self.is_empty() {
121 return None;
122 }
123 let slice = self.times.values_slice();
125 Some(TimeRange::new(
126 DateTime::from_timestamp_nanos(slice[0]),
127 DateTime::from_timestamp_nanos(slice[slice.len() - 1]),
128 ))
129 }
130
131 pub fn len(&self) -> i64 {
132 self.times.len() as i64
133 }
134
135 pub fn is_empty(&self) -> bool {
136 self.times.len() == 0
137 }
138
139 pub fn append(&mut self, record: &AsyncEventRecord) -> Result<()> {
140 self.stream_ids.append_value(&*record.stream_id);
141 self.block_ids.append_value(&*record.block_id);
142 self.times.append_value(record.time);
143 self.event_types.append_value(&*record.event_type);
144 self.span_ids.append_value(record.span_id);
145 self.parent_span_ids.append_value(record.parent_span_id);
146 self.depths.append_value(record.depth);
147 self.hashes.append_value(record.hash);
148 self.names.append_value(&*record.name);
149 self.filenames.append_value(&*record.filename);
150 self.targets.append_value(&*record.target);
151 self.lines.append_value(record.line);
152 Ok(())
153 }
154
155 pub fn finish(mut self) -> Result<RecordBatch> {
156 RecordBatch::try_new(
157 Arc::new(async_events_table_schema()),
158 vec![
159 Arc::new(self.stream_ids.finish()),
160 Arc::new(self.block_ids.finish()),
161 Arc::new(self.times.finish().with_timezone_utc()),
162 Arc::new(self.event_types.finish()),
163 Arc::new(self.span_ids.finish()),
164 Arc::new(self.parent_span_ids.finish()),
165 Arc::new(self.depths.finish()),
166 Arc::new(self.hashes.finish()),
167 Arc::new(self.names.finish()),
168 Arc::new(self.filenames.finish()),
169 Arc::new(self.targets.finish()),
170 Arc::new(self.lines.finish()),
171 ],
172 )
173 .with_context(|| "building record batch")
174 }
175}