1use anyhow::{Context, Result};
2use datafusion::arrow::array::ArrayBuilder;
3use datafusion::arrow::array::BinaryBuilder;
4use datafusion::arrow::array::BinaryDictionaryBuilder;
5use datafusion::arrow::array::ListBuilder;
6use datafusion::arrow::array::PrimitiveBuilder;
7use datafusion::arrow::array::StringBuilder;
8use datafusion::arrow::array::StructBuilder;
9use datafusion::arrow::datatypes::DataType;
10use datafusion::arrow::datatypes::Field;
11use datafusion::arrow::datatypes::Int32Type;
12use datafusion::arrow::datatypes::Int64Type;
13use datafusion::arrow::datatypes::TimeUnit;
14use datafusion::arrow::datatypes::TimestampNanosecondType;
15use datafusion::arrow::record_batch::RecordBatch;
16use datafusion::common::cast::as_struct_array;
17use jsonb::Value;
18use micromegas_telemetry::property::Property;
19use sqlx::Column;
20use sqlx::Row;
21use sqlx::TypeInfo;
22use sqlx::postgres::{PgColumn, PgRow};
23use std::borrow::Cow;
24use std::collections::BTreeMap;
25use std::sync::Arc;
26
27use crate::arrow_utils::make_empty_record_batch;
28
29pub trait ColumnReader {
31 fn extract_column_from_row(
32 &self,
33 row: &PgRow,
34 struct_builder: &mut StructBuilder,
35 ) -> Result<()>;
36 fn field(&self) -> Field;
37}
38
39pub struct StringColumnReader {
41 pub field: Field,
42 pub column_ordinal: usize,
43}
44
45impl ColumnReader for StringColumnReader {
46 fn extract_column_from_row(
47 &self,
48 row: &PgRow,
49 struct_builder: &mut StructBuilder,
50 ) -> Result<()> {
51 let value: Option<&str> = row
52 .try_get(self.column_ordinal)
53 .with_context(|| "try_get failed on row")?;
54 let field_builder = struct_builder
55 .field_builder::<StringBuilder>(self.column_ordinal)
56 .with_context(|| "getting field builder for string column")?;
57 if let Some(v) = value {
58 field_builder.append_value(v);
59 } else {
60 field_builder.append_null();
61 }
62 Ok(())
63 }
64
65 fn field(&self) -> Field {
66 self.field.clone()
67 }
68}
69
70pub struct UuidColumnReader {
72 pub field: Field,
73 pub column_ordinal: usize,
74}
75
76impl ColumnReader for UuidColumnReader {
77 fn extract_column_from_row(
78 &self,
79 row: &PgRow,
80 struct_builder: &mut StructBuilder,
81 ) -> Result<()> {
82 let value: Option<sqlx::types::uuid::Uuid> = row
83 .try_get(self.column_ordinal)
84 .with_context(|| "try_get failed on row")?;
85 let field_builder = struct_builder
86 .field_builder::<StringBuilder>(self.column_ordinal)
87 .with_context(|| "getting field builder for string column")?;
88 if let Some(uuid) = value {
89 field_builder.append_value(
90 uuid.hyphenated()
91 .encode_lower(&mut sqlx::types::uuid::Uuid::encode_buffer()),
92 );
93 } else {
94 field_builder.append_null();
95 }
96 Ok(())
97 }
98
99 fn field(&self) -> Field {
100 self.field.clone()
101 }
102}
103
104pub struct Int64ColumnReader {
106 pub field: Field,
107 pub column_ordinal: usize,
108}
109
110impl ColumnReader for Int64ColumnReader {
111 fn extract_column_from_row(
112 &self,
113 row: &PgRow,
114 struct_builder: &mut StructBuilder,
115 ) -> Result<()> {
116 let value: Option<i64> = row
117 .try_get(self.column_ordinal)
118 .with_context(|| "try_get failed on row")?;
119 let field_builder = struct_builder
120 .field_builder::<PrimitiveBuilder<Int64Type>>(self.column_ordinal)
121 .with_context(|| "getting field builder for int64 column")?;
122 if let Some(v) = value {
123 field_builder.append_value(v);
124 } else {
125 field_builder.append_null();
126 }
127 Ok(())
128 }
129 fn field(&self) -> Field {
130 self.field.clone()
131 }
132}
133
134pub struct Int32ColumnReader {
136 pub field: Field,
137 pub column_ordinal: usize,
138}
139
140impl ColumnReader for Int32ColumnReader {
141 fn extract_column_from_row(
142 &self,
143 row: &PgRow,
144 struct_builder: &mut StructBuilder,
145 ) -> Result<()> {
146 let value: Option<i32> = row
147 .try_get(self.column_ordinal)
148 .with_context(|| "try_get failed on row")?;
149 let field_builder = struct_builder
150 .field_builder::<PrimitiveBuilder<Int32Type>>(self.column_ordinal)
151 .with_context(|| "getting field builder for int32 column")?;
152 if let Some(v) = value {
153 field_builder.append_value(v);
154 } else {
155 field_builder.append_null();
156 }
157 Ok(())
158 }
159 fn field(&self) -> Field {
160 self.field.clone()
161 }
162}
163
164pub struct TimestampColumnReader {
166 pub field: Field,
167 pub column_ordinal: usize,
168}
169
170impl ColumnReader for TimestampColumnReader {
171 fn extract_column_from_row(
172 &self,
173 row: &PgRow,
174 struct_builder: &mut StructBuilder,
175 ) -> Result<()> {
176 use sqlx::types::chrono::{DateTime, Utc};
177 let value: Option<DateTime<Utc>> = row
178 .try_get(self.column_ordinal)
179 .with_context(|| "try_get failed on row")?;
180 let field_builder = struct_builder
181 .field_builder::<PrimitiveBuilder<TimestampNanosecondType>>(self.column_ordinal)
182 .with_context(|| "getting field builder for timestamp column")?;
183 if let Some(timestamp) = value {
184 field_builder.append_value(timestamp.timestamp_nanos_opt().unwrap_or(0));
185 } else {
186 field_builder.append_null();
187 }
188 Ok(())
189 }
190
191 fn field(&self) -> Field {
192 self.field.clone()
193 }
194}
195
196pub struct StringArrayColumnReader {
198 pub field: Field,
199 pub column_ordinal: usize,
200}
201
202impl ColumnReader for StringArrayColumnReader {
203 fn extract_column_from_row(
204 &self,
205 row: &PgRow,
206 struct_builder: &mut StructBuilder,
207 ) -> Result<()> {
208 let strings: Option<Vec<String>> = row
209 .try_get(self.column_ordinal)
210 .with_context(|| "try_get failed on row")?;
211 let list_builder = struct_builder
212 .field_builder::<ListBuilder<Box<dyn ArrayBuilder>>>(self.column_ordinal)
213 .with_context(|| "getting field builder for string array column")?;
214 if let Some(strings) = strings {
215 let string_builder = list_builder
216 .values()
217 .as_any_mut()
218 .downcast_mut::<StringBuilder>()
219 .unwrap();
220 for v in strings {
221 string_builder.append_value(v);
222 }
223 list_builder.append(true);
224 } else {
225 list_builder.append_null();
226 }
227 Ok(())
228 }
229
230 fn field(&self) -> Field {
231 self.field.clone()
232 }
233}
234
235pub struct BlobColumnReader {
237 pub field: Field,
238 pub column_ordinal: usize,
239}
240
241impl ColumnReader for BlobColumnReader {
242 fn extract_column_from_row(
243 &self,
244 row: &PgRow,
245 struct_builder: &mut StructBuilder,
246 ) -> Result<()> {
247 let value: Option<Vec<u8>> = row
248 .try_get(self.column_ordinal)
249 .with_context(|| "try_get failed on row")?;
250 let field_builder = struct_builder
251 .field_builder::<BinaryBuilder>(self.column_ordinal)
252 .with_context(|| "getting field builder for blob column")?;
253 if let Some(v) = value {
254 field_builder.append_value(v);
255 } else {
256 field_builder.append_null();
257 }
258 Ok(())
259 }
260
261 fn field(&self) -> Field {
262 self.field.clone()
263 }
264}
265
266pub struct PropertiesColumnReader {
268 pub field: Field,
269 pub column_ordinal: usize,
270}
271
272impl ColumnReader for PropertiesColumnReader {
273 fn extract_column_from_row(
274 &self,
275 row: &PgRow,
276 struct_builder: &mut StructBuilder,
277 ) -> Result<()> {
278 let props: Vec<Property> = row
279 .try_get(self.column_ordinal)
280 .with_context(|| "try_get failed on row")?;
281
282 let dict_builder = struct_builder
284 .field_builder::<BinaryDictionaryBuilder<Int32Type>>(self.column_ordinal)
285 .with_context(|| "getting dictionary field builder for properties")?;
286
287 if props.is_empty() {
289 let empty_map = Value::Object(BTreeMap::new());
291 let mut buffer = Vec::new();
292 empty_map.write_to_vec(&mut buffer);
293 dict_builder.append_value(&buffer);
294 } else {
295 let mut map = BTreeMap::new();
297 for p in props {
298 map.insert(
299 p.key_str().to_string(),
300 Value::String(Cow::Owned(p.value_str().to_string())),
301 );
302 }
303
304 let jsonb_object = Value::Object(map);
306 let mut buffer = Vec::new();
307 jsonb_object.write_to_vec(&mut buffer);
308 dict_builder.append_value(&buffer);
309 }
310
311 Ok(())
312 }
313
314 fn field(&self) -> Field {
315 self.field.clone()
316 }
317}
318
319pub fn make_column_reader(column: &PgColumn) -> Result<Arc<dyn ColumnReader>> {
321 match column.type_info().name() {
322 "VARCHAR" => Ok(Arc::new(StringColumnReader {
323 field: Field::new(column.name(), DataType::Utf8, true),
324 column_ordinal: column.ordinal(),
325 })),
326 "UUID" => Ok(Arc::new(UuidColumnReader {
327 field: Field::new(column.name(), DataType::Utf8, true),
328 column_ordinal: column.ordinal(),
329 })),
330 "TIMESTAMPTZ" => Ok(Arc::new(TimestampColumnReader {
331 field: Field::new(
332 column.name(),
333 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), true,
335 ),
336 column_ordinal: column.ordinal(),
337 })),
338 "INT8" => Ok(Arc::new(Int64ColumnReader {
339 field: Field::new(column.name(), DataType::Int64, true),
340 column_ordinal: column.ordinal(),
341 })),
342 "INT4" => Ok(Arc::new(Int32ColumnReader {
343 field: Field::new(column.name(), DataType::Int32, true),
344 column_ordinal: column.ordinal(),
345 })),
346 "BYTEA" => Ok(Arc::new(BlobColumnReader {
347 field: Field::new(column.name(), DataType::Binary, true),
348 column_ordinal: column.ordinal(),
349 })),
350 "TEXT[]" => Ok(Arc::new(StringArrayColumnReader {
351 field: Field::new(
352 column.name(),
353 DataType::List(Arc::new(Field::new("tag", DataType::Utf8, false))),
354 true,
355 ),
356 column_ordinal: column.ordinal(),
357 })),
358 "micromegas_property[]" => Ok(Arc::new(PropertiesColumnReader {
359 field: Field::new(
360 column.name(),
361 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
362 true,
363 ),
364 column_ordinal: column.ordinal(),
365 })),
366 other => anyhow::bail!("unknown type {other}"),
367 }
368}
369
370pub fn rows_to_record_batch(rows: &[PgRow]) -> Result<RecordBatch> {
372 if rows.is_empty() {
373 return Ok(make_empty_record_batch());
374 }
375
376 let mut field_readers = vec![];
377 for column in rows[0].columns() {
378 field_readers
379 .push(make_column_reader(column).with_context(|| "error building column reader")?);
380 }
381
382 let fields: Vec<_> = field_readers.iter().map(|reader| reader.field()).collect();
383 let mut list_builder = ListBuilder::new(StructBuilder::from_fields(fields, 1024));
384 let struct_builder: &mut StructBuilder = list_builder.values();
385 for r in rows {
386 for reader in &field_readers {
387 reader.extract_column_from_row(r, struct_builder)?;
388 }
389 struct_builder.append(true);
390 }
391 list_builder.append(true);
392 let array = list_builder.finish();
393 Ok(as_struct_array(array.values())
394 .with_context(|| "casting list values to struct srray")?
395 .into())
396}