micromegas_analytics/
sql_arrow_bridge.rs

1use anyhow::{Context, Result};
2use datafusion::arrow::array::ArrayBuilder;
3use datafusion::arrow::array::BinaryBuilder;
4use datafusion::arrow::array::BinaryDictionaryBuilder;
5use datafusion::arrow::array::ListBuilder;
6use datafusion::arrow::array::PrimitiveBuilder;
7use datafusion::arrow::array::StringBuilder;
8use datafusion::arrow::array::StructBuilder;
9use datafusion::arrow::datatypes::DataType;
10use datafusion::arrow::datatypes::Field;
11use datafusion::arrow::datatypes::Int32Type;
12use datafusion::arrow::datatypes::Int64Type;
13use datafusion::arrow::datatypes::TimeUnit;
14use datafusion::arrow::datatypes::TimestampNanosecondType;
15use datafusion::arrow::record_batch::RecordBatch;
16use datafusion::common::cast::as_struct_array;
17use jsonb::Value;
18use micromegas_telemetry::property::Property;
19use sqlx::Column;
20use sqlx::Row;
21use sqlx::TypeInfo;
22use sqlx::postgres::{PgColumn, PgRow};
23use std::borrow::Cow;
24use std::collections::BTreeMap;
25use std::sync::Arc;
26
27use crate::arrow_utils::make_empty_record_batch;
28
29/// A trait for reading a column from a database row and writing it to an Arrow `StructBuilder`.
30pub trait ColumnReader {
31    fn extract_column_from_row(
32        &self,
33        row: &PgRow,
34        struct_builder: &mut StructBuilder,
35    ) -> Result<()>;
36    fn field(&self) -> Field;
37}
38
39/// A `ColumnReader` for string columns.
40pub struct StringColumnReader {
41    pub field: Field,
42    pub column_ordinal: usize,
43}
44
45impl ColumnReader for StringColumnReader {
46    fn extract_column_from_row(
47        &self,
48        row: &PgRow,
49        struct_builder: &mut StructBuilder,
50    ) -> Result<()> {
51        let value: Option<&str> = row
52            .try_get(self.column_ordinal)
53            .with_context(|| "try_get failed on row")?;
54        let field_builder = struct_builder
55            .field_builder::<StringBuilder>(self.column_ordinal)
56            .with_context(|| "getting field builder for string column")?;
57        if let Some(v) = value {
58            field_builder.append_value(v);
59        } else {
60            field_builder.append_null();
61        }
62        Ok(())
63    }
64
65    fn field(&self) -> Field {
66        self.field.clone()
67    }
68}
69
70/// A `ColumnReader` for UUID columns.
71pub struct UuidColumnReader {
72    pub field: Field,
73    pub column_ordinal: usize,
74}
75
76impl ColumnReader for UuidColumnReader {
77    fn extract_column_from_row(
78        &self,
79        row: &PgRow,
80        struct_builder: &mut StructBuilder,
81    ) -> Result<()> {
82        let value: Option<sqlx::types::uuid::Uuid> = row
83            .try_get(self.column_ordinal)
84            .with_context(|| "try_get failed on row")?;
85        let field_builder = struct_builder
86            .field_builder::<StringBuilder>(self.column_ordinal)
87            .with_context(|| "getting field builder for string column")?;
88        if let Some(uuid) = value {
89            field_builder.append_value(
90                uuid.hyphenated()
91                    .encode_lower(&mut sqlx::types::uuid::Uuid::encode_buffer()),
92            );
93        } else {
94            field_builder.append_null();
95        }
96        Ok(())
97    }
98
99    fn field(&self) -> Field {
100        self.field.clone()
101    }
102}
103
104/// A `ColumnReader` for `i64` columns.
105pub struct Int64ColumnReader {
106    pub field: Field,
107    pub column_ordinal: usize,
108}
109
110impl ColumnReader for Int64ColumnReader {
111    fn extract_column_from_row(
112        &self,
113        row: &PgRow,
114        struct_builder: &mut StructBuilder,
115    ) -> Result<()> {
116        let value: Option<i64> = row
117            .try_get(self.column_ordinal)
118            .with_context(|| "try_get failed on row")?;
119        let field_builder = struct_builder
120            .field_builder::<PrimitiveBuilder<Int64Type>>(self.column_ordinal)
121            .with_context(|| "getting field builder for int64 column")?;
122        if let Some(v) = value {
123            field_builder.append_value(v);
124        } else {
125            field_builder.append_null();
126        }
127        Ok(())
128    }
129    fn field(&self) -> Field {
130        self.field.clone()
131    }
132}
133
134/// A `ColumnReader` for `i32` columns.
135pub struct Int32ColumnReader {
136    pub field: Field,
137    pub column_ordinal: usize,
138}
139
140impl ColumnReader for Int32ColumnReader {
141    fn extract_column_from_row(
142        &self,
143        row: &PgRow,
144        struct_builder: &mut StructBuilder,
145    ) -> Result<()> {
146        let value: Option<i32> = row
147            .try_get(self.column_ordinal)
148            .with_context(|| "try_get failed on row")?;
149        let field_builder = struct_builder
150            .field_builder::<PrimitiveBuilder<Int32Type>>(self.column_ordinal)
151            .with_context(|| "getting field builder for int32 column")?;
152        if let Some(v) = value {
153            field_builder.append_value(v);
154        } else {
155            field_builder.append_null();
156        }
157        Ok(())
158    }
159    fn field(&self) -> Field {
160        self.field.clone()
161    }
162}
163
164/// A `ColumnReader` for timestamp columns.
165pub struct TimestampColumnReader {
166    pub field: Field,
167    pub column_ordinal: usize,
168}
169
170impl ColumnReader for TimestampColumnReader {
171    fn extract_column_from_row(
172        &self,
173        row: &PgRow,
174        struct_builder: &mut StructBuilder,
175    ) -> Result<()> {
176        use sqlx::types::chrono::{DateTime, Utc};
177        let value: Option<DateTime<Utc>> = row
178            .try_get(self.column_ordinal)
179            .with_context(|| "try_get failed on row")?;
180        let field_builder = struct_builder
181            .field_builder::<PrimitiveBuilder<TimestampNanosecondType>>(self.column_ordinal)
182            .with_context(|| "getting field builder for timestamp column")?;
183        if let Some(timestamp) = value {
184            field_builder.append_value(timestamp.timestamp_nanos_opt().unwrap_or(0));
185        } else {
186            field_builder.append_null();
187        }
188        Ok(())
189    }
190
191    fn field(&self) -> Field {
192        self.field.clone()
193    }
194}
195
196/// A `ColumnReader` for string array columns.
197pub struct StringArrayColumnReader {
198    pub field: Field,
199    pub column_ordinal: usize,
200}
201
202impl ColumnReader for StringArrayColumnReader {
203    fn extract_column_from_row(
204        &self,
205        row: &PgRow,
206        struct_builder: &mut StructBuilder,
207    ) -> Result<()> {
208        let strings: Option<Vec<String>> = row
209            .try_get(self.column_ordinal)
210            .with_context(|| "try_get failed on row")?;
211        let list_builder = struct_builder
212            .field_builder::<ListBuilder<Box<dyn ArrayBuilder>>>(self.column_ordinal)
213            .with_context(|| "getting field builder for string array column")?;
214        if let Some(strings) = strings {
215            let string_builder = list_builder
216                .values()
217                .as_any_mut()
218                .downcast_mut::<StringBuilder>()
219                .unwrap();
220            for v in strings {
221                string_builder.append_value(v);
222            }
223            list_builder.append(true);
224        } else {
225            list_builder.append_null();
226        }
227        Ok(())
228    }
229
230    fn field(&self) -> Field {
231        self.field.clone()
232    }
233}
234
235/// A `ColumnReader` for blob columns.
236pub struct BlobColumnReader {
237    pub field: Field,
238    pub column_ordinal: usize,
239}
240
241impl ColumnReader for BlobColumnReader {
242    fn extract_column_from_row(
243        &self,
244        row: &PgRow,
245        struct_builder: &mut StructBuilder,
246    ) -> Result<()> {
247        let value: Option<Vec<u8>> = row
248            .try_get(self.column_ordinal)
249            .with_context(|| "try_get failed on row")?;
250        let field_builder = struct_builder
251            .field_builder::<BinaryBuilder>(self.column_ordinal)
252            .with_context(|| "getting field builder for blob column")?;
253        if let Some(v) = value {
254            field_builder.append_value(v);
255        } else {
256            field_builder.append_null();
257        }
258        Ok(())
259    }
260
261    fn field(&self) -> Field {
262        self.field.clone()
263    }
264}
265
266/// A `ColumnReader` for properties columns.
267pub struct PropertiesColumnReader {
268    pub field: Field,
269    pub column_ordinal: usize,
270}
271
272impl ColumnReader for PropertiesColumnReader {
273    fn extract_column_from_row(
274        &self,
275        row: &PgRow,
276        struct_builder: &mut StructBuilder,
277    ) -> Result<()> {
278        let props: Vec<Property> = row
279            .try_get(self.column_ordinal)
280            .with_context(|| "try_get failed on row")?;
281
282        // Get the dictionary builder for JSONB format
283        let dict_builder = struct_builder
284            .field_builder::<BinaryDictionaryBuilder<Int32Type>>(self.column_ordinal)
285            .with_context(|| "getting dictionary field builder for properties")?;
286
287        // Convert properties to JSONB format
288        if props.is_empty() {
289            // For empty properties, append an empty JSONB object
290            let empty_map = Value::Object(BTreeMap::new());
291            let mut buffer = Vec::new();
292            empty_map.write_to_vec(&mut buffer);
293            dict_builder.append_value(&buffer);
294        } else {
295            // Build a BTreeMap from properties
296            let mut map = BTreeMap::new();
297            for p in props {
298                map.insert(
299                    p.key_str().to_string(),
300                    Value::String(Cow::Owned(p.value_str().to_string())),
301                );
302            }
303
304            // Convert to JSONB bytes
305            let jsonb_object = Value::Object(map);
306            let mut buffer = Vec::new();
307            jsonb_object.write_to_vec(&mut buffer);
308            dict_builder.append_value(&buffer);
309        }
310
311        Ok(())
312    }
313
314    fn field(&self) -> Field {
315        self.field.clone()
316    }
317}
318
319/// Creates a `ColumnReader` for a given database column.
320pub fn make_column_reader(column: &PgColumn) -> Result<Arc<dyn ColumnReader>> {
321    match column.type_info().name() {
322        "VARCHAR" => Ok(Arc::new(StringColumnReader {
323            field: Field::new(column.name(), DataType::Utf8, true),
324            column_ordinal: column.ordinal(),
325        })),
326        "UUID" => Ok(Arc::new(UuidColumnReader {
327            field: Field::new(column.name(), DataType::Utf8, true),
328            column_ordinal: column.ordinal(),
329        })),
330        "TIMESTAMPTZ" => Ok(Arc::new(TimestampColumnReader {
331            field: Field::new(
332                column.name(),
333                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), //postgres only stores microseconds, but every event is in nanoseconds
334                true,
335            ),
336            column_ordinal: column.ordinal(),
337        })),
338        "INT8" => Ok(Arc::new(Int64ColumnReader {
339            field: Field::new(column.name(), DataType::Int64, true),
340            column_ordinal: column.ordinal(),
341        })),
342        "INT4" => Ok(Arc::new(Int32ColumnReader {
343            field: Field::new(column.name(), DataType::Int32, true),
344            column_ordinal: column.ordinal(),
345        })),
346        "BYTEA" => Ok(Arc::new(BlobColumnReader {
347            field: Field::new(column.name(), DataType::Binary, true),
348            column_ordinal: column.ordinal(),
349        })),
350        "TEXT[]" => Ok(Arc::new(StringArrayColumnReader {
351            field: Field::new(
352                column.name(),
353                DataType::List(Arc::new(Field::new("tag", DataType::Utf8, false))),
354                true,
355            ),
356            column_ordinal: column.ordinal(),
357        })),
358        "micromegas_property[]" => Ok(Arc::new(PropertiesColumnReader {
359            field: Field::new(
360                column.name(),
361                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
362                true,
363            ),
364            column_ordinal: column.ordinal(),
365        })),
366        other => anyhow::bail!("unknown type {other}"),
367    }
368}
369
370/// Converts a slice of database rows to an Arrow `RecordBatch`.
371pub fn rows_to_record_batch(rows: &[PgRow]) -> Result<RecordBatch> {
372    if rows.is_empty() {
373        return Ok(make_empty_record_batch());
374    }
375
376    let mut field_readers = vec![];
377    for column in rows[0].columns() {
378        field_readers
379            .push(make_column_reader(column).with_context(|| "error building column reader")?);
380    }
381
382    let fields: Vec<_> = field_readers.iter().map(|reader| reader.field()).collect();
383    let mut list_builder = ListBuilder::new(StructBuilder::from_fields(fields, 1024));
384    let struct_builder: &mut StructBuilder = list_builder.values();
385    for r in rows {
386        for reader in &field_readers {
387            reader.extract_column_from_row(r, struct_builder)?;
388        }
389        struct_builder.append(true);
390    }
391    list_builder.append(true);
392    let array = list_builder.finish();
393    Ok(as_struct_array(array.values())
394        .with_context(|| "casting list values to struct srray")?
395        .into())
396}