micromegas_datafusion_extensions/properties/
properties_udf.rs1use datafusion::arrow::array::{
2 Array, AsArray, DictionaryArray, GenericBinaryArray, GenericListArray, Int32Array, StructArray,
3};
4use datafusion::arrow::datatypes::{DataType, Int32Type};
5use datafusion::common::{Result, internal_err};
6use datafusion::error::DataFusionError;
7use datafusion::logical_expr::{
8 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
9};
10use jsonb::RawJsonb;
11use std::any::Any;
12use std::sync::Arc;
13
14pub fn extract_properties_as_vec(struct_array: &StructArray) -> Result<Vec<(String, String)>> {
15 let mut properties = Vec::with_capacity(struct_array.len());
16 let key_array = struct_array.column(0).as_string::<i32>();
17 let value_array = struct_array.column(1).as_string::<i32>();
18 for i in 0..struct_array.len() {
19 if struct_array.is_valid(i) {
20 let key = key_array.value(i).to_string();
21 let value = value_array.value(i).to_string();
22 properties.push((key, value));
23 }
24 }
25
26 Ok(properties)
27}
28
29pub fn count_jsonb_properties(jsonb_bytes: &[u8]) -> Result<i32> {
30 let jsonb = RawJsonb::new(jsonb_bytes);
31
32 match jsonb.object_keys() {
34 Ok(Some(keys_array)) => {
35 let keys_raw = keys_array.as_raw();
37 match keys_raw.array_length() {
38 Ok(Some(len)) => Ok(len as i32),
39 Ok(None) => Ok(0), Err(e) => Err(DataFusionError::Internal(format!(
41 "Failed to get keys array length: {e:?}"
42 ))),
43 }
44 }
45 Ok(None) => {
46 Ok(0)
48 }
49 Err(e) => Err(DataFusionError::Internal(format!(
50 "Failed to count JSONB properties: {e:?}"
51 ))),
52 }
53}
54
55#[derive(Debug, PartialEq, Eq, Hash)]
57pub struct PropertiesToArray {
58 signature: Signature,
59}
60
61impl PropertiesToArray {
62 pub fn new() -> Self {
63 Self::default()
64 }
65}
66
67impl Default for PropertiesToArray {
68 fn default() -> Self {
69 Self {
70 signature: Signature::any(1, Volatility::Immutable),
71 }
72 }
73}
74
75impl ScalarUDFImpl for PropertiesToArray {
76 fn as_any(&self) -> &dyn Any {
77 self
78 }
79
80 fn name(&self) -> &str {
81 "properties_to_array"
82 }
83
84 fn signature(&self) -> &Signature {
85 &self.signature
86 }
87
88 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
89 match &arg_types[0] {
90 DataType::Dictionary(_, value_type) => Ok(value_type.as_ref().clone()),
91 _ => internal_err!("properties_to_array expects a Dictionary input type"),
92 }
93 }
94
95 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
96 let args = args.args;
97 if args.len() != 1 {
98 return internal_err!("properties_to_array expects exactly one argument");
99 }
100
101 match &args[0] {
102 ColumnarValue::Array(array) => {
103 let dict_array = array
105 .as_any()
106 .downcast_ref::<DictionaryArray<Int32Type>>()
107 .ok_or_else(|| {
108 DataFusionError::Internal(
109 "properties_to_array requires a dictionary array as input".to_string(),
110 )
111 })?;
112
113 use datafusion::arrow::compute::take;
115 let indices = dict_array.keys();
116 let values = dict_array.values();
117
118 let reconstructed = take(values.as_ref(), indices, None)
119 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
120
121 Ok(ColumnarValue::Array(reconstructed))
122 }
123 ColumnarValue::Scalar(_) => {
124 internal_err!("properties_to_array does not support scalar inputs")
125 }
126 }
127 }
128}
129
130#[derive(Debug, PartialEq, Eq, Hash)]
132pub struct PropertiesLength {
133 signature: Signature,
134}
135
136impl PropertiesLength {
137 pub fn new() -> Self {
138 Self::default()
139 }
140}
141
142impl Default for PropertiesLength {
143 fn default() -> Self {
144 Self {
145 signature: Signature::any(1, Volatility::Immutable),
146 }
147 }
148}
149
150impl ScalarUDFImpl for PropertiesLength {
151 fn as_any(&self) -> &dyn Any {
152 self
153 }
154
155 fn name(&self) -> &str {
156 "properties_length"
157 }
158
159 fn signature(&self) -> &Signature {
160 &self.signature
161 }
162
163 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
164 Ok(DataType::Int32)
165 }
166
167 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
168 let args = args.args;
169 if args.len() != 1 {
170 return internal_err!("properties_length expects exactly one argument");
171 }
172
173 match &args[0] {
174 ColumnarValue::Array(array) => {
175 match array.data_type() {
176 DataType::List(_) => {
177 let list_array = array
179 .as_any()
180 .downcast_ref::<GenericListArray<i32>>()
181 .ok_or_else(|| {
182 DataFusionError::Internal(
183 "properties_length: failed to cast to list array".to_string(),
184 )
185 })?;
186
187 let mut lengths = Vec::with_capacity(list_array.len());
188 for i in 0..list_array.len() {
189 if list_array.is_null(i) {
190 lengths.push(None);
191 } else {
192 let start = list_array.value_offsets()[i] as usize;
193 let end = list_array.value_offsets()[i + 1] as usize;
194 lengths.push(Some((end - start) as i32));
195 }
196 }
197
198 let length_array = Int32Array::from(lengths);
199 Ok(ColumnarValue::Array(Arc::new(length_array)))
200 }
201 DataType::Binary => {
202 let binary_array = array
204 .as_any()
205 .downcast_ref::<GenericBinaryArray<i32>>()
206 .ok_or_else(|| {
207 DataFusionError::Internal(
208 "properties_length: failed to cast to binary array".to_string(),
209 )
210 })?;
211
212 let mut lengths = Vec::with_capacity(binary_array.len());
213 for i in 0..binary_array.len() {
214 if binary_array.is_null(i) {
215 lengths.push(None);
216 } else {
217 let jsonb_bytes = binary_array.value(i);
218 match count_jsonb_properties(jsonb_bytes) {
219 Ok(len) => lengths.push(Some(len)),
220 Err(_) => lengths.push(None), }
222 }
223 }
224
225 let length_array = Int32Array::from(lengths);
226 Ok(ColumnarValue::Array(Arc::new(length_array)))
227 }
228 DataType::Dictionary(_, value_type) => {
229 match value_type.as_ref() {
231 DataType::List(_) => {
232 let dict_array = array
233 .as_any()
234 .downcast_ref::<DictionaryArray<Int32Type>>()
235 .ok_or_else(|| {
236 DataFusionError::Internal(
237 "properties_length: failed to cast to dictionary array"
238 .to_string(),
239 )
240 })?;
241
242 let values = dict_array.values();
243 let list_values = values
244 .as_any()
245 .downcast_ref::<GenericListArray<i32>>()
246 .ok_or_else(|| {
247 DataFusionError::Internal(
248 "properties_length: dictionary values are not a list array".to_string(),
249 )
250 })?;
251
252 let mut dict_lengths = Vec::with_capacity(list_values.len());
254 for i in 0..list_values.len() {
255 if list_values.is_null(i) {
256 dict_lengths.push(None);
257 } else {
258 let start = list_values.value_offsets()[i] as usize;
259 let end = list_values.value_offsets()[i + 1] as usize;
260 dict_lengths.push(Some((end - start) as i32));
261 }
262 }
263
264 let keys = dict_array.keys();
266 let mut lengths = Vec::with_capacity(keys.len());
267 for i in 0..keys.len() {
268 if keys.is_null(i) {
269 lengths.push(None);
270 } else {
271 let key_index = keys.value(i) as usize;
272 if key_index < dict_lengths.len() {
273 lengths.push(dict_lengths[key_index]);
274 } else {
275 return internal_err!(
276 "Dictionary key index out of bounds"
277 );
278 }
279 }
280 }
281
282 let length_array = Int32Array::from(lengths);
283 Ok(ColumnarValue::Array(Arc::new(length_array)))
284 }
285 DataType::Binary => {
286 let dict_array = array
288 .as_any()
289 .downcast_ref::<DictionaryArray<Int32Type>>()
290 .ok_or_else(|| {
291 DataFusionError::Internal(
292 "properties_length: failed to cast to dictionary array"
293 .to_string(),
294 )
295 })?;
296
297 let values = dict_array.values();
298 let binary_values = values
299 .as_any()
300 .downcast_ref::<GenericBinaryArray<i32>>()
301 .ok_or_else(|| {
302 DataFusionError::Internal(
303 "properties_length: dictionary values are not a binary array".to_string(),
304 )
305 })?;
306
307 let mut dict_lengths = Vec::with_capacity(binary_values.len());
309 for i in 0..binary_values.len() {
310 if binary_values.is_null(i) {
311 dict_lengths.push(None);
312 } else {
313 let jsonb_bytes = binary_values.value(i);
314 match count_jsonb_properties(jsonb_bytes) {
315 Ok(len) => dict_lengths.push(Some(len)),
316 Err(_) => dict_lengths.push(None), }
318 }
319 }
320
321 let keys = dict_array.keys();
323 let mut lengths = Vec::with_capacity(keys.len());
324 for i in 0..keys.len() {
325 if keys.is_null(i) {
326 lengths.push(None);
327 } else {
328 let key_index = keys.value(i) as usize;
329 if key_index < dict_lengths.len() {
330 lengths.push(dict_lengths[key_index]);
331 } else {
332 return internal_err!(
333 "Dictionary key index out of bounds"
334 );
335 }
336 }
337 }
338
339 let length_array = Int32Array::from(lengths);
340 Ok(ColumnarValue::Array(Arc::new(length_array)))
341 }
342 _ => internal_err!(
343 "properties_length: unsupported dictionary value type, expected List or Binary"
344 ),
345 }
346 }
347 _ => internal_err!(
348 "properties_length: unsupported input type, expected List, Binary, Dictionary<Int32, List>, or Dictionary<Int32, Binary>"
349 ),
350 }
351 }
352 ColumnarValue::Scalar(_) => {
353 internal_err!("properties_length does not support scalar inputs")
354 }
355 }
356 }
357}