micromegas_datafusion_extensions/jsonb/
cast.rs

1use datafusion::arrow::array::{
2    Array, DictionaryArray, Float64Array, GenericBinaryArray, Int64Array, StringDictionaryBuilder,
3};
4use datafusion::arrow::datatypes::{DataType, Int32Type};
5use datafusion::common::{Result, internal_err};
6use datafusion::error::DataFusionError;
7use datafusion::logical_expr::{
8    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
9};
10use jsonb::RawJsonb;
11use std::any::Any;
12use std::sync::Arc;
13
/// A scalar UDF that casts a JSONB value to a string.
///
/// Registered under the SQL name `jsonb_as_string`.
///
/// Accepts both `Binary` and `Dictionary<Int32, Binary>` inputs.
/// Returns `Dictionary<Int32, Utf8>` for memory efficiency.
///
/// NULL input rows, and rows for which the JSONB decoder reports no string
/// value, produce NULL in the output. A decoder error fails the whole call.
///
/// The signature accepts one argument of any type; the input type is only
/// validated when the function is invoked.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct JsonbAsString {
    // Signature handed back to DataFusion from `ScalarUDFImpl::signature`.
    signature: Signature,
}
22
23impl JsonbAsString {
24    pub fn new() -> Self {
25        Self {
26            signature: Signature::any(1, Volatility::Immutable),
27        }
28    }
29}
30
31impl Default for JsonbAsString {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37fn extract_string_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<String>> {
38    let jsonb = RawJsonb::new(jsonb_bytes);
39    match jsonb.as_str() {
40        Ok(Some(value)) => Ok(Some(value.to_string())),
41        Ok(None) => Ok(None),
42        Err(e) => Err(DataFusionError::External(e.into())),
43    }
44}
45
46impl ScalarUDFImpl for JsonbAsString {
47    fn as_any(&self) -> &dyn Any {
48        self
49    }
50
51    fn name(&self) -> &str {
52        "jsonb_as_string"
53    }
54
55    fn signature(&self) -> &Signature {
56        &self.signature
57    }
58
59    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
60        Ok(DataType::Dictionary(
61            Box::new(DataType::Int32),
62            Box::new(DataType::Utf8),
63        ))
64    }
65
66    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
67        let args = ColumnarValue::values_to_arrays(&args.args)?;
68        if args.len() != 1 {
69            return internal_err!("wrong number of arguments to jsonb_as_string()");
70        }
71
72        match args[0].data_type() {
73            DataType::Binary => {
74                let binary_array = args[0]
75                    .as_any()
76                    .downcast_ref::<GenericBinaryArray<i32>>()
77                    .ok_or_else(|| {
78                        DataFusionError::Internal("error casting to binary array".into())
79                    })?;
80
81                let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
82                for i in 0..binary_array.len() {
83                    if binary_array.is_null(i) {
84                        dict_builder.append_null();
85                    } else {
86                        let jsonb_bytes = binary_array.value(i);
87                        if let Some(value) = extract_string_from_jsonb(jsonb_bytes)? {
88                            dict_builder.append_value(value);
89                        } else {
90                            dict_builder.append_null();
91                        }
92                    }
93                }
94                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
95            }
96            DataType::Dictionary(_, value_type)
97                if matches!(value_type.as_ref(), DataType::Binary) =>
98            {
99                let dict_array = args[0]
100                    .as_any()
101                    .downcast_ref::<DictionaryArray<Int32Type>>()
102                    .ok_or_else(|| {
103                        DataFusionError::Internal("error casting dictionary array".into())
104                    })?;
105
106                let binary_values = dict_array
107                    .values()
108                    .as_any()
109                    .downcast_ref::<GenericBinaryArray<i32>>()
110                    .ok_or_else(|| {
111                        DataFusionError::Internal("dictionary values are not a binary array".into())
112                    })?;
113
114                let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
115                for i in 0..dict_array.len() {
116                    if dict_array.is_null(i) {
117                        dict_builder.append_null();
118                    } else {
119                        let key_index = dict_array.keys().value(i) as usize;
120                        if key_index < binary_values.len() {
121                            let jsonb_bytes = binary_values.value(key_index);
122                            if let Some(value) = extract_string_from_jsonb(jsonb_bytes)? {
123                                dict_builder.append_value(value);
124                            } else {
125                                dict_builder.append_null();
126                            }
127                        } else {
128                            return internal_err!(
129                                "Dictionary key index out of bounds in jsonb_as_string"
130                            );
131                        }
132                    }
133                }
134                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
135            }
136            _ => internal_err!(
137                "jsonb_as_string: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
138            ),
139        }
140    }
141}
142
143/// Creates a user-defined function to cast a JSONB value to a string.
144pub fn make_jsonb_as_string_udf() -> ScalarUDF {
145    ScalarUDF::new_from_impl(JsonbAsString::new())
146}
147
/// A scalar UDF that casts a JSONB value to a f64.
///
/// Registered under the SQL name `jsonb_as_f64`.
///
/// Accepts both `Binary` and `Dictionary<Int32, Binary>` inputs.
/// Returns `Float64`.
///
/// NULL input rows, and rows for which the JSONB decoder reports no f64
/// value, produce NULL in the output. A decoder error fails the whole call.
///
/// The signature accepts one argument of any type; the input type is only
/// validated when the function is invoked.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct JsonbAsF64 {
    // Signature handed back to DataFusion from `ScalarUDFImpl::signature`.
    signature: Signature,
}
156
157impl JsonbAsF64 {
158    pub fn new() -> Self {
159        Self {
160            signature: Signature::any(1, Volatility::Immutable),
161        }
162    }
163}
164
165impl Default for JsonbAsF64 {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171fn extract_f64_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<f64>> {
172    let jsonb = RawJsonb::new(jsonb_bytes);
173    match jsonb.as_f64() {
174        Ok(value) => Ok(value),
175        Err(e) => Err(DataFusionError::External(e.into())),
176    }
177}
178
179impl ScalarUDFImpl for JsonbAsF64 {
180    fn as_any(&self) -> &dyn Any {
181        self
182    }
183
184    fn name(&self) -> &str {
185        "jsonb_as_f64"
186    }
187
188    fn signature(&self) -> &Signature {
189        &self.signature
190    }
191
192    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
193        Ok(DataType::Float64)
194    }
195
196    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
197        let args = ColumnarValue::values_to_arrays(&args.args)?;
198        if args.len() != 1 {
199            return internal_err!("wrong number of arguments to jsonb_as_f64()");
200        }
201
202        match args[0].data_type() {
203            DataType::Binary => {
204                let binary_array = args[0]
205                    .as_any()
206                    .downcast_ref::<GenericBinaryArray<i32>>()
207                    .ok_or_else(|| {
208                        DataFusionError::Internal("error casting to binary array".into())
209                    })?;
210
211                let mut builder = Float64Array::builder(binary_array.len());
212                for i in 0..binary_array.len() {
213                    if binary_array.is_null(i) {
214                        builder.append_null();
215                    } else {
216                        let jsonb_bytes = binary_array.value(i);
217                        if let Some(value) = extract_f64_from_jsonb(jsonb_bytes)? {
218                            builder.append_value(value);
219                        } else {
220                            builder.append_null();
221                        }
222                    }
223                }
224                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
225            }
226            DataType::Dictionary(_, value_type)
227                if matches!(value_type.as_ref(), DataType::Binary) =>
228            {
229                let dict_array = args[0]
230                    .as_any()
231                    .downcast_ref::<DictionaryArray<Int32Type>>()
232                    .ok_or_else(|| {
233                        DataFusionError::Internal("error casting dictionary array".into())
234                    })?;
235
236                let binary_values = dict_array
237                    .values()
238                    .as_any()
239                    .downcast_ref::<GenericBinaryArray<i32>>()
240                    .ok_or_else(|| {
241                        DataFusionError::Internal("dictionary values are not a binary array".into())
242                    })?;
243
244                let mut builder = Float64Array::builder(dict_array.len());
245                for i in 0..dict_array.len() {
246                    if dict_array.is_null(i) {
247                        builder.append_null();
248                    } else {
249                        let key_index = dict_array.keys().value(i) as usize;
250                        if key_index < binary_values.len() {
251                            let jsonb_bytes = binary_values.value(key_index);
252                            if let Some(value) = extract_f64_from_jsonb(jsonb_bytes)? {
253                                builder.append_value(value);
254                            } else {
255                                builder.append_null();
256                            }
257                        } else {
258                            return internal_err!(
259                                "Dictionary key index out of bounds in jsonb_as_f64"
260                            );
261                        }
262                    }
263                }
264                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
265            }
266            _ => internal_err!(
267                "jsonb_as_f64: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
268            ),
269        }
270    }
271}
272
273/// Creates a user-defined function to cast a JSONB value to a f64.
274pub fn make_jsonb_as_f64_udf() -> ScalarUDF {
275    ScalarUDF::new_from_impl(JsonbAsF64::new())
276}
277
/// A scalar UDF that casts a JSONB value to an i64.
///
/// Registered under the SQL name `jsonb_as_i64`.
///
/// Accepts both `Binary` and `Dictionary<Int32, Binary>` inputs.
/// Returns `Int64`.
///
/// NULL input rows, and rows for which the JSONB decoder reports no i64
/// value, produce NULL in the output. A decoder error fails the whole call.
///
/// The signature accepts one argument of any type; the input type is only
/// validated when the function is invoked.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct JsonbAsI64 {
    // Signature handed back to DataFusion from `ScalarUDFImpl::signature`.
    signature: Signature,
}
286
287impl JsonbAsI64 {
288    pub fn new() -> Self {
289        Self {
290            signature: Signature::any(1, Volatility::Immutable),
291        }
292    }
293}
294
295impl Default for JsonbAsI64 {
296    fn default() -> Self {
297        Self::new()
298    }
299}
300
301fn extract_i64_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<i64>> {
302    let jsonb = RawJsonb::new(jsonb_bytes);
303    match jsonb.as_i64() {
304        Ok(value) => Ok(value),
305        Err(e) => Err(DataFusionError::External(e.into())),
306    }
307}
308
309impl ScalarUDFImpl for JsonbAsI64 {
310    fn as_any(&self) -> &dyn Any {
311        self
312    }
313
314    fn name(&self) -> &str {
315        "jsonb_as_i64"
316    }
317
318    fn signature(&self) -> &Signature {
319        &self.signature
320    }
321
322    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
323        Ok(DataType::Int64)
324    }
325
326    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
327        let args = ColumnarValue::values_to_arrays(&args.args)?;
328        if args.len() != 1 {
329            return internal_err!("wrong number of arguments to jsonb_as_i64()");
330        }
331
332        match args[0].data_type() {
333            DataType::Binary => {
334                let binary_array = args[0]
335                    .as_any()
336                    .downcast_ref::<GenericBinaryArray<i32>>()
337                    .ok_or_else(|| {
338                        DataFusionError::Internal("error casting to binary array".into())
339                    })?;
340
341                let mut builder = Int64Array::builder(binary_array.len());
342                for i in 0..binary_array.len() {
343                    if binary_array.is_null(i) {
344                        builder.append_null();
345                    } else {
346                        let jsonb_bytes = binary_array.value(i);
347                        if let Some(value) = extract_i64_from_jsonb(jsonb_bytes)? {
348                            builder.append_value(value);
349                        } else {
350                            builder.append_null();
351                        }
352                    }
353                }
354                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
355            }
356            DataType::Dictionary(_, value_type)
357                if matches!(value_type.as_ref(), DataType::Binary) =>
358            {
359                let dict_array = args[0]
360                    .as_any()
361                    .downcast_ref::<DictionaryArray<Int32Type>>()
362                    .ok_or_else(|| {
363                        DataFusionError::Internal("error casting dictionary array".into())
364                    })?;
365
366                let binary_values = dict_array
367                    .values()
368                    .as_any()
369                    .downcast_ref::<GenericBinaryArray<i32>>()
370                    .ok_or_else(|| {
371                        DataFusionError::Internal("dictionary values are not a binary array".into())
372                    })?;
373
374                let mut builder = Int64Array::builder(dict_array.len());
375                for i in 0..dict_array.len() {
376                    if dict_array.is_null(i) {
377                        builder.append_null();
378                    } else {
379                        let key_index = dict_array.keys().value(i) as usize;
380                        if key_index < binary_values.len() {
381                            let jsonb_bytes = binary_values.value(key_index);
382                            if let Some(value) = extract_i64_from_jsonb(jsonb_bytes)? {
383                                builder.append_value(value);
384                            } else {
385                                builder.append_null();
386                            }
387                        } else {
388                            return internal_err!(
389                                "Dictionary key index out of bounds in jsonb_as_i64"
390                            );
391                        }
392                    }
393                }
394                Ok(ColumnarValue::Array(Arc::new(builder.finish())))
395            }
396            _ => internal_err!(
397                "jsonb_as_i64: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
398            ),
399        }
400    }
401}
402
403/// Creates a user-defined function to cast a JSONB value to an i64.
404pub fn make_jsonb_as_i64_udf() -> ScalarUDF {
405    ScalarUDF::new_from_impl(JsonbAsI64::new())
406}