micromegas_datafusion_extensions/jsonb/
format_json.rs1use crate::binary_column_accessor::create_binary_accessor;
2use datafusion::arrow::array::StringDictionaryBuilder;
3use datafusion::arrow::datatypes::{DataType, Int32Type};
4use datafusion::common::{Result, internal_err};
5use datafusion::logical_expr::{
6 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
7};
8use jsonb::RawJsonb;
9use std::any::Any;
10use std::sync::Arc;
11
12#[derive(Debug, PartialEq, Eq, Hash)]
18pub struct JsonbFormatJson {
19 signature: Signature,
20}
21
22impl JsonbFormatJson {
23 pub fn new() -> Self {
24 Self {
25 signature: Signature::any(1, Volatility::Immutable),
26 }
27 }
28}
29
30impl Default for JsonbFormatJson {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl ScalarUDFImpl for JsonbFormatJson {
37 fn as_any(&self) -> &dyn Any {
38 self
39 }
40
41 fn name(&self) -> &str {
42 "jsonb_format_json"
43 }
44
45 fn signature(&self) -> &Signature {
46 &self.signature
47 }
48
49 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
50 Ok(DataType::Dictionary(
51 Box::new(DataType::Int32),
52 Box::new(DataType::Utf8),
53 ))
54 }
55
56 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
57 let args = ColumnarValue::values_to_arrays(&args.args)?;
58 if args.len() != 1 {
59 return internal_err!("wrong number of arguments to jsonb_format_json");
60 }
61
62 let binary_accessor = create_binary_accessor(&args[0])
64 .map_err(|e| datafusion::error::DataFusionError::Execution(
65 format!("Invalid input type for jsonb_format_json: {}. Expected Binary or Dictionary<Int32, Binary>", e)
66 ))?;
67
68 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
69
70 for index in 0..binary_accessor.len() {
71 if binary_accessor.is_null(index) {
72 dict_builder.append_null();
73 } else {
74 let src_buffer = binary_accessor.value(index);
75 let jsonb = RawJsonb::new(src_buffer);
76 dict_builder.append_value(jsonb.to_string());
77 }
78 }
79
80 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
81 }
82}
83
84pub fn make_jsonb_format_json_udf() -> datafusion::logical_expr::ScalarUDF {
90 datafusion::logical_expr::ScalarUDF::new_from_impl(JsonbFormatJson::new())
91}