micromegas_analytics/properties/
properties_to_jsonb_udf.rs

1use anyhow::Context;
2use datafusion::arrow::array::{
3    Array, ArrayRef, AsArray, BinaryDictionaryBuilder, DictionaryArray, GenericBinaryArray,
4    GenericListArray, StructArray,
5};
6use datafusion::arrow::datatypes::{DataType, Int32Type};
7use datafusion::common::{Result, internal_err};
8use datafusion::error::DataFusionError;
9use datafusion::logical_expr::{
10    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
11};
12use jsonb::Value;
13use micromegas_tracing::warn;
14use std::any::Any;
15use std::borrow::Cow;
16use std::collections::BTreeMap;
17use std::sync::Arc;
18
19/// A scalar UDF that converts a list of properties to JSONB binary format with dictionary encoding.
20///
21/// Converts List<Struct<key: String, value: String>> to Dictionary<Int32, Binary> (dictionary-encoded JSONB).
22/// The output uses dictionary encoding to optimize storage of repeated property sets.
23/// Each unique JSONB object like {"key1": "value1", "key2": "value2"} is stored once in the dictionary.
24#[derive(Debug, PartialEq, Eq, Hash)]
25pub struct PropertiesToJsonb {
26    signature: Signature,
27}
28
29impl PropertiesToJsonb {
30    pub fn new() -> Self {
31        Self {
32            signature: Signature::any(1, Volatility::Immutable),
33        }
34    }
35}
36
37impl Default for PropertiesToJsonb {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43fn convert_properties_list_to_jsonb(properties: ArrayRef) -> anyhow::Result<Vec<u8>> {
44    let properties: &StructArray = properties.as_struct();
45    let (key_index, _key_field) = properties
46        .fields()
47        .find("key")
48        .with_context(|| "getting key field")?;
49    let (value_index, _value_field) = properties
50        .fields()
51        .find("value")
52        .with_context(|| "getting value field")?;
53
54    let mut map = BTreeMap::new();
55    let key_column = properties.column(key_index).as_string::<i32>();
56    let value_column = properties.column(value_index).as_string::<i32>();
57
58    for i in 0..properties.len() {
59        if key_column.is_null(i) || value_column.is_null(i) {
60            continue; // Skip null entries
61        }
62        let key = key_column.value(i);
63        let value = value_column.value(i);
64        map.insert(key.to_string(), Value::String(Cow::Borrowed(value)));
65    }
66
67    let jsonb_object = Value::Object(map);
68    let mut buffer = Vec::new();
69    jsonb_object.write_to_vec(&mut buffer);
70    Ok(buffer)
71}
72
73impl ScalarUDFImpl for PropertiesToJsonb {
74    fn as_any(&self) -> &dyn Any {
75        self
76    }
77
78    fn name(&self) -> &str {
79        "properties_to_jsonb"
80    }
81
82    fn signature(&self) -> &Signature {
83        &self.signature
84    }
85
86    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
87        Ok(DataType::Dictionary(
88            Box::new(DataType::Int32),
89            Box::new(DataType::Binary),
90        ))
91    }
92
93    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
94        let args = ColumnarValue::values_to_arrays(&args.args)?;
95        if args.len() != 1 {
96            return internal_err!("wrong number of arguments to properties_to_jsonb()");
97        }
98
99        // Handle all input formats and return Dictionary<Int32, Binary>
100        match args[0].data_type() {
101            DataType::List(_) => {
102                // Handle regular list array - convert to dictionary-encoded JSONB
103                let prop_lists = args[0]
104                    .as_any()
105                    .downcast_ref::<GenericListArray<i32>>()
106                    .ok_or_else(|| {
107                        DataFusionError::Internal("error casting property list".into())
108                    })?;
109
110                let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
111                for i in 0..prop_lists.len() {
112                    if prop_lists.is_null(i) {
113                        dict_builder.append_null();
114                    } else {
115                        match convert_properties_list_to_jsonb(prop_lists.value(i)) {
116                            Ok(jsonb_bytes) => {
117                                dict_builder.append_value(&jsonb_bytes);
118                            }
119                            Err(e) => {
120                                warn!(
121                                    "error converting properties to JSONB at index {}: {:?}",
122                                    i, e
123                                );
124                                dict_builder.append_null();
125                            }
126                        }
127                    }
128                }
129                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
130            }
131            DataType::Binary => {
132                // Pass-through optimization: already JSONB, just need to add dictionary encoding
133                let binary_array = args[0]
134                    .as_any()
135                    .downcast_ref::<GenericBinaryArray<i32>>()
136                    .ok_or_else(|| {
137                        DataFusionError::Internal("error casting to binary array".into())
138                    })?;
139
140                let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
141                for i in 0..binary_array.len() {
142                    if binary_array.is_null(i) {
143                        dict_builder.append_null();
144                    } else {
145                        let jsonb_bytes = binary_array.value(i);
146                        dict_builder.append_value(jsonb_bytes);
147                    }
148                }
149                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
150            }
151            DataType::Dictionary(_, value_type) => {
152                // Handle dictionary array
153                match value_type.as_ref() {
154                    DataType::List(_) => {
155                        // Convert dictionary-encoded List<Struct> to dictionary-encoded JSONB
156                        let dict_array = args[0]
157                            .as_any()
158                            .downcast_ref::<DictionaryArray<Int32Type>>()
159                            .ok_or_else(|| {
160                                DataFusionError::Internal("error casting dictionary array".into())
161                            })?;
162
163                        let values_array = dict_array.values();
164                        let list_values = values_array
165                            .as_any()
166                            .downcast_ref::<GenericListArray<i32>>()
167                            .ok_or_else(|| {
168                                DataFusionError::Internal(
169                                    "dictionary values are not a list array".into(),
170                                )
171                            })?;
172
173                        let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
174                        for i in 0..dict_array.len() {
175                            if dict_array.is_null(i) {
176                                dict_builder.append_null();
177                            } else {
178                                let key_index = dict_array.keys().value(i) as usize;
179                                if key_index < list_values.len() {
180                                    let property_list = list_values.value(key_index);
181                                    match convert_properties_list_to_jsonb(property_list) {
182                                        Ok(jsonb_bytes) => {
183                                            dict_builder.append_value(&jsonb_bytes);
184                                        }
185                                        Err(e) => {
186                                            warn!(
187                                                "error converting properties to JSONB at dict index {}: {:?}",
188                                                i, e
189                                            );
190                                            dict_builder.append_null();
191                                        }
192                                    }
193                                } else {
194                                    return internal_err!(
195                                        "Dictionary key index out of bounds in properties_to_jsonb"
196                                    );
197                                }
198                            }
199                        }
200                        Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
201                    }
202                    DataType::Binary => {
203                        // Pass-through optimization: already dictionary-encoded JSONB
204                        Ok(ColumnarValue::Array(args[0].clone()))
205                    }
206                    _ => internal_err!(
207                        "properties_to_jsonb: unsupported dictionary value type, expected List or Binary"
208                    ),
209                }
210            }
211            _ => internal_err!(
212                "properties_to_jsonb: unsupported input type, expected List, Binary, Dictionary<Int32, List>, or Dictionary<Int32, Binary>"
213            ),
214        }
215    }
216}