micromegas_analytics/
net_spans_table.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::DateTime;
5use datafusion::arrow::array::ArrayBuilder;
6use datafusion::arrow::array::BooleanBuilder;
7use datafusion::arrow::array::PrimitiveBuilder;
8use datafusion::arrow::array::StringDictionaryBuilder;
9use datafusion::arrow::datatypes::DataType;
10use datafusion::arrow::datatypes::Field;
11use datafusion::arrow::datatypes::Int16Type;
12use datafusion::arrow::datatypes::Int64Type;
13use datafusion::arrow::datatypes::Schema;
14use datafusion::arrow::datatypes::TimeUnit;
15use datafusion::arrow::datatypes::TimestampNanosecondType;
16use datafusion::arrow::datatypes::UInt32Type;
17use datafusion::arrow::record_batch::RecordBatch;
18
19use crate::time::TimeRange;
20
/// A single net span row (one per materialized span: Connection / Object / Property / RPC).
///
/// Each field maps one-to-one onto a column of [`net_spans_table_schema`], in the same order,
/// and is appended to a batch by `NetSpanRecordBuilder::append`.
#[derive(Debug, Clone)]
pub struct NetSpanRecord {
    /// Process identifier; written to the dictionary-encoded `process_id` column.
    pub process_id: Arc<String>,
    /// Stream identifier; written to the dictionary-encoded `stream_id` column.
    pub stream_id: Arc<String>,
    /// Identifier of this span.
    pub span_id: i64,
    /// Identifier of the enclosing span.
    /// NOTE(review): the value used for root spans is not visible here — confirm with the producer.
    pub parent_span_id: i64,
    /// Nesting depth of the span (presumably 0 for top-level spans — TODO confirm).
    pub depth: u32,
    /// Span category, e.g. Connection / Object / Property / RPC.
    pub kind: Arc<String>,
    /// Span name; written to the dictionary-encoded `name` column.
    pub name: Arc<String>,
    /// Name of the connection this span is attributed to.
    pub connection_name: Arc<String>,
    /// Traffic direction flag (presumably `true` for sent data — TODO confirm).
    pub is_outgoing: bool,
    /// NOTE(review): presumably the bit offset at which the span starts — confirm units/origin.
    pub begin_bits: i64,
    /// NOTE(review): presumably the bit offset at which the span ends — confirm units/origin.
    pub end_bits: i64,
    /// NOTE(review): presumably the span size in bits — confirm relation to begin/end_bits.
    pub bit_size: i64,
    /// Span start in nanoseconds since the Unix epoch (stored as a UTC nanosecond timestamp).
    pub begin_time: i64,
    /// Span end in nanoseconds since the Unix epoch (stored as a UTC nanosecond timestamp).
    pub end_time: i64,
}
39
40/// Returns the schema for the `net_spans` view.
41pub fn net_spans_table_schema() -> Schema {
42    Schema::new(vec![
43        Field::new(
44            "process_id",
45            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
46            false,
47        ),
48        Field::new(
49            "stream_id",
50            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
51            false,
52        ),
53        Field::new("span_id", DataType::Int64, false),
54        Field::new("parent_span_id", DataType::Int64, false),
55        Field::new("depth", DataType::UInt32, false),
56        Field::new(
57            "kind",
58            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
59            false,
60        ),
61        Field::new(
62            "name",
63            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
64            false,
65        ),
66        Field::new(
67            "connection_name",
68            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
69            false,
70        ),
71        Field::new("is_outgoing", DataType::Boolean, false),
72        Field::new("begin_bits", DataType::Int64, false),
73        Field::new("end_bits", DataType::Int64, false),
74        Field::new("bit_size", DataType::Int64, false),
75        Field::new(
76            "begin_time",
77            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
78            false,
79        ),
80        Field::new(
81            "end_time",
82            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
83            false,
84        ),
85    ])
86}
87
/// Accumulates `NetSpanRecord` rows into a single Arrow `RecordBatch`.
///
/// Holds one column builder per column of `net_spans_table_schema` (same order), plus the
/// running time bounds of the rows appended so far.
pub struct NetSpanRecordBuilder {
    // Dictionary-encoded (`Int16` keys) string columns.
    process_ids: StringDictionaryBuilder<Int16Type>,
    stream_ids: StringDictionaryBuilder<Int16Type>,
    span_ids: PrimitiveBuilder<Int64Type>,
    parent_span_ids: PrimitiveBuilder<Int64Type>,
    depths: PrimitiveBuilder<UInt32Type>,
    kinds: StringDictionaryBuilder<Int16Type>,
    names: StringDictionaryBuilder<Int16Type>,
    connection_names: StringDictionaryBuilder<Int16Type>,
    is_outgoings: BooleanBuilder,
    begin_bits: PrimitiveBuilder<Int64Type>,
    end_bits: PrimitiveBuilder<Int64Type>,
    bit_sizes: PrimitiveBuilder<Int64Type>,
    // Nanosecond timestamps; tagged as UTC when the batch is finished.
    begin_times: PrimitiveBuilder<TimestampNanosecondType>,
    end_times: PrimitiveBuilder<TimestampNanosecondType>,
    /// Smallest `begin_time` appended so far; `None` until the first row.
    min_time: Option<i64>,
    /// Largest `end_time` appended so far; `None` until the first row.
    max_time: Option<i64>,
}
107
108impl NetSpanRecordBuilder {
109    pub fn with_capacity(capacity: usize) -> Self {
110        Self {
111            process_ids: StringDictionaryBuilder::new(),
112            stream_ids: StringDictionaryBuilder::new(),
113            span_ids: PrimitiveBuilder::with_capacity(capacity),
114            parent_span_ids: PrimitiveBuilder::with_capacity(capacity),
115            depths: PrimitiveBuilder::with_capacity(capacity),
116            kinds: StringDictionaryBuilder::new(),
117            names: StringDictionaryBuilder::new(),
118            connection_names: StringDictionaryBuilder::new(),
119            is_outgoings: BooleanBuilder::with_capacity(capacity),
120            begin_bits: PrimitiveBuilder::with_capacity(capacity),
121            end_bits: PrimitiveBuilder::with_capacity(capacity),
122            bit_sizes: PrimitiveBuilder::with_capacity(capacity),
123            begin_times: PrimitiveBuilder::with_capacity(capacity),
124            end_times: PrimitiveBuilder::with_capacity(capacity),
125            min_time: None,
126            max_time: None,
127        }
128    }
129
130    pub fn len(&self) -> i64 {
131        self.span_ids.len() as i64
132    }
133
134    pub fn is_empty(&self) -> bool {
135        self.span_ids.len() == 0
136    }
137
138    /// Returns the time range spanned by the rows accumulated so far.
139    pub fn get_time_range(&self) -> Option<TimeRange> {
140        match (self.min_time, self.max_time) {
141            (Some(min_ns), Some(max_ns)) => Some(TimeRange::new(
142                DateTime::from_timestamp_nanos(min_ns),
143                DateTime::from_timestamp_nanos(max_ns),
144            )),
145            _ => None,
146        }
147    }
148
149    pub fn append(&mut self, record: &NetSpanRecord) -> Result<()> {
150        self.process_ids.append_value(&*record.process_id);
151        self.stream_ids.append_value(&*record.stream_id);
152        self.span_ids.append_value(record.span_id);
153        self.parent_span_ids.append_value(record.parent_span_id);
154        self.depths.append_value(record.depth);
155        self.kinds.append_value(&*record.kind);
156        self.names.append_value(&*record.name);
157        self.connection_names.append_value(&*record.connection_name);
158        self.is_outgoings.append_value(record.is_outgoing);
159        self.begin_bits.append_value(record.begin_bits);
160        self.end_bits.append_value(record.end_bits);
161        self.bit_sizes.append_value(record.bit_size);
162        self.begin_times.append_value(record.begin_time);
163        self.end_times.append_value(record.end_time);
164        self.min_time = Some(
165            self.min_time
166                .map(|m| m.min(record.begin_time))
167                .unwrap_or(record.begin_time),
168        );
169        self.max_time = Some(
170            self.max_time
171                .map(|m| m.max(record.end_time))
172                .unwrap_or(record.end_time),
173        );
174        Ok(())
175    }
176
177    pub fn finish(mut self) -> Result<RecordBatch> {
178        RecordBatch::try_new(
179            Arc::new(net_spans_table_schema()),
180            vec![
181                Arc::new(self.process_ids.finish()),
182                Arc::new(self.stream_ids.finish()),
183                Arc::new(self.span_ids.finish()),
184                Arc::new(self.parent_span_ids.finish()),
185                Arc::new(self.depths.finish()),
186                Arc::new(self.kinds.finish()),
187                Arc::new(self.names.finish()),
188                Arc::new(self.connection_names.finish()),
189                Arc::new(self.is_outgoings.finish()),
190                Arc::new(self.begin_bits.finish()),
191                Arc::new(self.end_bits.finish()),
192                Arc::new(self.bit_sizes.finish()),
193                Arc::new(self.begin_times.finish().with_timezone_utc()),
194                Arc::new(self.end_times.finish().with_timezone_utc()),
195            ],
196        )
197        .with_context(|| "building net spans record batch")
198    }
199}