micromegas_analytics/
async_events_table.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::DateTime;
5use datafusion::arrow::array::ArrayBuilder;
6use datafusion::arrow::array::PrimitiveBuilder;
7use datafusion::arrow::array::StringDictionaryBuilder;
8use datafusion::arrow::datatypes::DataType;
9use datafusion::arrow::datatypes::Field;
10use datafusion::arrow::datatypes::Int16Type;
11use datafusion::arrow::datatypes::Int64Type;
12use datafusion::arrow::datatypes::Schema;
13use datafusion::arrow::datatypes::TimeUnit;
14use datafusion::arrow::datatypes::TimestampNanosecondType;
15use datafusion::arrow::datatypes::UInt32Type;
16use datafusion::arrow::record_batch::RecordBatch;
17
18use crate::time::TimeRange;
19
20/// Represents a single async span event record.
21/// Optimized for high-frequency data - process info can be joined when needed.
#[derive(Debug, Clone)]
pub struct AsyncEventRecord {
    /// Stream the event belongs to; `Arc<String>` so the same id can be shared cheaply across many records.
    pub stream_id: Arc<String>,
    /// Telemetry block the event was read from.
    pub block_id: Arc<String>,
    /// Event timestamp in nanoseconds (stored as `Timestamp(Nanosecond, UTC)` in the schema).
    pub time: i64,
    /// Kind of span event (e.g. begin/end) — dictionary-encoded in the table. NOTE(review): exact value set not visible here; confirm against the producer.
    pub event_type: Arc<String>,
    /// Identifier of this async span.
    pub span_id: i64,
    /// Identifier of the enclosing span.
    pub parent_span_id: i64,
    /// Nesting depth of the span.
    pub depth: u32,
    /// Hash identifying the span's source location/scope. NOTE(review): hashing scheme defined elsewhere — confirm.
    pub hash: u32,
    /// Span name.
    pub name: Arc<String>,
    /// Source file where the span was declared.
    pub filename: Arc<String>,
    /// Instrumentation target (module path).
    pub target: Arc<String>,
    /// Source line number.
    pub line: u32,
}
37
38/// Returns the schema for the async events table.
39/// Optimized for high-frequency data - excludes process info that can be joined.
40pub fn async_events_table_schema() -> Schema {
41    Schema::new(vec![
42        Field::new(
43            "stream_id",
44            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
45            false,
46        ),
47        Field::new(
48            "block_id",
49            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
50            false,
51        ),
52        Field::new(
53            "time",
54            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
55            false,
56        ),
57        Field::new(
58            "event_type",
59            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
60            false,
61        ),
62        Field::new("span_id", DataType::Int64, false),
63        Field::new("parent_span_id", DataType::Int64, false),
64        Field::new("depth", DataType::UInt32, false),
65        Field::new("hash", DataType::UInt32, false),
66        Field::new(
67            "name",
68            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
69            false,
70        ),
71        Field::new(
72            "filename",
73            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
74            false,
75        ),
76        Field::new(
77            "target",
78            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
79            false,
80        ),
81        Field::new("line", DataType::UInt32, false),
82    ])
83}
84
/// A builder for creating a `RecordBatch` of async event records.
///
/// Holds one Arrow column builder per field of the schema returned by
/// `async_events_table_schema()`, in the same column order. String columns
/// with low expected cardinality use dictionary builders (Int16 keys).
pub struct AsyncEventRecordBuilder {
    stream_ids: StringDictionaryBuilder<Int16Type>,
    block_ids: StringDictionaryBuilder<Int16Type>,
    // Nanosecond timestamps; the UTC timezone is attached in `finish()`.
    times: PrimitiveBuilder<TimestampNanosecondType>,
    event_types: StringDictionaryBuilder<Int16Type>,
    span_ids: PrimitiveBuilder<Int64Type>,
    parent_span_ids: PrimitiveBuilder<Int64Type>,
    depths: PrimitiveBuilder<UInt32Type>,
    hashes: PrimitiveBuilder<UInt32Type>,
    names: StringDictionaryBuilder<Int16Type>,
    filenames: StringDictionaryBuilder<Int16Type>,
    targets: StringDictionaryBuilder<Int16Type>,
    lines: PrimitiveBuilder<UInt32Type>,
}
100
101impl AsyncEventRecordBuilder {
102    pub fn with_capacity(capacity: usize) -> Self {
103        Self {
104            stream_ids: StringDictionaryBuilder::new(),
105            block_ids: StringDictionaryBuilder::new(),
106            times: PrimitiveBuilder::with_capacity(capacity),
107            event_types: StringDictionaryBuilder::new(),
108            span_ids: PrimitiveBuilder::with_capacity(capacity),
109            parent_span_ids: PrimitiveBuilder::with_capacity(capacity),
110            depths: PrimitiveBuilder::with_capacity(capacity),
111            hashes: PrimitiveBuilder::with_capacity(capacity),
112            names: StringDictionaryBuilder::new(),
113            filenames: StringDictionaryBuilder::new(),
114            targets: StringDictionaryBuilder::new(),
115            lines: PrimitiveBuilder::with_capacity(capacity),
116        }
117    }
118
119    pub fn get_time_range(&self) -> Option<TimeRange> {
120        if self.is_empty() {
121            return None;
122        }
123        // assuming that the events are in order
124        let slice = self.times.values_slice();
125        Some(TimeRange::new(
126            DateTime::from_timestamp_nanos(slice[0]),
127            DateTime::from_timestamp_nanos(slice[slice.len() - 1]),
128        ))
129    }
130
131    pub fn len(&self) -> i64 {
132        self.times.len() as i64
133    }
134
135    pub fn is_empty(&self) -> bool {
136        self.times.len() == 0
137    }
138
139    pub fn append(&mut self, record: &AsyncEventRecord) -> Result<()> {
140        self.stream_ids.append_value(&*record.stream_id);
141        self.block_ids.append_value(&*record.block_id);
142        self.times.append_value(record.time);
143        self.event_types.append_value(&*record.event_type);
144        self.span_ids.append_value(record.span_id);
145        self.parent_span_ids.append_value(record.parent_span_id);
146        self.depths.append_value(record.depth);
147        self.hashes.append_value(record.hash);
148        self.names.append_value(&*record.name);
149        self.filenames.append_value(&*record.filename);
150        self.targets.append_value(&*record.target);
151        self.lines.append_value(record.line);
152        Ok(())
153    }
154
155    pub fn finish(mut self) -> Result<RecordBatch> {
156        RecordBatch::try_new(
157            Arc::new(async_events_table_schema()),
158            vec![
159                Arc::new(self.stream_ids.finish()),
160                Arc::new(self.block_ids.finish()),
161                Arc::new(self.times.finish().with_timezone_utc()),
162                Arc::new(self.event_types.finish()),
163                Arc::new(self.span_ids.finish()),
164                Arc::new(self.parent_span_ids.finish()),
165                Arc::new(self.depths.finish()),
166                Arc::new(self.hashes.finish()),
167                Arc::new(self.names.finish()),
168                Arc::new(self.filenames.finish()),
169                Arc::new(self.targets.finish()),
170                Arc::new(self.lines.finish()),
171            ],
172        )
173        .with_context(|| "building record batch")
174    }
175}