// micromegas_datafusion_extensions/jsonb/path_query.rs

use crate::binary_column_accessor::create_binary_accessor;
use datafusion::arrow::array::{Array, BinaryDictionaryBuilder, StringArray};
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::{DataType, Int32Type};
use datafusion::common::{Result, internal_err};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use jsonb::RawJsonb;
use jsonb::jsonpath::parse_json_path;
use std::any::Any;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
14
/// Selects how many JSONPath matches `eval_jsonb_path_query` produces for each row.
#[derive(Copy, Clone)]
enum PathQueryMode {
    /// Keep only the first match; the row is NULL when nothing matches.
    First,
    /// Keep every match, collected into a single JSONB array.
    All,
}
20
21fn eval_jsonb_path_query(
22    func_name: &str,
23    args: ScalarFunctionArgs,
24    mode: PathQueryMode,
25) -> Result<ColumnarValue> {
26    let args = ColumnarValue::values_to_arrays(&args.args)?;
27    if args.len() != 2 {
28        return internal_err!("wrong number of arguments to {func_name}");
29    }
30
31    let accessor = create_binary_accessor(&args[0]).map_err(|e| {
32        DataFusionError::Execution(format!(
33            "Invalid input type for {func_name}: {e}. Expected Binary or Dictionary<Int32, Binary>"
34        ))
35    })?;
36
37    let paths = args[1]
38        .as_any()
39        .downcast_ref::<StringArray>()
40        .ok_or_else(|| {
41            DataFusionError::Execution(format!("second argument to {func_name} must be a string"))
42        })?;
43
44    let mut builder = BinaryDictionaryBuilder::<Int32Type>::new();
45    let mut path_cache: HashMap<&str, _> = HashMap::new();
46
47    for i in 0..accessor.len() {
48        if accessor.is_null(i) || paths.is_null(i) {
49            builder.append_null();
50        } else {
51            let path_str = paths.value(i);
52            if !path_cache.contains_key(path_str) {
53                let parsed = parse_json_path(path_str.as_bytes()).map_err(|e| {
54                    DataFusionError::Execution(format!(
55                        "{func_name}: invalid JSONPath '{path_str}': {e}"
56                    ))
57                })?;
58                path_cache.insert(path_str, parsed);
59            }
60            let json_path = path_cache.get(path_str).expect("just inserted");
61            let raw = RawJsonb::new(accessor.value(i));
62            match mode {
63                PathQueryMode::First => match raw.select_first_by_path(json_path) {
64                    Ok(Some(value)) => builder.append_value(value.as_ref()),
65                    Ok(None) => builder.append_null(),
66                    Err(e) => return Err(DataFusionError::External(e.into())),
67                },
68                PathQueryMode::All => match raw.select_array_by_path(json_path) {
69                    Ok(value) => builder.append_value(value.as_ref()),
70                    Err(e) => return Err(DataFusionError::External(e.into())),
71                },
72            }
73        }
74    }
75
76    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
77}
78
79/// A scalar UDF that returns the first match of a JSONPath expression on a JSONB value.
80///
81/// Accepts both Binary and Dictionary<Int32, Binary> inputs for the JSONB argument.
82/// The path argument is Utf8. Returns Dictionary<Int32, Binary> for memory efficiency,
83/// or NULL if no match is found.
84#[derive(Debug, PartialEq, Eq, Hash)]
85pub struct JsonbPathQueryFirst {
86    signature: Signature,
87}
88
89impl JsonbPathQueryFirst {
90    pub fn new() -> Self {
91        Self {
92            signature: Signature::any(2, Volatility::Immutable),
93        }
94    }
95}
96
97impl Default for JsonbPathQueryFirst {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl ScalarUDFImpl for JsonbPathQueryFirst {
104    fn as_any(&self) -> &dyn Any {
105        self
106    }
107
108    fn name(&self) -> &str {
109        "jsonb_path_query_first"
110    }
111
112    fn signature(&self) -> &Signature {
113        &self.signature
114    }
115
116    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
117        Ok(DataType::Dictionary(
118            Box::new(DataType::Int32),
119            Box::new(DataType::Binary),
120        ))
121    }
122
123    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
124        eval_jsonb_path_query("jsonb_path_query_first", args, PathQueryMode::First)
125    }
126}
127
128/// Creates a user-defined function to extract the first JSONPath match from a JSONB value.
129pub fn make_jsonb_path_query_first_udf() -> ScalarUDF {
130    ScalarUDF::new_from_impl(JsonbPathQueryFirst::new())
131}
132
133/// A scalar UDF that returns all matches of a JSONPath expression on a JSONB value as a JSONB array.
134///
135/// Accepts both Binary and Dictionary<Int32, Binary> inputs for the JSONB argument.
136/// The path argument is Utf8. Returns Dictionary<Int32, Binary> containing a JSONB array
137/// of all matched values.
138#[derive(Debug, PartialEq, Eq, Hash)]
139pub struct JsonbPathQuery {
140    signature: Signature,
141}
142
143impl JsonbPathQuery {
144    pub fn new() -> Self {
145        Self {
146            signature: Signature::any(2, Volatility::Immutable),
147        }
148    }
149}
150
151impl Default for JsonbPathQuery {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157impl ScalarUDFImpl for JsonbPathQuery {
158    fn as_any(&self) -> &dyn Any {
159        self
160    }
161
162    fn name(&self) -> &str {
163        "jsonb_path_query"
164    }
165
166    fn signature(&self) -> &Signature {
167        &self.signature
168    }
169
170    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
171        Ok(DataType::Dictionary(
172            Box::new(DataType::Int32),
173            Box::new(DataType::Binary),
174        ))
175    }
176
177    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
178        eval_jsonb_path_query("jsonb_path_query", args, PathQueryMode::All)
179    }
180}
181
182/// Creates a user-defined function to extract all JSONPath matches from a JSONB value as a JSONB array.
183pub fn make_jsonb_path_query_udf() -> ScalarUDF {
184    ScalarUDF::new_from_impl(JsonbPathQuery::new())
185}