micromegas_datafusion_extensions/jsonb/
array_length.rs1use datafusion::arrow::array::{Array, DictionaryArray, GenericBinaryArray, Int64Array};
2use datafusion::arrow::datatypes::{DataType, Int32Type};
3use datafusion::common::{Result, internal_err};
4use datafusion::error::DataFusionError;
5use datafusion::logical_expr::{
6 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
7};
8use jsonb::RawJsonb;
9use std::any::Any;
10use std::sync::Arc;
11
12#[derive(Debug, PartialEq, Eq, Hash)]
17pub struct JsonbArrayLength {
18 signature: Signature,
19}
20
21impl JsonbArrayLength {
22 pub fn new() -> Self {
23 Self {
24 signature: Signature::any(1, Volatility::Immutable),
25 }
26 }
27}
28
29impl Default for JsonbArrayLength {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35fn extract_array_length_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<i64>> {
36 let jsonb = RawJsonb::new(jsonb_bytes);
37 match jsonb.array_length() {
38 Ok(Some(len)) => Ok(Some(len as i64)),
39 Ok(None) => Ok(None),
40 Err(e) => Err(DataFusionError::External(e.into())),
41 }
42}
43
44impl ScalarUDFImpl for JsonbArrayLength {
45 fn as_any(&self) -> &dyn Any {
46 self
47 }
48
49 fn name(&self) -> &str {
50 "jsonb_array_length"
51 }
52
53 fn signature(&self) -> &Signature {
54 &self.signature
55 }
56
57 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
58 Ok(DataType::Int64)
59 }
60
61 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
62 let args = ColumnarValue::values_to_arrays(&args.args)?;
63 if args.len() != 1 {
64 return internal_err!("wrong number of arguments to jsonb_array_length()");
65 }
66
67 match args[0].data_type() {
68 DataType::Binary => {
69 let binary_array = args[0]
70 .as_any()
71 .downcast_ref::<GenericBinaryArray<i32>>()
72 .ok_or_else(|| {
73 DataFusionError::Internal("error casting to binary array".into())
74 })?;
75
76 let mut builder = Int64Array::builder(binary_array.len());
77 for i in 0..binary_array.len() {
78 if binary_array.is_null(i) {
79 builder.append_null();
80 } else {
81 let jsonb_bytes = binary_array.value(i);
82 if let Some(value) = extract_array_length_from_jsonb(jsonb_bytes)? {
83 builder.append_value(value);
84 } else {
85 builder.append_null();
86 }
87 }
88 }
89 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
90 }
91 DataType::Dictionary(_, value_type)
92 if matches!(value_type.as_ref(), DataType::Binary) =>
93 {
94 let dict_array = args[0]
95 .as_any()
96 .downcast_ref::<DictionaryArray<Int32Type>>()
97 .ok_or_else(|| {
98 DataFusionError::Internal("error casting dictionary array".into())
99 })?;
100
101 let binary_values = dict_array
102 .values()
103 .as_any()
104 .downcast_ref::<GenericBinaryArray<i32>>()
105 .ok_or_else(|| {
106 DataFusionError::Internal("dictionary values are not a binary array".into())
107 })?;
108
109 let mut builder = Int64Array::builder(dict_array.len());
110 for i in 0..dict_array.len() {
111 if dict_array.is_null(i) {
112 builder.append_null();
113 } else {
114 let key_index = dict_array.keys().value(i) as usize;
115 if key_index < binary_values.len() {
116 let jsonb_bytes = binary_values.value(key_index);
117 if let Some(value) = extract_array_length_from_jsonb(jsonb_bytes)? {
118 builder.append_value(value);
119 } else {
120 builder.append_null();
121 }
122 } else {
123 return internal_err!(
124 "Dictionary key index out of bounds in jsonb_array_length"
125 );
126 }
127 }
128 }
129 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
130 }
131 _ => internal_err!(
132 "jsonb_array_length: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
133 ),
134 }
135 }
136}
137
138pub fn make_jsonb_array_length_udf() -> ScalarUDF {
140 ScalarUDF::new_from_impl(JsonbArrayLength::new())
141}