micromegas_datafusion_extensions/properties/
property_get.rs

1use anyhow::Context;
2use datafusion::arrow::array::{Array, StringDictionaryBuilder};
3use datafusion::arrow::array::{
4    ArrayRef, DictionaryArray, GenericBinaryArray, GenericListArray, StringArray,
5};
6use datafusion::arrow::array::{AsArray, StructArray};
7use datafusion::arrow::datatypes::{DataType, Int32Type};
8use datafusion::common::{Result, internal_err};
9use datafusion::error::DataFusionError;
10use datafusion::logical_expr::{
11    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
12};
13use jsonb::RawJsonb;
14use std::any::Any;
15use std::sync::Arc;
16
17/// A scalar UDF that retrieves a property from a list of properties.
18#[derive(Debug, PartialEq, Eq, Hash)]
19pub struct PropertyGet {
20    signature: Signature,
21}
22
23impl PropertyGet {
24    pub fn new() -> Self {
25        Self {
26            signature: Signature::any(2, Volatility::Immutable),
27        }
28    }
29}
30
31impl Default for PropertyGet {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37fn find_property_in_list(properties: ArrayRef, name: &str) -> anyhow::Result<Option<String>> {
38    let properties: &StructArray = properties.as_struct();
39    let (key_index, _key_field) = properties
40        .fields()
41        .find("key")
42        .with_context(|| "getting key field")?;
43    let (value_index, _value_field) = properties
44        .fields()
45        .find("value")
46        .with_context(|| "getting value field")?;
47    for i in 0..properties.len() {
48        let key = properties.column(key_index).as_string::<i32>().value(i);
49        if key.eq_ignore_ascii_case(name) {
50            let value = properties.column(value_index).as_string::<i32>().value(i);
51            return Ok(Some(value.into()));
52        }
53    }
54    Ok(None)
55}
56
57fn extract_from_jsonb(jsonb_bytes: &[u8], name: &str) -> anyhow::Result<Option<String>> {
58    let jsonb = RawJsonb::new(jsonb_bytes);
59    if let Some(value_jsonb) = jsonb
60        .get_by_name(name, true)
61        .with_context(|| "getting JSONB property by name")?
62    {
63        // The value_jsonb is an OwnedJsonb, convert it to RawJsonb to access its value
64        let raw_value = value_jsonb.as_raw();
65
66        // Try to get the value as a string (handles unescaping properly)
67        if let Some(str_value) = raw_value
68            .as_str()
69            .with_context(|| "extracting string value from JSONB")?
70        {
71            Ok(Some(str_value.to_string()))
72        } else {
73            // If it's not a string, convert it to JSON representation
74            Ok(Some(raw_value.to_string()))
75        }
76    } else {
77        Ok(None)
78    }
79}
80
81impl ScalarUDFImpl for PropertyGet {
82    fn as_any(&self) -> &dyn Any {
83        self
84    }
85    fn name(&self) -> &str {
86        "property_get"
87    }
88    fn signature(&self) -> &Signature {
89        &self.signature
90    }
91    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
92        Ok(DataType::Dictionary(
93            Box::new(DataType::Int32),
94            Box::new(DataType::Utf8),
95        ))
96    }
97    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
98        let args = ColumnarValue::values_to_arrays(&args.args)?;
99        if args.len() != 2 {
100            return internal_err!("wrong number of arguments to property_get()");
101        }
102
103        let names = args[1]
104            .as_any()
105            .downcast_ref::<StringArray>()
106            .ok_or_else(|| DataFusionError::Execution("downcasting names in PropertyGet".into()))?;
107
108        // Handle both regular arrays and dictionary arrays
109        match args[0].data_type() {
110            DataType::List(_) => {
111                // Handle regular list array
112                let prop_lists = args[0]
113                    .as_any()
114                    .downcast_ref::<GenericListArray<i32>>()
115                    .ok_or_else(|| {
116                        DataFusionError::Internal("error casting property list".into())
117                    })?;
118
119                if prop_lists.len() != names.len() {
120                    return internal_err!("arrays of different lengths in property_get()");
121                }
122
123                let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
124                for i in 0..prop_lists.len() {
125                    let name = names.value(i);
126                    if let Some(value) = find_property_in_list(prop_lists.value(i), name)
127                        .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?
128                    {
129                        dict_builder.append_value(value);
130                    } else {
131                        dict_builder.append_null();
132                    }
133                }
134                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
135            }
136            DataType::Binary => {
137                // Handle non-dictionary JSONB binary array
138                let binary_array = args[0]
139                    .as_any()
140                    .downcast_ref::<GenericBinaryArray<i32>>()
141                    .ok_or_else(|| {
142                        DataFusionError::Internal("error casting to binary array".into())
143                    })?;
144
145                if binary_array.len() != names.len() {
146                    return internal_err!("arrays of different lengths in property_get()");
147                }
148
149                let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
150                for i in 0..binary_array.len() {
151                    if binary_array.is_null(i) {
152                        dict_builder.append_null();
153                    } else {
154                        let jsonb_bytes = binary_array.value(i);
155                        let name = names.value(i);
156                        if let Some(value) = extract_from_jsonb(jsonb_bytes, name).map_err(|e| {
157                            DataFusionError::Internal(format!("JSONB extraction error: {e:?}"))
158                        })? {
159                            dict_builder.append_value(value);
160                        } else {
161                            dict_builder.append_null();
162                        }
163                    }
164                }
165                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
166            }
167            DataType::Dictionary(_, value_type) => {
168                // Handle dictionary array
169                match value_type.as_ref() {
170                    DataType::Binary => {
171                        // Handle dictionary-encoded JSONB (new primary format)
172                        let dict_array = args[0]
173                            .as_any()
174                            .downcast_ref::<DictionaryArray<Int32Type>>()
175                            .ok_or_else(|| {
176                                DataFusionError::Internal("error casting dictionary array".into())
177                            })?;
178
179                        if dict_array.len() != names.len() {
180                            return internal_err!("arrays of different lengths in property_get()");
181                        }
182
183                        let values_array = dict_array.values();
184                        let binary_values = values_array
185                            .as_any()
186                            .downcast_ref::<GenericBinaryArray<i32>>()
187                            .ok_or_else(|| {
188                                DataFusionError::Internal(
189                                    "dictionary values are not a binary array".into(),
190                                )
191                            })?;
192
193                        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
194                        for i in 0..dict_array.len() {
195                            if dict_array.is_null(i) {
196                                dict_builder.append_null();
197                            } else {
198                                let key_index = dict_array.keys().value(i) as usize;
199                                if key_index < binary_values.len() {
200                                    let jsonb_bytes = binary_values.value(key_index);
201                                    let name = names.value(i);
202                                    if let Some(value) = extract_from_jsonb(jsonb_bytes, name)
203                                        .map_err(|e| {
204                                            DataFusionError::Internal(format!(
205                                                "JSONB extraction error: {e:?}"
206                                            ))
207                                        })?
208                                    {
209                                        dict_builder.append_value(value);
210                                    } else {
211                                        dict_builder.append_null();
212                                    }
213                                } else {
214                                    return internal_err!(
215                                        "Dictionary key index out of bounds in property_get"
216                                    );
217                                }
218                            }
219                        }
220                        Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
221                    }
222                    DataType::List(_) => {
223                        let dict_array = args[0]
224                            .as_any()
225                            .downcast_ref::<DictionaryArray<Int32Type>>()
226                            .ok_or_else(|| {
227                                DataFusionError::Internal("error casting dictionary array".into())
228                            })?;
229
230                        if dict_array.len() != names.len() {
231                            return internal_err!("arrays of different lengths in property_get()");
232                        }
233
234                        let values_array = dict_array.values();
235                        let list_values = values_array
236                            .as_any()
237                            .downcast_ref::<GenericListArray<i32>>()
238                            .ok_or_else(|| {
239                                DataFusionError::Internal(
240                                    "dictionary values are not a list array".into(),
241                                )
242                            })?;
243
244                        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
245                        for i in 0..dict_array.len() {
246                            let name = names.value(i);
247
248                            if dict_array.is_null(i) {
249                                dict_builder.append_null();
250                            } else {
251                                let key_index = dict_array.keys().value(i) as usize;
252                                if key_index < list_values.len() {
253                                    let property_list = list_values.value(key_index);
254                                    if let Some(value) = find_property_in_list(property_list, name)
255                                        .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?
256                                    {
257                                        dict_builder.append_value(value);
258                                    } else {
259                                        dict_builder.append_null();
260                                    }
261                                } else {
262                                    return internal_err!(
263                                        "Dictionary key index out of bounds in property_get"
264                                    );
265                                }
266                            }
267                        }
268                        Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
269                    }
270                    _ => internal_err!(
271                        "property_get: unsupported dictionary value type, expected List or Binary"
272                    ),
273                }
274            }
275            _ => internal_err!(
276                "property_get: unsupported input type, expected List, Binary, Dictionary<Int32, List>, or Dictionary<Int32, Binary>"
277            ),
278        }
279    }
280}