micromegas_datafusion_extensions/jsonb/
parse.rs1use datafusion::arrow::array::{Array, BinaryDictionaryBuilder, DictionaryArray, StringArray};
2use datafusion::arrow::datatypes::{DataType, Int32Type};
3use datafusion::common::{Result, internal_err};
4use datafusion::error::DataFusionError;
5use datafusion::logical_expr::{
6 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
7};
8use jsonb::parse_value;
9use micromegas_tracing::warn;
10use std::any::Any;
11use std::sync::Arc;
12
13#[derive(Debug, PartialEq, Eq, Hash)]
18pub struct JsonbParse {
19 signature: Signature,
20}
21
22impl JsonbParse {
23 pub fn new() -> Self {
24 Self {
25 signature: Signature::any(1, Volatility::Immutable),
26 }
27 }
28}
29
30impl Default for JsonbParse {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36fn parse_json_to_jsonb(json_str: &str) -> Option<Vec<u8>> {
37 match parse_value(json_str.as_bytes()) {
38 Ok(parsed) => {
39 let mut buffer = vec![];
40 parsed.write_to_vec(&mut buffer);
41 Some(buffer)
42 }
43 Err(e) => {
44 warn!("error parsing json={json_str} error={e:?}");
45 None
46 }
47 }
48}
49
50impl ScalarUDFImpl for JsonbParse {
51 fn as_any(&self) -> &dyn Any {
52 self
53 }
54
55 fn name(&self) -> &str {
56 "jsonb_parse"
57 }
58
59 fn signature(&self) -> &Signature {
60 &self.signature
61 }
62
63 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
64 Ok(DataType::Dictionary(
65 Box::new(DataType::Int32),
66 Box::new(DataType::Binary),
67 ))
68 }
69
70 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
71 let args = ColumnarValue::values_to_arrays(&args.args)?;
72 if args.len() != 1 {
73 return internal_err!("wrong number of arguments to jsonb_parse()");
74 }
75
76 match args[0].data_type() {
77 DataType::Utf8 => {
78 let string_array =
79 args[0]
80 .as_any()
81 .downcast_ref::<StringArray>()
82 .ok_or_else(|| {
83 DataFusionError::Internal("error casting to string array".into())
84 })?;
85
86 let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
87 for i in 0..string_array.len() {
88 if string_array.is_null(i) {
89 dict_builder.append_null();
90 } else {
91 let json_str = string_array.value(i);
92 if let Some(jsonb_bytes) = parse_json_to_jsonb(json_str) {
93 dict_builder.append_value(&jsonb_bytes);
94 } else {
95 dict_builder.append_null();
96 }
97 }
98 }
99 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
100 }
101 DataType::Dictionary(_, value_type)
102 if matches!(value_type.as_ref(), DataType::Utf8) =>
103 {
104 let dict_array = args[0]
105 .as_any()
106 .downcast_ref::<DictionaryArray<Int32Type>>()
107 .ok_or_else(|| {
108 DataFusionError::Internal("error casting dictionary array".into())
109 })?;
110
111 let string_values = dict_array
112 .values()
113 .as_any()
114 .downcast_ref::<StringArray>()
115 .ok_or_else(|| {
116 DataFusionError::Internal("dictionary values are not a string array".into())
117 })?;
118
119 let mut dict_builder = BinaryDictionaryBuilder::<Int32Type>::new();
120 for i in 0..dict_array.len() {
121 if dict_array.is_null(i) {
122 dict_builder.append_null();
123 } else {
124 let key_index = dict_array.keys().value(i) as usize;
125 if key_index < string_values.len() {
126 let json_str = string_values.value(key_index);
127 if let Some(jsonb_bytes) = parse_json_to_jsonb(json_str) {
128 dict_builder.append_value(&jsonb_bytes);
129 } else {
130 dict_builder.append_null();
131 }
132 } else {
133 return internal_err!(
134 "Dictionary key index out of bounds in jsonb_parse"
135 );
136 }
137 }
138 }
139 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
140 }
141 _ => internal_err!(
142 "jsonb_parse: unsupported input type, expected Utf8 or Dictionary<Int32, Utf8>"
143 ),
144 }
145 }
146}
147
148pub fn make_jsonb_parse_udf() -> ScalarUDF {
150 ScalarUDF::new_from_impl(JsonbParse::new())
151}