micromegas_datafusion_extensions/jsonb/
path_query.rs1use crate::binary_column_accessor::create_binary_accessor;
2use datafusion::arrow::array::{Array, BinaryDictionaryBuilder, StringArray};
3use datafusion::arrow::datatypes::{DataType, Int32Type};
4use datafusion::common::{Result, internal_err};
5use datafusion::error::DataFusionError;
6use datafusion::logical_expr::{
7 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
8};
9use jsonb::RawJsonb;
10use jsonb::jsonpath::parse_json_path;
11use std::any::Any;
12use std::collections::HashMap;
13use std::sync::Arc;
14
/// Selects which JSONPath matches `eval_jsonb_path_query` emits for each row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PathQueryMode {
    /// Emit only the first match; a row with no match becomes null.
    First,
    /// Emit the value produced by `select_array_by_path` for the row.
    All,
}
20
21fn eval_jsonb_path_query(
22 func_name: &str,
23 args: ScalarFunctionArgs,
24 mode: PathQueryMode,
25) -> Result<ColumnarValue> {
26 let args = ColumnarValue::values_to_arrays(&args.args)?;
27 if args.len() != 2 {
28 return internal_err!("wrong number of arguments to {func_name}");
29 }
30
31 let accessor = create_binary_accessor(&args[0]).map_err(|e| {
32 DataFusionError::Execution(format!(
33 "Invalid input type for {func_name}: {e}. Expected Binary or Dictionary<Int32, Binary>"
34 ))
35 })?;
36
37 let paths = args[1]
38 .as_any()
39 .downcast_ref::<StringArray>()
40 .ok_or_else(|| {
41 DataFusionError::Execution(format!("second argument to {func_name} must be a string"))
42 })?;
43
44 let mut builder = BinaryDictionaryBuilder::<Int32Type>::new();
45 let mut path_cache: HashMap<&str, _> = HashMap::new();
46
47 for i in 0..accessor.len() {
48 if accessor.is_null(i) || paths.is_null(i) {
49 builder.append_null();
50 } else {
51 let path_str = paths.value(i);
52 if !path_cache.contains_key(path_str) {
53 let parsed = parse_json_path(path_str.as_bytes()).map_err(|e| {
54 DataFusionError::Execution(format!(
55 "{func_name}: invalid JSONPath '{path_str}': {e}"
56 ))
57 })?;
58 path_cache.insert(path_str, parsed);
59 }
60 let json_path = path_cache.get(path_str).expect("just inserted");
61 let raw = RawJsonb::new(accessor.value(i));
62 match mode {
63 PathQueryMode::First => match raw.select_first_by_path(json_path) {
64 Ok(Some(value)) => builder.append_value(value.as_ref()),
65 Ok(None) => builder.append_null(),
66 Err(e) => return Err(DataFusionError::External(e.into())),
67 },
68 PathQueryMode::All => match raw.select_array_by_path(json_path) {
69 Ok(value) => builder.append_value(value.as_ref()),
70 Err(e) => return Err(DataFusionError::External(e.into())),
71 },
72 }
73 }
74 }
75
76 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
77}
78
79#[derive(Debug, PartialEq, Eq, Hash)]
85pub struct JsonbPathQueryFirst {
86 signature: Signature,
87}
88
89impl JsonbPathQueryFirst {
90 pub fn new() -> Self {
91 Self {
92 signature: Signature::any(2, Volatility::Immutable),
93 }
94 }
95}
96
97impl Default for JsonbPathQueryFirst {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl ScalarUDFImpl for JsonbPathQueryFirst {
104 fn as_any(&self) -> &dyn Any {
105 self
106 }
107
108 fn name(&self) -> &str {
109 "jsonb_path_query_first"
110 }
111
112 fn signature(&self) -> &Signature {
113 &self.signature
114 }
115
116 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
117 Ok(DataType::Dictionary(
118 Box::new(DataType::Int32),
119 Box::new(DataType::Binary),
120 ))
121 }
122
123 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
124 eval_jsonb_path_query("jsonb_path_query_first", args, PathQueryMode::First)
125 }
126}
127
128pub fn make_jsonb_path_query_first_udf() -> ScalarUDF {
130 ScalarUDF::new_from_impl(JsonbPathQueryFirst::new())
131}
132
133#[derive(Debug, PartialEq, Eq, Hash)]
139pub struct JsonbPathQuery {
140 signature: Signature,
141}
142
143impl JsonbPathQuery {
144 pub fn new() -> Self {
145 Self {
146 signature: Signature::any(2, Volatility::Immutable),
147 }
148 }
149}
150
151impl Default for JsonbPathQuery {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157impl ScalarUDFImpl for JsonbPathQuery {
158 fn as_any(&self) -> &dyn Any {
159 self
160 }
161
162 fn name(&self) -> &str {
163 "jsonb_path_query"
164 }
165
166 fn signature(&self) -> &Signature {
167 &self.signature
168 }
169
170 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
171 Ok(DataType::Dictionary(
172 Box::new(DataType::Int32),
173 Box::new(DataType::Binary),
174 ))
175 }
176
177 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
178 eval_jsonb_path_query("jsonb_path_query", args, PathQueryMode::All)
179 }
180}
181
182pub fn make_jsonb_path_query_udf() -> ScalarUDF {
184 ScalarUDF::new_from_impl(JsonbPathQuery::new())
185}