// micromegas_datafusion_extensions/properties/properties_udf.rs

1use datafusion::arrow::array::{
2    Array, AsArray, DictionaryArray, GenericBinaryArray, GenericListArray, Int32Array, StructArray,
3};
4use datafusion::arrow::datatypes::{DataType, Int32Type};
5use datafusion::common::{Result, internal_err};
6use datafusion::error::DataFusionError;
7use datafusion::logical_expr::{
8    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
9};
10use jsonb::RawJsonb;
11use std::any::Any;
12use std::sync::Arc;
13
14pub fn extract_properties_as_vec(struct_array: &StructArray) -> Result<Vec<(String, String)>> {
15    let mut properties = Vec::with_capacity(struct_array.len());
16    let key_array = struct_array.column(0).as_string::<i32>();
17    let value_array = struct_array.column(1).as_string::<i32>();
18    for i in 0..struct_array.len() {
19        if struct_array.is_valid(i) {
20            let key = key_array.value(i).to_string();
21            let value = value_array.value(i).to_string();
22            properties.push((key, value));
23        }
24    }
25
26    Ok(properties)
27}
28
29pub fn count_jsonb_properties(jsonb_bytes: &[u8]) -> Result<i32> {
30    let jsonb = RawJsonb::new(jsonb_bytes);
31
32    // Get object keys and count them using array_length
33    match jsonb.object_keys() {
34        Ok(Some(keys_array)) => {
35            // It's an object, get the array length of the keys
36            let keys_raw = keys_array.as_raw();
37            match keys_raw.array_length() {
38                Ok(Some(len)) => Ok(len as i32),
39                Ok(None) => Ok(0), // Empty array
40                Err(e) => Err(DataFusionError::Internal(format!(
41                    "Failed to get keys array length: {e:?}"
42                ))),
43            }
44        }
45        Ok(None) => {
46            // Not an object (array, scalar, null), return 0
47            Ok(0)
48        }
49        Err(e) => Err(DataFusionError::Internal(format!(
50            "Failed to count JSONB properties: {e:?}"
51        ))),
52    }
53}
54
55// Helper UDF to extract properties array from dictionary for use with standard functions
56#[derive(Debug, PartialEq, Eq, Hash)]
57pub struct PropertiesToArray {
58    signature: Signature,
59}
60
61impl PropertiesToArray {
62    pub fn new() -> Self {
63        Self::default()
64    }
65}
66
67impl Default for PropertiesToArray {
68    fn default() -> Self {
69        Self {
70            signature: Signature::any(1, Volatility::Immutable),
71        }
72    }
73}
74
75impl ScalarUDFImpl for PropertiesToArray {
76    fn as_any(&self) -> &dyn Any {
77        self
78    }
79
80    fn name(&self) -> &str {
81        "properties_to_array"
82    }
83
84    fn signature(&self) -> &Signature {
85        &self.signature
86    }
87
88    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
89        match &arg_types[0] {
90            DataType::Dictionary(_, value_type) => Ok(value_type.as_ref().clone()),
91            _ => internal_err!("properties_to_array expects a Dictionary input type"),
92        }
93    }
94
95    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
96        let args = args.args;
97        if args.len() != 1 {
98            return internal_err!("properties_to_array expects exactly one argument");
99        }
100
101        match &args[0] {
102            ColumnarValue::Array(array) => {
103                // Reconstruct the full array from dictionary
104                let dict_array = array
105                    .as_any()
106                    .downcast_ref::<DictionaryArray<Int32Type>>()
107                    .ok_or_else(|| {
108                        DataFusionError::Internal(
109                            "properties_to_array requires a dictionary array as input".to_string(),
110                        )
111                    })?;
112
113                // Use Arrow's take function to reconstruct the array
114                use datafusion::arrow::compute::take;
115                let indices = dict_array.keys();
116                let values = dict_array.values();
117
118                let reconstructed = take(values.as_ref(), indices, None)
119                    .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
120
121                Ok(ColumnarValue::Array(reconstructed))
122            }
123            ColumnarValue::Scalar(_) => {
124                internal_err!("properties_to_array does not support scalar inputs")
125            }
126        }
127    }
128}
129
130// UDF to get length of properties that works with both regular and dictionary arrays
131#[derive(Debug, PartialEq, Eq, Hash)]
132pub struct PropertiesLength {
133    signature: Signature,
134}
135
136impl PropertiesLength {
137    pub fn new() -> Self {
138        Self::default()
139    }
140}
141
142impl Default for PropertiesLength {
143    fn default() -> Self {
144        Self {
145            signature: Signature::any(1, Volatility::Immutable),
146        }
147    }
148}
149
150impl ScalarUDFImpl for PropertiesLength {
151    fn as_any(&self) -> &dyn Any {
152        self
153    }
154
155    fn name(&self) -> &str {
156        "properties_length"
157    }
158
159    fn signature(&self) -> &Signature {
160        &self.signature
161    }
162
163    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
164        Ok(DataType::Int32)
165    }
166
167    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
168        let args = args.args;
169        if args.len() != 1 {
170            return internal_err!("properties_length expects exactly one argument");
171        }
172
173        match &args[0] {
174            ColumnarValue::Array(array) => {
175                match array.data_type() {
176                    DataType::List(_) => {
177                        // Handle regular list array
178                        let list_array = array
179                            .as_any()
180                            .downcast_ref::<GenericListArray<i32>>()
181                            .ok_or_else(|| {
182                                DataFusionError::Internal(
183                                    "properties_length: failed to cast to list array".to_string(),
184                                )
185                            })?;
186
187                        let mut lengths = Vec::with_capacity(list_array.len());
188                        for i in 0..list_array.len() {
189                            if list_array.is_null(i) {
190                                lengths.push(None);
191                            } else {
192                                let start = list_array.value_offsets()[i] as usize;
193                                let end = list_array.value_offsets()[i + 1] as usize;
194                                lengths.push(Some((end - start) as i32));
195                            }
196                        }
197
198                        let length_array = Int32Array::from(lengths);
199                        Ok(ColumnarValue::Array(Arc::new(length_array)))
200                    }
201                    DataType::Binary => {
202                        // Handle JSONB binary array
203                        let binary_array = array
204                            .as_any()
205                            .downcast_ref::<GenericBinaryArray<i32>>()
206                            .ok_or_else(|| {
207                                DataFusionError::Internal(
208                                    "properties_length: failed to cast to binary array".to_string(),
209                                )
210                            })?;
211
212                        let mut lengths = Vec::with_capacity(binary_array.len());
213                        for i in 0..binary_array.len() {
214                            if binary_array.is_null(i) {
215                                lengths.push(None);
216                            } else {
217                                let jsonb_bytes = binary_array.value(i);
218                                match count_jsonb_properties(jsonb_bytes) {
219                                    Ok(len) => lengths.push(Some(len)),
220                                    Err(_) => lengths.push(None), // Error counting, treat as null
221                                }
222                            }
223                        }
224
225                        let length_array = Int32Array::from(lengths);
226                        Ok(ColumnarValue::Array(Arc::new(length_array)))
227                    }
228                    DataType::Dictionary(_, value_type) => {
229                        // Handle dictionary array
230                        match value_type.as_ref() {
231                            DataType::List(_) => {
232                                let dict_array = array
233                                    .as_any()
234                                    .downcast_ref::<DictionaryArray<Int32Type>>()
235                                    .ok_or_else(|| {
236                                        DataFusionError::Internal(
237                                            "properties_length: failed to cast to dictionary array"
238                                                .to_string(),
239                                        )
240                                    })?;
241
242                                let values = dict_array.values();
243                                let list_values = values
244                                    .as_any()
245                                    .downcast_ref::<GenericListArray<i32>>()
246                                    .ok_or_else(|| {
247                                        DataFusionError::Internal(
248                                            "properties_length: dictionary values are not a list array".to_string(),
249                                        )
250                                    })?;
251
252                                // Pre-compute lengths for each unique value in the dictionary
253                                let mut dict_lengths = Vec::with_capacity(list_values.len());
254                                for i in 0..list_values.len() {
255                                    if list_values.is_null(i) {
256                                        dict_lengths.push(None);
257                                    } else {
258                                        let start = list_values.value_offsets()[i] as usize;
259                                        let end = list_values.value_offsets()[i + 1] as usize;
260                                        dict_lengths.push(Some((end - start) as i32));
261                                    }
262                                }
263
264                                // Map dictionary keys to lengths
265                                let keys = dict_array.keys();
266                                let mut lengths = Vec::with_capacity(keys.len());
267                                for i in 0..keys.len() {
268                                    if keys.is_null(i) {
269                                        lengths.push(None);
270                                    } else {
271                                        let key_index = keys.value(i) as usize;
272                                        if key_index < dict_lengths.len() {
273                                            lengths.push(dict_lengths[key_index]);
274                                        } else {
275                                            return internal_err!(
276                                                "Dictionary key index out of bounds"
277                                            );
278                                        }
279                                    }
280                                }
281
282                                let length_array = Int32Array::from(lengths);
283                                Ok(ColumnarValue::Array(Arc::new(length_array)))
284                            }
285                            DataType::Binary => {
286                                // Handle dictionary-encoded JSONB (primary format)
287                                let dict_array = array
288                                    .as_any()
289                                    .downcast_ref::<DictionaryArray<Int32Type>>()
290                                    .ok_or_else(|| {
291                                        DataFusionError::Internal(
292                                            "properties_length: failed to cast to dictionary array"
293                                                .to_string(),
294                                        )
295                                    })?;
296
297                                let values = dict_array.values();
298                                let binary_values = values
299                                    .as_any()
300                                    .downcast_ref::<GenericBinaryArray<i32>>()
301                                    .ok_or_else(|| {
302                                        DataFusionError::Internal(
303                                            "properties_length: dictionary values are not a binary array".to_string(),
304                                        )
305                                    })?;
306
307                                // Pre-compute lengths for each unique JSONB value in the dictionary
308                                let mut dict_lengths = Vec::with_capacity(binary_values.len());
309                                for i in 0..binary_values.len() {
310                                    if binary_values.is_null(i) {
311                                        dict_lengths.push(None);
312                                    } else {
313                                        let jsonb_bytes = binary_values.value(i);
314                                        match count_jsonb_properties(jsonb_bytes) {
315                                            Ok(len) => dict_lengths.push(Some(len)),
316                                            Err(_) => dict_lengths.push(None), // Error counting, treat as null
317                                        }
318                                    }
319                                }
320
321                                // Map dictionary keys to lengths
322                                let keys = dict_array.keys();
323                                let mut lengths = Vec::with_capacity(keys.len());
324                                for i in 0..keys.len() {
325                                    if keys.is_null(i) {
326                                        lengths.push(None);
327                                    } else {
328                                        let key_index = keys.value(i) as usize;
329                                        if key_index < dict_lengths.len() {
330                                            lengths.push(dict_lengths[key_index]);
331                                        } else {
332                                            return internal_err!(
333                                                "Dictionary key index out of bounds"
334                                            );
335                                        }
336                                    }
337                                }
338
339                                let length_array = Int32Array::from(lengths);
340                                Ok(ColumnarValue::Array(Arc::new(length_array)))
341                            }
342                            _ => internal_err!(
343                                "properties_length: unsupported dictionary value type, expected List or Binary"
344                            ),
345                        }
346                    }
347                    _ => internal_err!(
348                        "properties_length: unsupported input type, expected List, Binary, Dictionary<Int32, List>, or Dictionary<Int32, Binary>"
349                    ),
350                }
351            }
352            ColumnarValue::Scalar(_) => {
353                internal_err!("properties_length does not support scalar inputs")
354            }
355        }
356    }
357}