micromegas_analytics/
metrics_table.rs

1use crate::{
2    measure::Measure, metadata::ProcessMetadata,
3    properties::property_set_jsonb_dictionary_builder::PropertySetJsonbDictionaryBuilder,
4    time::TimeRange,
5};
6use anyhow::{Context, Result};
7use chrono::DateTime;
8use datafusion::arrow::{
9    array::{ArrayBuilder, BinaryDictionaryBuilder, PrimitiveBuilder, StringDictionaryBuilder},
10    datatypes::{
11        DataType, Field, Float64Type, Int16Type, Int32Type, Schema, TimeUnit,
12        TimestampNanosecondType,
13    },
14    record_batch::RecordBatch,
15};
16use std::sync::Arc;
17
18/// Returns the schema for the metrics table.
19pub fn metrics_table_schema() -> Schema {
20    Schema::new(vec![
21        Field::new(
22            "process_id",
23            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
24            false,
25        ),
26        Field::new(
27            "stream_id",
28            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
29            false,
30        ),
31        Field::new(
32            "block_id",
33            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
34            false,
35        ),
36        Field::new(
37            "insert_time",
38            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
39            false,
40        ),
41        Field::new(
42            "exe",
43            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
44            false,
45        ),
46        Field::new(
47            "username",
48            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
49            false,
50        ),
51        Field::new(
52            "computer",
53            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
54            false,
55        ),
56        Field::new(
57            "time",
58            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
59            false,
60        ),
61        Field::new(
62            "target",
63            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
64            false,
65        ),
66        Field::new(
67            "name",
68            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
69            false,
70        ),
71        Field::new(
72            "unit",
73            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
74            false,
75        ),
76        Field::new("value", DataType::Float64, false),
77        Field::new(
78            "properties",
79            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
80            false,
81        ),
82        Field::new(
83            "process_properties",
84            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
85            false,
86        ),
87    ])
88}
89
90/// A builder for creating a `RecordBatch` of metrics.
91pub struct MetricsRecordBuilder {
92    pub process_ids: StringDictionaryBuilder<Int16Type>,
93    pub stream_ids: StringDictionaryBuilder<Int16Type>,
94    pub block_ids: StringDictionaryBuilder<Int16Type>,
95    pub insert_times: PrimitiveBuilder<TimestampNanosecondType>,
96    pub exes: StringDictionaryBuilder<Int16Type>,
97    pub usernames: StringDictionaryBuilder<Int16Type>,
98    pub computers: StringDictionaryBuilder<Int16Type>,
99    pub times: PrimitiveBuilder<TimestampNanosecondType>,
100    pub targets: StringDictionaryBuilder<Int16Type>,
101    pub names: StringDictionaryBuilder<Int16Type>,
102    pub units: StringDictionaryBuilder<Int16Type>,
103    pub values: PrimitiveBuilder<Float64Type>,
104    pub properties: PropertySetJsonbDictionaryBuilder,
105    pub process_properties: BinaryDictionaryBuilder<Int32Type>,
106}
107
108impl MetricsRecordBuilder {
109    pub fn with_capacity(capacity: usize) -> Self {
110        Self {
111            process_ids: StringDictionaryBuilder::new(),
112            stream_ids: StringDictionaryBuilder::new(),
113            block_ids: StringDictionaryBuilder::new(),
114            insert_times: PrimitiveBuilder::with_capacity(capacity),
115            exes: StringDictionaryBuilder::new(),
116            usernames: StringDictionaryBuilder::new(),
117            computers: StringDictionaryBuilder::new(),
118            times: PrimitiveBuilder::with_capacity(capacity),
119            targets: StringDictionaryBuilder::new(),
120            names: StringDictionaryBuilder::new(),
121            units: StringDictionaryBuilder::new(),
122            values: PrimitiveBuilder::with_capacity(capacity),
123            properties: PropertySetJsonbDictionaryBuilder::new(capacity),
124            process_properties: BinaryDictionaryBuilder::new(),
125        }
126    }
127
128    pub fn len(&self) -> i64 {
129        self.times.len() as i64
130    }
131
132    pub fn is_empty(&self) -> bool {
133        self.times.len() == 0
134    }
135
136    pub fn get_time_range(&self) -> Option<TimeRange> {
137        if self.is_empty() {
138            return None;
139        }
140        // assuming that the events are in order
141        let slice = self.times.values_slice();
142        Some(TimeRange::new(
143            DateTime::from_timestamp_nanos(slice[0]),
144            DateTime::from_timestamp_nanos(slice[slice.len() - 1]),
145        ))
146    }
147
148    pub fn append(&mut self, row: &Measure) -> Result<()> {
149        self.process_ids
150            .append_value(format!("{}", row.process.process_id));
151        self.stream_ids.append_value(&*row.stream_id);
152        self.block_ids.append_value(&*row.block_id);
153        self.insert_times.append_value(row.insert_time);
154        self.exes.append_value(&row.process.exe);
155        self.usernames.append_value(&row.process.username);
156        self.computers.append_value(&row.process.computer);
157        self.times.append_value(row.time);
158        self.targets.append_value(&*row.target);
159        self.names.append_value(&*row.name);
160        self.units.append_value(&*row.unit);
161        self.values.append_value(row.value);
162        self.properties.append_property_set(&row.properties)?;
163        self.process_properties
164            .append_value(&*row.process.properties);
165        Ok(())
166    }
167
168    /// Append only per-entry variable data (optimized for batch processing)
169    pub fn append_entry_only(&mut self, row: &Measure) -> Result<()> {
170        // Only append fields that truly vary per metrics entry
171        self.times.append_value(row.time);
172        self.targets.append_value(&*row.target);
173        self.names.append_value(&*row.name);
174        self.units.append_value(&*row.unit);
175        self.values.append_value(row.value);
176        self.properties.append_property_set(&row.properties)?;
177        Ok(())
178    }
179
180    /// Batch fill all constant columns for all entries in block
181    pub fn fill_constant_columns(
182        &mut self,
183        process: &ProcessMetadata,
184        stream_id: &str,
185        block_id: &str,
186        insert_time: i64,
187        entry_count: usize,
188    ) -> Result<()> {
189        let process_id_str = format!("{}", process.process_id);
190
191        // For PrimitiveBuilder (insert_times): use append_slice for better performance
192        let insert_times_slice = vec![insert_time; entry_count];
193        self.insert_times.append_slice(&insert_times_slice);
194
195        // For BinaryDictionaryBuilder (process_properties): use append_values for same value
196        self.process_properties
197            .append_values(&**process.properties, entry_count);
198
199        // For StringDictionaryBuilder: use append_values for same values (optimal for constant data)
200        self.process_ids.append_values(&process_id_str, entry_count);
201        self.stream_ids.append_values(stream_id, entry_count);
202        self.block_ids.append_values(block_id, entry_count);
203        self.exes.append_values(&process.exe, entry_count);
204        self.usernames.append_values(&process.username, entry_count);
205        self.computers.append_values(&process.computer, entry_count);
206
207        Ok(())
208    }
209
210    pub fn finish(mut self) -> Result<RecordBatch> {
211        RecordBatch::try_new(
212            Arc::new(metrics_table_schema()),
213            vec![
214                Arc::new(self.process_ids.finish()),
215                Arc::new(self.stream_ids.finish()),
216                Arc::new(self.block_ids.finish()),
217                Arc::new(self.insert_times.finish().with_timezone_utc()),
218                Arc::new(self.exes.finish()),
219                Arc::new(self.usernames.finish()),
220                Arc::new(self.computers.finish()),
221                Arc::new(self.times.finish().with_timezone_utc()),
222                Arc::new(self.targets.finish()),
223                Arc::new(self.names.finish()),
224                Arc::new(self.units.finish()),
225                Arc::new(self.values.finish()),
226                Arc::new(self.properties.finish()?),
227                Arc::new(self.process_properties.finish()),
228            ],
229        )
230        .with_context(|| "building record batch")
231    }
232}