micromegas_datafusion_extensions/jsonb/format_json.rs

1use crate::binary_column_accessor::create_binary_accessor;
2use datafusion::arrow::array::StringDictionaryBuilder;
3use datafusion::arrow::datatypes::{DataType, Int32Type};
4use datafusion::common::{Result, internal_err};
5use datafusion::logical_expr::{
6    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
7};
8use jsonb::RawJsonb;
9use std::any::Any;
10use std::sync::Arc;
11
12/// A scalar UDF that formats JSONB binary data as a JSON string.
13///
14/// Accepts both Binary and Dictionary<Int32, Binary> inputs, making it compatible
15/// with dictionary-encoded JSONB columns and the output of `properties_to_jsonb`.
16/// Returns Dictionary<Int32, Utf8> for memory efficiency.
17#[derive(Debug, PartialEq, Eq, Hash)]
18pub struct JsonbFormatJson {
19    signature: Signature,
20}
21
22impl JsonbFormatJson {
23    pub fn new() -> Self {
24        Self {
25            signature: Signature::any(1, Volatility::Immutable),
26        }
27    }
28}
29
30impl Default for JsonbFormatJson {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl ScalarUDFImpl for JsonbFormatJson {
37    fn as_any(&self) -> &dyn Any {
38        self
39    }
40
41    fn name(&self) -> &str {
42        "jsonb_format_json"
43    }
44
45    fn signature(&self) -> &Signature {
46        &self.signature
47    }
48
49    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
50        Ok(DataType::Dictionary(
51            Box::new(DataType::Int32),
52            Box::new(DataType::Utf8),
53        ))
54    }
55
56    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
57        let args = ColumnarValue::values_to_arrays(&args.args)?;
58        if args.len() != 1 {
59            return internal_err!("wrong number of arguments to jsonb_format_json");
60        }
61
62        // Use BinaryColumnAccessor to handle both Binary and Dictionary<Int32, Binary>
63        let binary_accessor = create_binary_accessor(&args[0])
64            .map_err(|e| datafusion::error::DataFusionError::Execution(
65                format!("Invalid input type for jsonb_format_json: {}. Expected Binary or Dictionary<Int32, Binary>", e)
66            ))?;
67
68        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
69
70        for index in 0..binary_accessor.len() {
71            if binary_accessor.is_null(index) {
72                dict_builder.append_null();
73            } else {
74                let src_buffer = binary_accessor.value(index);
75                let jsonb = RawJsonb::new(src_buffer);
76                dict_builder.append_value(jsonb.to_string());
77            }
78        }
79
80        Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
81    }
82}
83
84/// Creates a user-defined function to format a JSONB value as a JSON string.
85///
86/// This function accepts both `Binary` and `Dictionary<Int32, Binary>` inputs,
87/// allowing it to work seamlessly with dictionary-encoded JSONB columns.
88/// Returns `Dictionary<Int32, Utf8>` for memory efficiency.
89pub fn make_jsonb_format_json_udf() -> datafusion::logical_expr::ScalarUDF {
90    datafusion::logical_expr::ScalarUDF::new_from_impl(JsonbFormatJson::new())
91}