// micromegas_datafusion_extensions/jsonb/parse.rs

1use datafusion::arrow::array::{Array, BinaryDictionaryBuilder, DictionaryArray, StringArray};
2use datafusion::arrow::datatypes::{DataType, Int32Type};
3use datafusion::common::{Result, internal_err};
4use datafusion::error::DataFusionError;
5use datafusion::logical_expr::{
6    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
7};
8use jsonb::parse_value;
9use micromegas_tracing::warn;
10use std::any::Any;
11use std::sync::Arc;
12
13/// A scalar UDF that parses a JSON string into a JSONB value.
14///
15/// Accepts both Utf8 and Dictionary<Int32, Utf8> inputs.
16/// Returns Dictionary<Int32, Binary> for memory efficiency.
17#[derive(Debug, PartialEq, Eq, Hash)]
18pub struct JsonbParse {
19    signature: Signature,
20}
21
22impl JsonbParse {
23    pub fn new() -> Self {
24        Self {
25            signature: Signature::any(1, Volatility::Immutable),
26        }
27    }
28}
29
30impl Default for JsonbParse {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36fn parse_json_to_jsonb(json_str: &str) -> Option<Vec<u8>> {
37    match parse_value(json_str.as_bytes()) {
38        Ok(parsed) => {
39            let mut buffer = vec![];
40            parsed.write_to_vec(&mut buffer);
41            Some(buffer)
42        }
43        Err(e) => {
44            warn!("error parsing json={json_str} error={e:?}");
45            None
46        }
47    }
48}
49
50impl ScalarUDFImpl for JsonbParse {
51    fn as_any(&self) -> &dyn Any {
52        self
53    }
54
55    fn name(&self) -> &str {
56        "jsonb_parse"
57    }
58
59    fn signature(&self) -> &Signature {
60        &self.signature
61    }
62
63    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
64        Ok(DataType::Dictionary(
65            Box::new(DataType::Int32),
66            Box::new(DataType::Binary),
67        ))
68    }
69
70    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
71        let args = ColumnarValue::values_to_arrays(&args.args)?;
72        if args.len() != 1 {
73            return internal_err!("wrong number of arguments to jsonb_parse()");
74        }
75
76        match args[0].data_type() {
77            DataType::Utf8 => {
78                let string_array =
79                    args[0]
80                        .as_any()
81                        .downcast_ref::<StringArray>()
82                        .ok_or_else(|| {
83                            DataFusionError::Internal("error casting to string array".into())
84                        })?;
85
86                let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
87                for i in 0..string_array.len() {
88                    if string_array.is_null(i) {
89                        dict_builder.append_null();
90                    } else {
91                        let json_str = string_array.value(i);
92                        if let Some(jsonb_bytes) = parse_json_to_jsonb(json_str) {
93                            dict_builder.append_value(&jsonb_bytes);
94                        } else {
95                            dict_builder.append_null();
96                        }
97                    }
98                }
99                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
100            }
101            DataType::Dictionary(_, value_type)
102                if matches!(value_type.as_ref(), DataType::Utf8) =>
103            {
104                let dict_array = args[0]
105                    .as_any()
106                    .downcast_ref::<DictionaryArray<Int32Type>>()
107                    .ok_or_else(|| {
108                        DataFusionError::Internal("error casting dictionary array".into())
109                    })?;
110
111                let string_values = dict_array
112                    .values()
113                    .as_any()
114                    .downcast_ref::<StringArray>()
115                    .ok_or_else(|| {
116                        DataFusionError::Internal("dictionary values are not a string array".into())
117                    })?;
118
119                let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
120                for i in 0..dict_array.len() {
121                    if dict_array.is_null(i) {
122                        dict_builder.append_null();
123                    } else {
124                        let key_index = dict_array.keys().value(i) as usize;
125                        if key_index < string_values.len() {
126                            let json_str = string_values.value(key_index);
127                            if let Some(jsonb_bytes) = parse_json_to_jsonb(json_str) {
128                                dict_builder.append_value(&jsonb_bytes);
129                            } else {
130                                dict_builder.append_null();
131                            }
132                        } else {
133                            return internal_err!(
134                                "Dictionary key index out of bounds in jsonb_parse"
135                            );
136                        }
137                    }
138                }
139                Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
140            }
141            _ => internal_err!(
142                "jsonb_parse: unsupported input type, expected Utf8 or Dictionary<Int32, Utf8>"
143            ),
144        }
145    }
146}
147
148/// Creates a user-defined function to parse a JSON string into a JSONB value.
149pub fn make_jsonb_parse_udf() -> ScalarUDF {
150    ScalarUDF::new_from_impl(JsonbParse::new())
151}