// micromegas_analytics/properties/properties_to_dict_udf.rs

use datafusion::arrow::array::{
    Array, AsArray, DictionaryArray, GenericListArray, Int32Array, ListBuilder, StringBuilder,
    StructArray, StructBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Fields, Int32Type};
use datafusion::common::{Result, internal_err};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use micromegas_datafusion_extensions::properties::properties_udf::extract_properties_as_vec;
use std::any::Any;
use std::collections::{HashMap, hash_map::Entry};
use std::sync::Arc;

16#[derive(Debug, PartialEq, Eq, Hash)]
17pub struct PropertiesToDict {
18    signature: Signature,
19}
20
21impl PropertiesToDict {
22    pub fn new() -> Self {
23        Self::default()
24    }
25}
26
27impl Default for PropertiesToDict {
28    fn default() -> Self {
29        Self {
30            signature: Signature::exact(
31                vec![DataType::List(Arc::new(Field::new(
32                    "Property",
33                    DataType::Struct(Fields::from(vec![
34                        Field::new("key", DataType::Utf8, false),
35                        Field::new("value", DataType::Utf8, false),
36                    ])),
37                    false,
38                )))],
39                Volatility::Immutable,
40            ),
41        }
42    }
43}
44
45impl ScalarUDFImpl for PropertiesToDict {
46    fn as_any(&self) -> &dyn Any {
47        self
48    }
49
50    fn name(&self) -> &str {
51        "properties_to_dict"
52    }
53
54    fn signature(&self) -> &Signature {
55        &self.signature
56    }
57
58    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
59        Ok(DataType::Dictionary(
60            Box::new(DataType::Int32),
61            Box::new(DataType::List(Arc::new(Field::new(
62                "Property",
63                DataType::Struct(Fields::from(vec![
64                    Field::new("key", DataType::Utf8, false),
65                    Field::new("value", DataType::Utf8, false),
66                ])),
67                false,
68            )))),
69        ))
70    }
71
72    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
73        let args = args.args;
74        if args.len() != 1 {
75            return internal_err!("properties_to_dict expects exactly one argument");
76        }
77
78        match &args[0] {
79            ColumnarValue::Array(array) => {
80                let list_array = array
81                    .as_any()
82                    .downcast_ref::<GenericListArray<i32>>()
83                    .ok_or_else(|| {
84                        DataFusionError::Internal(
85                            "properties_to_dict requires a list array as input".to_string(),
86                        )
87                    })?;
88
89                let dict_array = build_dictionary_from_properties(list_array)?;
90                Ok(ColumnarValue::Array(Arc::new(dict_array)))
91            }
92            ColumnarValue::Scalar(_) => {
93                internal_err!("properties_to_dict does not support scalar inputs")
94            }
95        }
96    }
97}
98
99struct PropertiesDictionaryBuilder {
100    map: HashMap<Vec<(String, String)>, usize>,
101    values_builder: ListBuilder<StructBuilder>,
102    keys: Vec<Option<i32>>,
103}
104
105impl PropertiesDictionaryBuilder {
106    fn new(capacity: usize) -> Self {
107        let prop_struct_fields = vec![
108            Field::new("key", DataType::Utf8, false),
109            Field::new("value", DataType::Utf8, false),
110        ];
111        let prop_field = Arc::new(Field::new(
112            "Property",
113            DataType::Struct(Fields::from(prop_struct_fields.clone())),
114            false,
115        ));
116        let values_builder =
117            ListBuilder::new(StructBuilder::from_fields(prop_struct_fields, capacity))
118                .with_field(prop_field);
119
120        Self {
121            map: HashMap::new(),
122            values_builder,
123            keys: Vec::with_capacity(capacity),
124        }
125    }
126
127    fn append_property_list(&mut self, struct_array: &StructArray) -> Result<()> {
128        let prop_vec = extract_properties_as_vec(struct_array)?;
129
130        match self.map.get(&prop_vec) {
131            Some(&index) => {
132                self.keys.push(Some(index as i32));
133            }
134            None => {
135                let new_index = self.map.len();
136                self.add_to_values(&prop_vec)?;
137                self.map.insert(prop_vec, new_index);
138                self.keys.push(Some(new_index as i32));
139            }
140        }
141        Ok(())
142    }
143
144    fn append_null(&mut self) {
145        self.keys.push(None);
146    }
147
148    fn add_to_values(&mut self, properties: &[(String, String)]) -> Result<()> {
149        let struct_builder = self.values_builder.values();
150        for (key, value) in properties {
151            struct_builder
152                .field_builder::<StringBuilder>(0)
153                .ok_or_else(|| DataFusionError::Internal("Failed to get key builder".to_string()))?
154                .append_value(key);
155            struct_builder
156                .field_builder::<StringBuilder>(1)
157                .ok_or_else(|| {
158                    DataFusionError::Internal("Failed to get value builder".to_string())
159                })?
160                .append_value(value);
161            struct_builder.append(true);
162        }
163        self.values_builder.append(true);
164        Ok(())
165    }
166
167    fn finish(mut self) -> Result<DictionaryArray<Int32Type>> {
168        let keys = Int32Array::from(self.keys);
169        let values = Arc::new(self.values_builder.finish());
170        DictionaryArray::try_new(keys, values)
171            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
172    }
173}
174
175pub fn build_dictionary_from_properties(
176    list_array: &GenericListArray<i32>,
177) -> Result<DictionaryArray<Int32Type>> {
178    let mut builder = PropertiesDictionaryBuilder::new(list_array.len());
179    for i in 0..list_array.len() {
180        if list_array.is_null(i) {
181            builder.append_null();
182        } else {
183            let start = list_array.value_offsets()[i] as usize;
184            let end = list_array.value_offsets()[i + 1] as usize;
185            let sliced_values = list_array.values().slice(start, end - start);
186            let struct_array = sliced_values.as_struct();
187            builder.append_property_list(struct_array)?;
188        }
189    }
190
191    builder.finish()
192}