micromegas_datafusion_extensions/jsonb/
cast.rs1use datafusion::arrow::array::{
2 Array, DictionaryArray, Float64Array, GenericBinaryArray, Int64Array, StringDictionaryBuilder,
3};
4use datafusion::arrow::datatypes::{DataType, Int32Type};
5use datafusion::common::{Result, internal_err};
6use datafusion::error::DataFusionError;
7use datafusion::logical_expr::{
8 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
9};
10use jsonb::RawJsonb;
11use std::any::Any;
12use std::sync::Arc;
13
14#[derive(Debug, PartialEq, Eq, Hash)]
19pub struct JsonbAsString {
20 signature: Signature,
21}
22
23impl JsonbAsString {
24 pub fn new() -> Self {
25 Self {
26 signature: Signature::any(1, Volatility::Immutable),
27 }
28 }
29}
30
31impl Default for JsonbAsString {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37fn extract_string_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<String>> {
38 let jsonb = RawJsonb::new(jsonb_bytes);
39 match jsonb.as_str() {
40 Ok(Some(value)) => Ok(Some(value.to_string())),
41 Ok(None) => Ok(None),
42 Err(e) => Err(DataFusionError::External(e.into())),
43 }
44}
45
46impl ScalarUDFImpl for JsonbAsString {
47 fn as_any(&self) -> &dyn Any {
48 self
49 }
50
51 fn name(&self) -> &str {
52 "jsonb_as_string"
53 }
54
55 fn signature(&self) -> &Signature {
56 &self.signature
57 }
58
59 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
60 Ok(DataType::Dictionary(
61 Box::new(DataType::Int32),
62 Box::new(DataType::Utf8),
63 ))
64 }
65
66 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
67 let args = ColumnarValue::values_to_arrays(&args.args)?;
68 if args.len() != 1 {
69 return internal_err!("wrong number of arguments to jsonb_as_string()");
70 }
71
72 match args[0].data_type() {
73 DataType::Binary => {
74 let binary_array = args[0]
75 .as_any()
76 .downcast_ref::<GenericBinaryArray<i32>>()
77 .ok_or_else(|| {
78 DataFusionError::Internal("error casting to binary array".into())
79 })?;
80
81 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
82 for i in 0..binary_array.len() {
83 if binary_array.is_null(i) {
84 dict_builder.append_null();
85 } else {
86 let jsonb_bytes = binary_array.value(i);
87 if let Some(value) = extract_string_from_jsonb(jsonb_bytes)? {
88 dict_builder.append_value(value);
89 } else {
90 dict_builder.append_null();
91 }
92 }
93 }
94 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
95 }
96 DataType::Dictionary(_, value_type)
97 if matches!(value_type.as_ref(), DataType::Binary) =>
98 {
99 let dict_array = args[0]
100 .as_any()
101 .downcast_ref::<DictionaryArray<Int32Type>>()
102 .ok_or_else(|| {
103 DataFusionError::Internal("error casting dictionary array".into())
104 })?;
105
106 let binary_values = dict_array
107 .values()
108 .as_any()
109 .downcast_ref::<GenericBinaryArray<i32>>()
110 .ok_or_else(|| {
111 DataFusionError::Internal("dictionary values are not a binary array".into())
112 })?;
113
114 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
115 for i in 0..dict_array.len() {
116 if dict_array.is_null(i) {
117 dict_builder.append_null();
118 } else {
119 let key_index = dict_array.keys().value(i) as usize;
120 if key_index < binary_values.len() {
121 let jsonb_bytes = binary_values.value(key_index);
122 if let Some(value) = extract_string_from_jsonb(jsonb_bytes)? {
123 dict_builder.append_value(value);
124 } else {
125 dict_builder.append_null();
126 }
127 } else {
128 return internal_err!(
129 "Dictionary key index out of bounds in jsonb_as_string"
130 );
131 }
132 }
133 }
134 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
135 }
136 _ => internal_err!(
137 "jsonb_as_string: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
138 ),
139 }
140 }
141}
142
143pub fn make_jsonb_as_string_udf() -> ScalarUDF {
145 ScalarUDF::new_from_impl(JsonbAsString::new())
146}
147
148#[derive(Debug, PartialEq, Eq, Hash)]
153pub struct JsonbAsF64 {
154 signature: Signature,
155}
156
157impl JsonbAsF64 {
158 pub fn new() -> Self {
159 Self {
160 signature: Signature::any(1, Volatility::Immutable),
161 }
162 }
163}
164
165impl Default for JsonbAsF64 {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171fn extract_f64_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<f64>> {
172 let jsonb = RawJsonb::new(jsonb_bytes);
173 match jsonb.as_f64() {
174 Ok(value) => Ok(value),
175 Err(e) => Err(DataFusionError::External(e.into())),
176 }
177}
178
179impl ScalarUDFImpl for JsonbAsF64 {
180 fn as_any(&self) -> &dyn Any {
181 self
182 }
183
184 fn name(&self) -> &str {
185 "jsonb_as_f64"
186 }
187
188 fn signature(&self) -> &Signature {
189 &self.signature
190 }
191
192 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
193 Ok(DataType::Float64)
194 }
195
196 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
197 let args = ColumnarValue::values_to_arrays(&args.args)?;
198 if args.len() != 1 {
199 return internal_err!("wrong number of arguments to jsonb_as_f64()");
200 }
201
202 match args[0].data_type() {
203 DataType::Binary => {
204 let binary_array = args[0]
205 .as_any()
206 .downcast_ref::<GenericBinaryArray<i32>>()
207 .ok_or_else(|| {
208 DataFusionError::Internal("error casting to binary array".into())
209 })?;
210
211 let mut builder = Float64Array::builder(binary_array.len());
212 for i in 0..binary_array.len() {
213 if binary_array.is_null(i) {
214 builder.append_null();
215 } else {
216 let jsonb_bytes = binary_array.value(i);
217 if let Some(value) = extract_f64_from_jsonb(jsonb_bytes)? {
218 builder.append_value(value);
219 } else {
220 builder.append_null();
221 }
222 }
223 }
224 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
225 }
226 DataType::Dictionary(_, value_type)
227 if matches!(value_type.as_ref(), DataType::Binary) =>
228 {
229 let dict_array = args[0]
230 .as_any()
231 .downcast_ref::<DictionaryArray<Int32Type>>()
232 .ok_or_else(|| {
233 DataFusionError::Internal("error casting dictionary array".into())
234 })?;
235
236 let binary_values = dict_array
237 .values()
238 .as_any()
239 .downcast_ref::<GenericBinaryArray<i32>>()
240 .ok_or_else(|| {
241 DataFusionError::Internal("dictionary values are not a binary array".into())
242 })?;
243
244 let mut builder = Float64Array::builder(dict_array.len());
245 for i in 0..dict_array.len() {
246 if dict_array.is_null(i) {
247 builder.append_null();
248 } else {
249 let key_index = dict_array.keys().value(i) as usize;
250 if key_index < binary_values.len() {
251 let jsonb_bytes = binary_values.value(key_index);
252 if let Some(value) = extract_f64_from_jsonb(jsonb_bytes)? {
253 builder.append_value(value);
254 } else {
255 builder.append_null();
256 }
257 } else {
258 return internal_err!(
259 "Dictionary key index out of bounds in jsonb_as_f64"
260 );
261 }
262 }
263 }
264 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
265 }
266 _ => internal_err!(
267 "jsonb_as_f64: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
268 ),
269 }
270 }
271}
272
273pub fn make_jsonb_as_f64_udf() -> ScalarUDF {
275 ScalarUDF::new_from_impl(JsonbAsF64::new())
276}
277
278#[derive(Debug, PartialEq, Eq, Hash)]
283pub struct JsonbAsI64 {
284 signature: Signature,
285}
286
287impl JsonbAsI64 {
288 pub fn new() -> Self {
289 Self {
290 signature: Signature::any(1, Volatility::Immutable),
291 }
292 }
293}
294
295impl Default for JsonbAsI64 {
296 fn default() -> Self {
297 Self::new()
298 }
299}
300
301fn extract_i64_from_jsonb(jsonb_bytes: &[u8]) -> Result<Option<i64>> {
302 let jsonb = RawJsonb::new(jsonb_bytes);
303 match jsonb.as_i64() {
304 Ok(value) => Ok(value),
305 Err(e) => Err(DataFusionError::External(e.into())),
306 }
307}
308
309impl ScalarUDFImpl for JsonbAsI64 {
310 fn as_any(&self) -> &dyn Any {
311 self
312 }
313
314 fn name(&self) -> &str {
315 "jsonb_as_i64"
316 }
317
318 fn signature(&self) -> &Signature {
319 &self.signature
320 }
321
322 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
323 Ok(DataType::Int64)
324 }
325
326 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
327 let args = ColumnarValue::values_to_arrays(&args.args)?;
328 if args.len() != 1 {
329 return internal_err!("wrong number of arguments to jsonb_as_i64()");
330 }
331
332 match args[0].data_type() {
333 DataType::Binary => {
334 let binary_array = args[0]
335 .as_any()
336 .downcast_ref::<GenericBinaryArray<i32>>()
337 .ok_or_else(|| {
338 DataFusionError::Internal("error casting to binary array".into())
339 })?;
340
341 let mut builder = Int64Array::builder(binary_array.len());
342 for i in 0..binary_array.len() {
343 if binary_array.is_null(i) {
344 builder.append_null();
345 } else {
346 let jsonb_bytes = binary_array.value(i);
347 if let Some(value) = extract_i64_from_jsonb(jsonb_bytes)? {
348 builder.append_value(value);
349 } else {
350 builder.append_null();
351 }
352 }
353 }
354 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
355 }
356 DataType::Dictionary(_, value_type)
357 if matches!(value_type.as_ref(), DataType::Binary) =>
358 {
359 let dict_array = args[0]
360 .as_any()
361 .downcast_ref::<DictionaryArray<Int32Type>>()
362 .ok_or_else(|| {
363 DataFusionError::Internal("error casting dictionary array".into())
364 })?;
365
366 let binary_values = dict_array
367 .values()
368 .as_any()
369 .downcast_ref::<GenericBinaryArray<i32>>()
370 .ok_or_else(|| {
371 DataFusionError::Internal("dictionary values are not a binary array".into())
372 })?;
373
374 let mut builder = Int64Array::builder(dict_array.len());
375 for i in 0..dict_array.len() {
376 if dict_array.is_null(i) {
377 builder.append_null();
378 } else {
379 let key_index = dict_array.keys().value(i) as usize;
380 if key_index < binary_values.len() {
381 let jsonb_bytes = binary_values.value(key_index);
382 if let Some(value) = extract_i64_from_jsonb(jsonb_bytes)? {
383 builder.append_value(value);
384 } else {
385 builder.append_null();
386 }
387 } else {
388 return internal_err!(
389 "Dictionary key index out of bounds in jsonb_as_i64"
390 );
391 }
392 }
393 }
394 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
395 }
396 _ => internal_err!(
397 "jsonb_as_i64: unsupported input type, expected Binary or Dictionary<Int32, Binary>"
398 ),
399 }
400 }
401}
402
403pub fn make_jsonb_as_i64_udf() -> ScalarUDF {
405 ScalarUDF::new_from_impl(JsonbAsI64::new())
406}