micromegas_analytics/
log_entries_table.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::DateTime;
5use datafusion::arrow::array::ArrayBuilder;
6use datafusion::arrow::array::BinaryDictionaryBuilder;
7use datafusion::arrow::array::PrimitiveBuilder;
8use datafusion::arrow::array::StringBuilder;
9use datafusion::arrow::array::StringDictionaryBuilder;
10use datafusion::arrow::datatypes::DataType;
11use datafusion::arrow::datatypes::Field;
12use datafusion::arrow::datatypes::Int16Type;
13use datafusion::arrow::datatypes::Int32Type;
14use datafusion::arrow::datatypes::Schema;
15use datafusion::arrow::datatypes::TimeUnit;
16use datafusion::arrow::datatypes::TimestampNanosecondType;
17use datafusion::arrow::record_batch::RecordBatch;
18
19use crate::log_entry::LogEntry;
20use crate::metadata::ProcessMetadata;
21use crate::properties::property_set_jsonb_dictionary_builder::PropertySetJsonbDictionaryBuilder;
22use crate::time::TimeRange;
23
24/// Returns the schema for the log entries table.
25pub fn log_table_schema() -> Schema {
26    Schema::new(vec![
27        Field::new(
28            "process_id",
29            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
30            false,
31        ),
32        Field::new(
33            "stream_id",
34            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
35            false,
36        ),
37        Field::new(
38            "block_id",
39            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
40            false,
41        ),
42        Field::new(
43            "insert_time",
44            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
45            false,
46        ),
47        Field::new(
48            "exe",
49            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
50            false,
51        ),
52        Field::new(
53            "username",
54            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
55            false,
56        ),
57        Field::new(
58            "computer",
59            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
60            false,
61        ),
62        Field::new(
63            "time",
64            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
65            false,
66        ),
67        Field::new(
68            "target",
69            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
70            false,
71        ),
72        Field::new("level", DataType::Int32, false),
73        Field::new("msg", DataType::Utf8, false),
74        Field::new(
75            "properties",
76            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
77            false,
78        ),
79        Field::new(
80            "process_properties",
81            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
82            false,
83        ),
84    ])
85}
86
87/// A builder for creating a `RecordBatch` of log entries.
88pub struct LogEntriesRecordBuilder {
89    process_ids: StringDictionaryBuilder<Int16Type>,
90    stream_ids: StringDictionaryBuilder<Int16Type>,
91    block_ids: StringDictionaryBuilder<Int16Type>,
92    insert_times: PrimitiveBuilder<TimestampNanosecondType>,
93    exes: StringDictionaryBuilder<Int16Type>,
94    usernames: StringDictionaryBuilder<Int16Type>,
95    computers: StringDictionaryBuilder<Int16Type>,
96    times: PrimitiveBuilder<TimestampNanosecondType>,
97    targets: StringDictionaryBuilder<Int16Type>,
98    levels: PrimitiveBuilder<Int32Type>,
99    msgs: StringBuilder,
100    properties: PropertySetJsonbDictionaryBuilder,
101    process_properties: BinaryDictionaryBuilder<Int32Type>,
102}
103
104impl LogEntriesRecordBuilder {
105    pub fn with_capacity(capacity: usize) -> Self {
106        Self {
107            process_ids: StringDictionaryBuilder::new(),
108            stream_ids: StringDictionaryBuilder::new(),
109            block_ids: StringDictionaryBuilder::new(),
110            insert_times: PrimitiveBuilder::with_capacity(capacity),
111            exes: StringDictionaryBuilder::new(),
112            usernames: StringDictionaryBuilder::new(),
113            computers: StringDictionaryBuilder::new(),
114            times: PrimitiveBuilder::with_capacity(capacity),
115            targets: StringDictionaryBuilder::new(),
116            levels: PrimitiveBuilder::with_capacity(capacity),
117            msgs: StringBuilder::new(),
118            properties: PropertySetJsonbDictionaryBuilder::new(capacity),
119            process_properties: BinaryDictionaryBuilder::new(),
120        }
121    }
122
123    pub fn get_time_range(&self) -> Option<TimeRange> {
124        if self.is_empty() {
125            return None;
126        }
127        // assuming that the events are in order
128        let slice = self.times.values_slice();
129        Some(TimeRange::new(
130            DateTime::from_timestamp_nanos(slice[0]),
131            DateTime::from_timestamp_nanos(slice[slice.len() - 1]),
132        ))
133    }
134
135    pub fn len(&self) -> i64 {
136        self.times.len() as i64
137    }
138
139    pub fn is_empty(&self) -> bool {
140        self.times.len() == 0
141    }
142
143    pub fn append(&mut self, row: &LogEntry) -> Result<()> {
144        self.process_ids
145            .append_value(format!("{}", row.process.process_id));
146        self.stream_ids.append_value(&*row.stream_id);
147        self.block_ids.append_value(&*row.block_id);
148        self.insert_times.append_value(row.insert_time);
149        self.exes.append_value(&row.process.exe);
150        self.usernames.append_value(&row.process.username);
151        self.computers.append_value(&row.process.computer);
152        self.times.append_value(row.time);
153        self.targets.append_value(&*row.target);
154        self.levels.append_value(row.level);
155        self.msgs.append_value(&*row.msg);
156        self.properties.append_property_set(&row.properties)?;
157        self.process_properties
158            .append_value(&*row.process.properties);
159        Ok(())
160    }
161
162    /// Append only per-entry variable data (optimized for batch processing)
163    pub fn append_entry_only(&mut self, row: &LogEntry) -> Result<()> {
164        // Only append fields that truly vary per log entry
165        self.times.append_value(row.time);
166        self.targets.append_value(&*row.target);
167        self.levels.append_value(row.level);
168        self.msgs.append_value(&*row.msg);
169        self.properties.append_property_set(&row.properties)?;
170        Ok(())
171    }
172
173    /// Batch fill all constant columns for all entries in block
174    pub fn fill_constant_columns(
175        &mut self,
176        process: &ProcessMetadata,
177        stream_id: &str,
178        block_id: &str,
179        insert_time: i64,
180        entry_count: usize,
181    ) -> Result<()> {
182        let process_id_str = format!("{}", process.process_id);
183
184        // For PrimitiveBuilder (insert_times): use append_slice for better performance
185        let insert_times_slice = vec![insert_time; entry_count];
186        self.insert_times.append_slice(&insert_times_slice);
187
188        self.process_ids.append_values(&process_id_str, entry_count);
189        self.stream_ids.append_values(stream_id, entry_count);
190        self.block_ids.append_values(block_id, entry_count);
191        self.exes.append_values(&process.exe, entry_count);
192        self.usernames.append_values(&process.username, entry_count);
193        self.computers.append_values(&process.computer, entry_count);
194        self.process_properties
195            .append_values(&**process.properties, entry_count);
196
197        Ok(())
198    }
199
200    pub fn finish(mut self) -> Result<RecordBatch> {
201        RecordBatch::try_new(
202            Arc::new(log_table_schema()),
203            vec![
204                Arc::new(self.process_ids.finish()),
205                Arc::new(self.stream_ids.finish()),
206                Arc::new(self.block_ids.finish()),
207                Arc::new(self.insert_times.finish().with_timezone_utc()),
208                Arc::new(self.exes.finish()),
209                Arc::new(self.usernames.finish()),
210                Arc::new(self.computers.finish()),
211                Arc::new(self.times.finish().with_timezone_utc()),
212                Arc::new(self.targets.finish()),
213                Arc::new(self.levels.finish()),
214                Arc::new(self.msgs.finish()),
215                Arc::new(self.properties.finish()?),
216                Arc::new(self.process_properties.finish()),
217            ],
218        )
219        .with_context(|| "building record batch")
220    }
221}