micromegas_datafusion_extensions/jsonb/
array_length.rs

1use datafusion::arrow::array::{Array, DictionaryArray, GenericBinaryArray, Int64Array};
2use datafusion::arrow::datatypes::{DataType, Int32Type};
3use datafusion::common::{Result, internal_err};
4use datafusion::error::DataFusionError;
5use datafusion::logical_expr::{
6    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
7};
8use jsonb::RawJsonb;
9use std::any::Any;
10use std::sync::Arc;
11
12/// A scalar UDF that returns the number of elements in a JSONB array.
13///
14/// Accepts both Binary and Dictionary<Int32, Binary> inputs.
15/// Returns Int64 for arrays, NULL for non-array values.
16#[derive(Debug, PartialEq, Eq, Hash)]
17pub struct JsonbArrayLength {
18    signature: Signature,
19}
20
21impl JsonbArrayLength {
22    pub fn new() -> Self {
23        Self {
24            signature: Signature::any(1, Volatility::Immutable),
25        }
26    }
27}
28
29impl Default for JsonbArrayLength {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35fn extract_array_length_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<i64>> {
36    let jsonb = RawJsonb::new(jsonb_bytes);
37    match jsonb.array_length() {
38        Ok(Some(len)) => Ok(Some(len as i64)),
39        Ok(None) => Ok(None),
40        Err(e) => Err(DataFusionError::External(e.into())),
41    }
42}
43
44impl ScalarUDFImpl for JsonbArrayLength {
45    fn as_any(&self) -> &dyn Any {
46        self
47    }
48
49    fn name(&self) -> &str {
50        "jsonb_array_length"
51    }
52
53    fn signature(&self) -> &Signature {
54        &self.signature
55    }
56
57    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
58        Ok(DataType::Int64)
59    }
60
61    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
62        let args = ColumnarValue::values_to_arrays(&args.args)?;
63        if args.len() != 1 {
64            return internal_err!("wrong number of arguments to jsonb_array_length()");
65        }
66
67        match args[0].data_type() {
68            DataType::Binary => {
69                let binary_array = args[0]
70                    .as_any()
71                    .downcast_ref::<GenericBinaryArray<i32>>()
72                    .ok_or_else(|| {
73                        DataFusionError::Internal("error casting to binary array".into())
74                    })?;
75
76                let mut builder = Int64Array::builder(binary_array.len());
77                for i in 0..binary_array.len() {
78                    if binary_array.is_null(i) {
79                        builder.append_null();
80                    } else {
81                        let jsonb_bytes = binary_array.value(i);
82                        if let Some(value) = extract_array_length_from_jsonb(jsonb_bytes)? {
83                            builder.append_value(value);
84                        } else {
85                            builder.append_null();
86                        }
87                    }
88                }
89                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
90            }
91            DataType::Dictionary(_, value_type)
92                if matches!(value_type.as_ref(), DataType::Binary) =>
93            {
94                let dict_array = args[0]
95                    .as_any()
96                    .downcast_ref::<DictionaryArray<Int32Type>>()
97                    .ok_or_else(|| {
98                        DataFusionError::Internal("error casting dictionary array".into())
99                    })?;
100
101                let binary_values = dict_array
102                    .values()
103                    .as_any()
104                    .downcast_ref::<GenericBinaryArray<i32>>()
105                    .ok_or_else(|| {
106                        DataFusionError::Internal("dictionary values are not a binary array".into())
107                    })?;
108
109                let mut builder = Int64Array::builder(dict_array.len());
110                for i in 0..dict_array.len() {
111                    if dict_array.is_null(i) {
112                        builder.append_null();
113                    } else {
114                        let key_index = dict_array.keys().value(i) as usize;
115                        if key_index < binary_values.len() {
116                            let jsonb_bytes = binary_values.value(key_index);
117                            if let Some(value) = extract_array_length_from_jsonb(jsonb_bytes)? {
118                                builder.append_value(value);
119                            } else {
120                                builder.append_null();
121                            }
122                        } else {
123                            return internal_err!(
124                                "Dictionary key index out of bounds in jsonb_array_length"
125                            );
126                        }
127                    }
128                }
129                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
130            }
131            _ => internal_err!(
132                "jsonb_array_length: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
133            ),
134        }
135    }
136}
137
138/// Creates a user-defined function to get the length of a JSONB array.
139pub fn make_jsonb_array_length_udf() -> ScalarUDF {
140    ScalarUDF::new_from_impl(JsonbArrayLength::new())
141}