// micromegas_analytics/properties/properties_column_accessor.rs
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Result, anyhow};
use datafusion::arrow::array::{Array, ArrayRef, AsArray, GenericListArray, RecordBatch};
use datafusion::arrow::datatypes::DataType;

use crate::arrow_properties::{read_property_list, serialize_properties_to_jsonb};
use crate::dfext::binary_column_accessor::{BinaryColumnAccessor, create_binary_accessor};

10pub trait PropertiesColumnAccessor: Send + std::fmt::Debug {
18 fn jsonb_value(&self, index: usize) -> Result<Vec<u8>>;
23
24 fn len(&self) -> usize;
26
27 fn is_null(&self, index: usize) -> bool;
29
30 fn is_empty(&self) -> bool {
32 self.len() == 0
33 }
34}
35
36struct JsonbColumnAccessor {
38 binary_accessor: Box<dyn BinaryColumnAccessor + Send>,
39}
40
41impl JsonbColumnAccessor {
42 fn new(binary_accessor: Box<dyn BinaryColumnAccessor + Send>) -> Self {
43 Self { binary_accessor }
44 }
45}
46
47impl PropertiesColumnAccessor for JsonbColumnAccessor {
48 fn jsonb_value(&self, index: usize) -> Result<Vec<u8>> {
49 Ok(self.binary_accessor.value(index).to_vec())
51 }
52
53 fn len(&self) -> usize {
54 self.binary_accessor.len()
55 }
56
57 fn is_null(&self, index: usize) -> bool {
58 self.binary_accessor.is_null(index)
59 }
60}
61
62impl std::fmt::Debug for JsonbColumnAccessor {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("JsonbColumnAccessor")
65 .field("len", &self.binary_accessor.len())
66 .finish()
67 }
68}
69
70#[derive(Debug)]
72struct StructArrayAccessor {
73 array: Arc<GenericListArray<i32>>,
74}
75
76impl StructArrayAccessor {
77 fn new(array: Arc<GenericListArray<i32>>) -> Self {
78 Self { array }
79 }
80}
81
82impl PropertiesColumnAccessor for StructArrayAccessor {
83 fn jsonb_value(&self, index: usize) -> Result<Vec<u8>> {
84 let property_list_array = self.array.value(index);
86 let properties = read_property_list(property_list_array)?;
87
88 let properties_map: HashMap<String, String> = properties
90 .into_iter()
91 .map(|prop| (prop.key_str().to_string(), prop.value_str().to_string()))
92 .collect();
93
94 serialize_properties_to_jsonb(&properties_map)
96 }
97
98 fn len(&self) -> usize {
99 self.array.len()
100 }
101
102 fn is_null(&self, index: usize) -> bool {
103 self.array.is_null(index)
104 }
105}
106
107pub fn create_properties_accessor(
116 array: &ArrayRef,
117) -> Result<Box<dyn PropertiesColumnAccessor + Send>> {
118 match array.data_type() {
119 DataType::Dictionary(key_type, value_type) => {
121 if matches!(key_type.as_ref(), DataType::Int32)
122 && matches!(value_type.as_ref(), DataType::Binary)
123 {
124 let binary_accessor = create_binary_accessor(array)?;
125 Ok(Box::new(JsonbColumnAccessor::new(binary_accessor)))
126 } else {
127 Err(anyhow!(
128 "Unsupported dictionary format for properties: key={:?}, value={:?}",
129 key_type,
130 value_type
131 ))
132 }
133 }
134
135 DataType::List(field) => {
137 if let DataType::Struct(struct_fields) = field.data_type() {
139 let has_key = struct_fields.iter().any(|f| f.name() == "key");
140 let has_value = struct_fields.iter().any(|f| f.name() == "value");
141
142 if has_key && has_value {
143 let list_array = array
144 .as_any()
145 .downcast_ref::<GenericListArray<i32>>()
146 .ok_or_else(|| anyhow!("Failed to downcast to GenericListArray<i32>"))?
147 .clone();
148 Ok(Box::new(StructArrayAccessor::new(Arc::new(list_array))))
149 } else {
150 Err(anyhow!(
151 "List array does not contain struct with key/value fields"
152 ))
153 }
154 } else {
155 Err(anyhow!(
156 "List array does not contain struct elements: {:?}",
157 field.data_type()
158 ))
159 }
160 }
161
162 DataType::Binary => {
164 let binary_accessor = create_binary_accessor(array)?;
165 Ok(Box::new(JsonbColumnAccessor::new(binary_accessor)))
166 }
167
168 _ => Err(anyhow!(
169 "Unsupported array type for properties accessor: {:?}",
170 array.data_type()
171 )),
172 }
173}
174
175pub fn properties_column_by_name(
177 batch: &RecordBatch,
178 name: &str,
179) -> Result<Box<dyn PropertiesColumnAccessor + Send>> {
180 let column = batch
181 .column_by_name(name)
182 .ok_or_else(|| anyhow!("Column '{}' not found", name))?;
183 create_properties_accessor(column)
184}