micromegas_analytics/properties/
properties_to_jsonb_udf.rs1use anyhow::Context;
2use datafusion::arrow::array::{
3 Array, ArrayRef, AsArray, BinaryDictionaryBuilder, DictionaryArray, GenericBinaryArray,
4 GenericListArray, StructArray,
5};
6use datafusion::arrow::datatypes::{DataType, Int32Type};
7use datafusion::common::{Result, internal_err};
8use datafusion::error::DataFusionError;
9use datafusion::logical_expr::{
10 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
11};
12use jsonb::Value;
13use micromegas_tracing::warn;
14use std::any::Any;
15use std::borrow::Cow;
16use std::collections::BTreeMap;
17use std::sync::Arc;
18
19#[derive(Debug, PartialEq, Eq, Hash)]
25pub struct PropertiesToJsonb {
26 signature: Signature,
27}
28
29impl PropertiesToJsonb {
30 pub fn new() -> Self {
31 Self {
32 signature: Signature::any(1, Volatility::Immutable),
33 }
34 }
35}
36
37impl Default for PropertiesToJsonb {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43fn convert_properties_list_to_jsonb(properties: ArrayRef) -> anyhow::Result<Vec<u8>> {
44 let properties: &StructArray = properties.as_struct();
45 let (key_index, _key_field) = properties
46 .fields()
47 .find("key")
48 .with_context(|| "getting key field")?;
49 let (value_index, _value_field) = properties
50 .fields()
51 .find("value")
52 .with_context(|| "getting value field")?;
53
54 let mut map = BTreeMap::new();
55 let key_column = properties.column(key_index).as_string::<i32>();
56 let value_column = properties.column(value_index).as_string::<i32>();
57
58 for i in 0..properties.len() {
59 if key_column.is_null(i) || value_column.is_null(i) {
60 continue; }
62 let key = key_column.value(i);
63 let value = value_column.value(i);
64 map.insert(key.to_string(), Value::String(Cow::Borrowed(value)));
65 }
66
67 let jsonb_object = Value::Object(map);
68 let mut buffer = Vec::new();
69 jsonb_object.write_to_vec(&mut buffer);
70 Ok(buffer)
71}
72
73impl ScalarUDFImpl for PropertiesToJsonb {
74 fn as_any(&self) -> &dyn Any {
75 self
76 }
77
78 fn name(&self) -> &str {
79 "properties_to_jsonb"
80 }
81
82 fn signature(&self) -> &Signature {
83 &self.signature
84 }
85
86 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
87 Ok(DataType::Dictionary(
88 Box::new(DataType::Int32),
89 Box::new(DataType::Binary),
90 ))
91 }
92
93 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
94 let args = ColumnarValue::values_to_arrays(&args.args)?;
95 if args.len() != 1 {
96 return internal_err!("wrong number of arguments to properties_to_jsonb()");
97 }
98
99 match args[0].data_type() {
101 DataType::List(_) => {
102 let prop_lists = args[0]
104 .as_any()
105 .downcast_ref::<GenericListArray<i32>>()
106 .ok_or_else(|| {
107 DataFusionError::Internal("error casting property list".into())
108 })?;
109
110 let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
111 for i in 0..prop_lists.len() {
112 if prop_lists.is_null(i) {
113 dict_builder.append_null();
114 } else {
115 match convert_properties_list_to_jsonb(prop_lists.value(i)) {
116 Ok(jsonb_bytes) => {
117 dict_builder.append_value(&jsonb_bytes);
118 }
119 Err(e) => {
120 warn!(
121 "error converting properties to JSONB at index {}: {:?}",
122 i, e
123 );
124 dict_builder.append_null();
125 }
126 }
127 }
128 }
129 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
130 }
131 DataType::Binary => {
132 let binary_array = args[0]
134 .as_any()
135 .downcast_ref::<GenericBinaryArray<i32>>()
136 .ok_or_else(|| {
137 DataFusionError::Internal("error casting to binary array".into())
138 })?;
139
140 let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
141 for i in 0..binary_array.len() {
142 if binary_array.is_null(i) {
143 dict_builder.append_null();
144 } else {
145 let jsonb_bytes = binary_array.value(i);
146 dict_builder.append_value(jsonb_bytes);
147 }
148 }
149 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
150 }
151 DataType::Dictionary(_, value_type) => {
152 match value_type.as_ref() {
154 DataType::List(_) => {
155 let dict_array = args[0]
157 .as_any()
158 .downcast_ref::<DictionaryArray<Int32Type>>()
159 .ok_or_else(|| {
160 DataFusionError::Internal("error casting dictionary array".into())
161 })?;
162
163 let values_array = dict_array.values();
164 let list_values = values_array
165 .as_any()
166 .downcast_ref::<GenericListArray<i32>>()
167 .ok_or_else(|| {
168 DataFusionError::Internal(
169 "dictionary values are not a list array".into(),
170 )
171 })?;
172
173 let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
174 for i in 0..dict_array.len() {
175 if dict_array.is_null(i) {
176 dict_builder.append_null();
177 } else {
178 let key_index = dict_array.keys().value(i) as usize;
179 if key_index < list_values.len() {
180 let property_list = list_values.value(key_index);
181 match convert_properties_list_to_jsonb(property_list) {
182 Ok(jsonb_bytes) => {
183 dict_builder.append_value(&jsonb_bytes);
184 }
185 Err(e) => {
186 warn!(
187 "error converting properties to JSONB at dict index {}: {:?}",
188 i, e
189 );
190 dict_builder.append_null();
191 }
192 }
193 } else {
194 return internal_err!(
195 "Dictionary key index out of bounds in properties_to_jsonb"
196 );
197 }
198 }
199 }
200 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
201 }
202 DataType::Binary => {
203 Ok(ColumnarValue::Array(args[0].clone()))
205 }
206 _ => internal_err!(
207 "properties_to_jsonb: unsupported dictionary value type, expected List or Binary"
208 ),
209 }
210 }
211 _ => internal_err!(
212 "properties_to_jsonb: unsupported input type, expected List, Binary, Dictionary<Int32, List>, or Dictionary<Int32, Binary>"
213 ),
214 }
215 }
216}