1use crate::{
2 measure::Measure, metadata::ProcessMetadata,
3 properties::property_set_jsonb_dictionary_builder::PropertySetJsonbDictionaryBuilder,
4 time::TimeRange,
5};
6use anyhow::{Context, Result};
7use chrono::DateTime;
8use datafusion::arrow::{
9 array::{ArrayBuilder, BinaryDictionaryBuilder, PrimitiveBuilder, StringDictionaryBuilder},
10 datatypes::{
11 DataType, Field, Float64Type, Int16Type, Int32Type, Schema, TimeUnit,
12 TimestampNanosecondType,
13 },
14 record_batch::RecordBatch,
15};
16use std::sync::Arc;
17
18pub fn metrics_table_schema() -> Schema {
20 Schema::new(vec![
21 Field::new(
22 "process_id",
23 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
24 false,
25 ),
26 Field::new(
27 "stream_id",
28 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
29 false,
30 ),
31 Field::new(
32 "block_id",
33 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
34 false,
35 ),
36 Field::new(
37 "insert_time",
38 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
39 false,
40 ),
41 Field::new(
42 "exe",
43 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
44 false,
45 ),
46 Field::new(
47 "username",
48 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
49 false,
50 ),
51 Field::new(
52 "computer",
53 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
54 false,
55 ),
56 Field::new(
57 "time",
58 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
59 false,
60 ),
61 Field::new(
62 "target",
63 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
64 false,
65 ),
66 Field::new(
67 "name",
68 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
69 false,
70 ),
71 Field::new(
72 "unit",
73 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
74 false,
75 ),
76 Field::new("value", DataType::Float64, false),
77 Field::new(
78 "properties",
79 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
80 false,
81 ),
82 Field::new(
83 "process_properties",
84 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
85 false,
86 ),
87 ])
88}
89
90pub struct MetricsRecordBuilder {
92 pub process_ids: StringDictionaryBuilder<Int16Type>,
93 pub stream_ids: StringDictionaryBuilder<Int16Type>,
94 pub block_ids: StringDictionaryBuilder<Int16Type>,
95 pub insert_times: PrimitiveBuilder<TimestampNanosecondType>,
96 pub exes: StringDictionaryBuilder<Int16Type>,
97 pub usernames: StringDictionaryBuilder<Int16Type>,
98 pub computers: StringDictionaryBuilder<Int16Type>,
99 pub times: PrimitiveBuilder<TimestampNanosecondType>,
100 pub targets: StringDictionaryBuilder<Int16Type>,
101 pub names: StringDictionaryBuilder<Int16Type>,
102 pub units: StringDictionaryBuilder<Int16Type>,
103 pub values: PrimitiveBuilder<Float64Type>,
104 pub properties: PropertySetJsonbDictionaryBuilder,
105 pub process_properties: BinaryDictionaryBuilder<Int32Type>,
106}
107
108impl MetricsRecordBuilder {
109 pub fn with_capacity(capacity: usize) -> Self {
110 Self {
111 process_ids: StringDictionaryBuilder::new(),
112 stream_ids: StringDictionaryBuilder::new(),
113 block_ids: StringDictionaryBuilder::new(),
114 insert_times: PrimitiveBuilder::with_capacity(capacity),
115 exes: StringDictionaryBuilder::new(),
116 usernames: StringDictionaryBuilder::new(),
117 computers: StringDictionaryBuilder::new(),
118 times: PrimitiveBuilder::with_capacity(capacity),
119 targets: StringDictionaryBuilder::new(),
120 names: StringDictionaryBuilder::new(),
121 units: StringDictionaryBuilder::new(),
122 values: PrimitiveBuilder::with_capacity(capacity),
123 properties: PropertySetJsonbDictionaryBuilder::new(capacity),
124 process_properties: BinaryDictionaryBuilder::new(),
125 }
126 }
127
128 pub fn len(&self) -> i64 {
129 self.times.len() as i64
130 }
131
132 pub fn is_empty(&self) -> bool {
133 self.times.len() == 0
134 }
135
136 pub fn get_time_range(&self) -> Option<TimeRange> {
137 if self.is_empty() {
138 return None;
139 }
140 let slice = self.times.values_slice();
142 Some(TimeRange::new(
143 DateTime::from_timestamp_nanos(slice[0]),
144 DateTime::from_timestamp_nanos(slice[slice.len() - 1]),
145 ))
146 }
147
148 pub fn append(&mut self, row: &Measure) -> Result<()> {
149 self.process_ids
150 .append_value(format!("{}", row.process.process_id));
151 self.stream_ids.append_value(&*row.stream_id);
152 self.block_ids.append_value(&*row.block_id);
153 self.insert_times.append_value(row.insert_time);
154 self.exes.append_value(&row.process.exe);
155 self.usernames.append_value(&row.process.username);
156 self.computers.append_value(&row.process.computer);
157 self.times.append_value(row.time);
158 self.targets.append_value(&*row.target);
159 self.names.append_value(&*row.name);
160 self.units.append_value(&*row.unit);
161 self.values.append_value(row.value);
162 self.properties.append_property_set(&row.properties)?;
163 self.process_properties
164 .append_value(&*row.process.properties);
165 Ok(())
166 }
167
168 pub fn append_entry_only(&mut self, row: &Measure) -> Result<()> {
170 self.times.append_value(row.time);
172 self.targets.append_value(&*row.target);
173 self.names.append_value(&*row.name);
174 self.units.append_value(&*row.unit);
175 self.values.append_value(row.value);
176 self.properties.append_property_set(&row.properties)?;
177 Ok(())
178 }
179
180 pub fn fill_constant_columns(
182 &mut self,
183 process: &ProcessMetadata,
184 stream_id: &str,
185 block_id: &str,
186 insert_time: i64,
187 entry_count: usize,
188 ) -> Result<()> {
189 let process_id_str = format!("{}", process.process_id);
190
191 let insert_times_slice = vec![insert_time; entry_count];
193 self.insert_times.append_slice(&insert_times_slice);
194
195 self.process_properties
197 .append_values(&**process.properties, entry_count);
198
199 self.process_ids.append_values(&process_id_str, entry_count);
201 self.stream_ids.append_values(stream_id, entry_count);
202 self.block_ids.append_values(block_id, entry_count);
203 self.exes.append_values(&process.exe, entry_count);
204 self.usernames.append_values(&process.username, entry_count);
205 self.computers.append_values(&process.computer, entry_count);
206
207 Ok(())
208 }
209
210 pub fn finish(mut self) -> Result<RecordBatch> {
211 RecordBatch::try_new(
212 Arc::new(metrics_table_schema()),
213 vec![
214 Arc::new(self.process_ids.finish()),
215 Arc::new(self.stream_ids.finish()),
216 Arc::new(self.block_ids.finish()),
217 Arc::new(self.insert_times.finish().with_timezone_utc()),
218 Arc::new(self.exes.finish()),
219 Arc::new(self.usernames.finish()),
220 Arc::new(self.computers.finish()),
221 Arc::new(self.times.finish().with_timezone_utc()),
222 Arc::new(self.targets.finish()),
223 Arc::new(self.names.finish()),
224 Arc::new(self.units.finish()),
225 Arc::new(self.values.finish()),
226 Arc::new(self.properties.finish()?),
227 Arc::new(self.process_properties.finish()),
228 ],
229 )
230 .with_context(|| "building record batch")
231 }
232}