1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::DateTime;
5use datafusion::arrow::array::ArrayBuilder;
6use datafusion::arrow::array::BinaryDictionaryBuilder;
7use datafusion::arrow::array::PrimitiveBuilder;
8use datafusion::arrow::array::StringBuilder;
9use datafusion::arrow::array::StringDictionaryBuilder;
10use datafusion::arrow::datatypes::DataType;
11use datafusion::arrow::datatypes::Field;
12use datafusion::arrow::datatypes::Int16Type;
13use datafusion::arrow::datatypes::Int32Type;
14use datafusion::arrow::datatypes::Schema;
15use datafusion::arrow::datatypes::TimeUnit;
16use datafusion::arrow::datatypes::TimestampNanosecondType;
17use datafusion::arrow::record_batch::RecordBatch;
18
19use crate::log_entry::LogEntry;
20use crate::metadata::ProcessMetadata;
21use crate::properties::property_set_jsonb_dictionary_builder::PropertySetJsonbDictionaryBuilder;
22use crate::time::TimeRange;
23
24pub fn log_table_schema() -> Schema {
26 Schema::new(vec![
27 Field::new(
28 "process_id",
29 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
30 false,
31 ),
32 Field::new(
33 "stream_id",
34 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
35 false,
36 ),
37 Field::new(
38 "block_id",
39 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
40 false,
41 ),
42 Field::new(
43 "insert_time",
44 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
45 false,
46 ),
47 Field::new(
48 "exe",
49 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
50 false,
51 ),
52 Field::new(
53 "username",
54 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
55 false,
56 ),
57 Field::new(
58 "computer",
59 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
60 false,
61 ),
62 Field::new(
63 "time",
64 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
65 false,
66 ),
67 Field::new(
68 "target",
69 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
70 false,
71 ),
72 Field::new("level", DataType::Int32, false),
73 Field::new("msg", DataType::Utf8, false),
74 Field::new(
75 "properties",
76 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
77 false,
78 ),
79 Field::new(
80 "process_properties",
81 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
82 false,
83 ),
84 ])
85}
86
87pub struct LogEntriesRecordBuilder {
89 process_ids: StringDictionaryBuilder<Int16Type>,
90 stream_ids: StringDictionaryBuilder<Int16Type>,
91 block_ids: StringDictionaryBuilder<Int16Type>,
92 insert_times: PrimitiveBuilder<TimestampNanosecondType>,
93 exes: StringDictionaryBuilder<Int16Type>,
94 usernames: StringDictionaryBuilder<Int16Type>,
95 computers: StringDictionaryBuilder<Int16Type>,
96 times: PrimitiveBuilder<TimestampNanosecondType>,
97 targets: StringDictionaryBuilder<Int16Type>,
98 levels: PrimitiveBuilder<Int32Type>,
99 msgs: StringBuilder,
100 properties: PropertySetJsonbDictionaryBuilder,
101 process_properties: BinaryDictionaryBuilder<Int32Type>,
102}
103
104impl LogEntriesRecordBuilder {
105 pub fn with_capacity(capacity: usize) -> Self {
106 Self {
107 process_ids: StringDictionaryBuilder::new(),
108 stream_ids: StringDictionaryBuilder::new(),
109 block_ids: StringDictionaryBuilder::new(),
110 insert_times: PrimitiveBuilder::with_capacity(capacity),
111 exes: StringDictionaryBuilder::new(),
112 usernames: StringDictionaryBuilder::new(),
113 computers: StringDictionaryBuilder::new(),
114 times: PrimitiveBuilder::with_capacity(capacity),
115 targets: StringDictionaryBuilder::new(),
116 levels: PrimitiveBuilder::with_capacity(capacity),
117 msgs: StringBuilder::new(),
118 properties: PropertySetJsonbDictionaryBuilder::new(capacity),
119 process_properties: BinaryDictionaryBuilder::new(),
120 }
121 }
122
123 pub fn get_time_range(&self) -> Option<TimeRange> {
124 if self.is_empty() {
125 return None;
126 }
127 let slice = self.times.values_slice();
129 Some(TimeRange::new(
130 DateTime::from_timestamp_nanos(slice[0]),
131 DateTime::from_timestamp_nanos(slice[slice.len() - 1]),
132 ))
133 }
134
135 pub fn len(&self) -> i64 {
136 self.times.len() as i64
137 }
138
139 pub fn is_empty(&self) -> bool {
140 self.times.len() == 0
141 }
142
143 pub fn append(&mut self, row: &LogEntry) -> Result<()> {
144 self.process_ids
145 .append_value(format!("{}", row.process.process_id));
146 self.stream_ids.append_value(&*row.stream_id);
147 self.block_ids.append_value(&*row.block_id);
148 self.insert_times.append_value(row.insert_time);
149 self.exes.append_value(&row.process.exe);
150 self.usernames.append_value(&row.process.username);
151 self.computers.append_value(&row.process.computer);
152 self.times.append_value(row.time);
153 self.targets.append_value(&*row.target);
154 self.levels.append_value(row.level);
155 self.msgs.append_value(&*row.msg);
156 self.properties.append_property_set(&row.properties)?;
157 self.process_properties
158 .append_value(&*row.process.properties);
159 Ok(())
160 }
161
162 pub fn append_entry_only(&mut self, row: &LogEntry) -> Result<()> {
164 self.times.append_value(row.time);
166 self.targets.append_value(&*row.target);
167 self.levels.append_value(row.level);
168 self.msgs.append_value(&*row.msg);
169 self.properties.append_property_set(&row.properties)?;
170 Ok(())
171 }
172
173 pub fn fill_constant_columns(
175 &mut self,
176 process: &ProcessMetadata,
177 stream_id: &str,
178 block_id: &str,
179 insert_time: i64,
180 entry_count: usize,
181 ) -> Result<()> {
182 let process_id_str = format!("{}", process.process_id);
183
184 let insert_times_slice = vec![insert_time; entry_count];
186 self.insert_times.append_slice(&insert_times_slice);
187
188 self.process_ids.append_values(&process_id_str, entry_count);
189 self.stream_ids.append_values(stream_id, entry_count);
190 self.block_ids.append_values(block_id, entry_count);
191 self.exes.append_values(&process.exe, entry_count);
192 self.usernames.append_values(&process.username, entry_count);
193 self.computers.append_values(&process.computer, entry_count);
194 self.process_properties
195 .append_values(&**process.properties, entry_count);
196
197 Ok(())
198 }
199
200 pub fn finish(mut self) -> Result<RecordBatch> {
201 RecordBatch::try_new(
202 Arc::new(log_table_schema()),
203 vec![
204 Arc::new(self.process_ids.finish()),
205 Arc::new(self.stream_ids.finish()),
206 Arc::new(self.block_ids.finish()),
207 Arc::new(self.insert_times.finish().with_timezone_utc()),
208 Arc::new(self.exes.finish()),
209 Arc::new(self.usernames.finish()),
210 Arc::new(self.computers.finish()),
211 Arc::new(self.times.finish().with_timezone_utc()),
212 Arc::new(self.targets.finish()),
213 Arc::new(self.levels.finish()),
214 Arc::new(self.msgs.finish()),
215 Arc::new(self.properties.finish()?),
216 Arc::new(self.process_properties.finish()),
217 ],
218 )
219 .with_context(|| "building record batch")
220 }
221}