micromegas_analytics/properties/
properties_to_dict_udf.rs1use datafusion::arrow::array::{
2 Array, AsArray, DictionaryArray, GenericListArray, Int32Array, ListBuilder, StringBuilder,
3 StructArray, StructBuilder,
4};
5use datafusion::arrow::datatypes::{DataType, Field, Fields, Int32Type};
6use datafusion::common::{Result, internal_err};
7use datafusion::error::DataFusionError;
8use datafusion::logical_expr::{
9 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
10};
11use micromegas_datafusion_extensions::properties::properties_udf::extract_properties_as_vec;
12use std::any::Any;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16#[derive(Debug, PartialEq, Eq, Hash)]
17pub struct PropertiesToDict {
18 signature: Signature,
19}
20
21impl PropertiesToDict {
22 pub fn new() -> Self {
23 Self::default()
24 }
25}
26
27impl Default for PropertiesToDict {
28 fn default() -> Self {
29 Self {
30 signature: Signature::exact(
31 vec![DataType::List(Arc::new(Field::new(
32 "Property",
33 DataType::Struct(Fields::from(vec![
34 Field::new("key", DataType::Utf8, false),
35 Field::new("value", DataType::Utf8, false),
36 ])),
37 false,
38 )))],
39 Volatility::Immutable,
40 ),
41 }
42 }
43}
44
45impl ScalarUDFImpl for PropertiesToDict {
46 fn as_any(&self) -> &dyn Any {
47 self
48 }
49
50 fn name(&self) -> &str {
51 "properties_to_dict"
52 }
53
54 fn signature(&self) -> &Signature {
55 &self.signature
56 }
57
58 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
59 Ok(DataType::Dictionary(
60 Box::new(DataType::Int32),
61 Box::new(DataType::List(Arc::new(Field::new(
62 "Property",
63 DataType::Struct(Fields::from(vec![
64 Field::new("key", DataType::Utf8, false),
65 Field::new("value", DataType::Utf8, false),
66 ])),
67 false,
68 )))),
69 ))
70 }
71
72 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
73 let args = args.args;
74 if args.len() != 1 {
75 return internal_err!("properties_to_dict expects exactly one argument");
76 }
77
78 match &args[0] {
79 ColumnarValue::Array(array) => {
80 let list_array = array
81 .as_any()
82 .downcast_ref::<GenericListArray<i32>>()
83 .ok_or_else(|| {
84 DataFusionError::Internal(
85 "properties_to_dict requires a list array as input".to_string(),
86 )
87 })?;
88
89 let dict_array = build_dictionary_from_properties(list_array)?;
90 Ok(ColumnarValue::Array(Arc::new(dict_array)))
91 }
92 ColumnarValue::Scalar(_) => {
93 internal_err!("properties_to_dict does not support scalar inputs")
94 }
95 }
96 }
97}
98
99struct PropertiesDictionaryBuilder {
100 map: HashMap<Vec<(String, String)>, usize>,
101 values_builder: ListBuilder<StructBuilder>,
102 keys: Vec<Option<i32>>,
103}
104
105impl PropertiesDictionaryBuilder {
106 fn new(capacity: usize) -> Self {
107 let prop_struct_fields = vec![
108 Field::new("key", DataType::Utf8, false),
109 Field::new("value", DataType::Utf8, false),
110 ];
111 let prop_field = Arc::new(Field::new(
112 "Property",
113 DataType::Struct(Fields::from(prop_struct_fields.clone())),
114 false,
115 ));
116 let values_builder =
117 ListBuilder::new(StructBuilder::from_fields(prop_struct_fields, capacity))
118 .with_field(prop_field);
119
120 Self {
121 map: HashMap::new(),
122 values_builder,
123 keys: Vec::with_capacity(capacity),
124 }
125 }
126
127 fn append_property_list(&mut self, struct_array: &StructArray) -> Result<()> {
128 let prop_vec = extract_properties_as_vec(struct_array)?;
129
130 match self.map.get(&prop_vec) {
131 Some(&index) => {
132 self.keys.push(Some(index as i32));
133 }
134 None => {
135 let new_index = self.map.len();
136 self.add_to_values(&prop_vec)?;
137 self.map.insert(prop_vec, new_index);
138 self.keys.push(Some(new_index as i32));
139 }
140 }
141 Ok(())
142 }
143
144 fn append_null(&mut self) {
145 self.keys.push(None);
146 }
147
148 fn add_to_values(&mut self, properties: &[(String, String)]) -> Result<()> {
149 let struct_builder = self.values_builder.values();
150 for (key, value) in properties {
151 struct_builder
152 .field_builder::<StringBuilder>(0)
153 .ok_or_else(|| DataFusionError::Internal("Failed to get key builder".to_string()))?
154 .append_value(key);
155 struct_builder
156 .field_builder::<StringBuilder>(1)
157 .ok_or_else(|| {
158 DataFusionError::Internal("Failed to get value builder".to_string())
159 })?
160 .append_value(value);
161 struct_builder.append(true);
162 }
163 self.values_builder.append(true);
164 Ok(())
165 }
166
167 fn finish(mut self) -> Result<DictionaryArray<Int32Type>> {
168 let keys = Int32Array::from(self.keys);
169 let values = Arc::new(self.values_builder.finish());
170 DictionaryArray::try_new(keys, values)
171 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
172 }
173}
174
175pub fn build_dictionary_from_properties(
176 list_array: &GenericListArray<i32>,
177) -> Result<DictionaryArray<Int32Type>> {
178 let mut builder = PropertiesDictionaryBuilder::new(list_array.len());
179 for i in 0..list_array.len() {
180 if list_array.is_null(i) {
181 builder.append_null();
182 } else {
183 let start = list_array.value_offsets()[i] as usize;
184 let end = list_array.value_offsets()[i + 1] as usize;
185 let sliced_values = list_array.values().slice(start, end - start);
186 let struct_array = sliced_values.as_struct();
187 builder.append_property_list(struct_array)?;
188 }
189 }
190
191 builder.finish()
192}