micromegas_datafusion_extensions/properties/
property_get.rs1use anyhow::Context;
2use datafusion::arrow::array::{Array, StringDictionaryBuilder};
3use datafusion::arrow::array::{
4 ArrayRef, DictionaryArray, GenericBinaryArray, GenericListArray, StringArray,
5};
6use datafusion::arrow::array::{AsArray, StructArray};
7use datafusion::arrow::datatypes::{DataType, Int32Type};
8use datafusion::common::{Result, internal_err};
9use datafusion::error::DataFusionError;
10use datafusion::logical_expr::{
11 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
12};
13use jsonb::RawJsonb;
14use std::any::Any;
15use std::sync::Arc;
16
17#[derive(Debug, PartialEq, Eq, Hash)]
19pub struct PropertyGet {
20 signature: Signature,
21}
22
23impl PropertyGet {
24 pub fn new() -> Self {
25 Self {
26 signature: Signature::any(2, Volatility::Immutable),
27 }
28 }
29}
30
31impl Default for PropertyGet {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37fn find_property_in_list(properties: ArrayRef, name: &str) -> anyhow::Result<Option<String>> {
38 let properties: &StructArray = properties.as_struct();
39 let (key_index, _key_field) = properties
40 .fields()
41 .find("key")
42 .with_context(|| "getting key field")?;
43 let (value_index, _value_field) = properties
44 .fields()
45 .find("value")
46 .with_context(|| "getting value field")?;
47 for i in 0..properties.len() {
48 let key = properties.column(key_index).as_string::<i32>().value(i);
49 if key.eq_ignore_ascii_case(name) {
50 let value = properties.column(value_index).as_string::<i32>().value(i);
51 return Ok(Some(value.into()));
52 }
53 }
54 Ok(None)
55}
56
57fn extract_from_jsonb(jsonb_bytes: &[u8], name: &str) -> anyhow::Result<Option<String>> {
58 let jsonb = RawJsonb::new(jsonb_bytes);
59 if let Some(value_jsonb) = jsonb
60 .get_by_name(name, true)
61 .with_context(|| "getting JSONB property by name")?
62 {
63 let raw_value = value_jsonb.as_raw();
65
66 if let Some(str_value) = raw_value
68 .as_str()
69 .with_context(|| "extracting string value from JSONB")?
70 {
71 Ok(Some(str_value.to_string()))
72 } else {
73 Ok(Some(raw_value.to_string()))
75 }
76 } else {
77 Ok(None)
78 }
79}
80
81impl ScalarUDFImpl for PropertyGet {
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85 fn name(&self) -> &str {
86 "property_get"
87 }
88 fn signature(&self) -> &Signature {
89 &self.signature
90 }
91 fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
92 Ok(DataType::Dictionary(
93 Box::new(DataType::Int32),
94 Box::new(DataType::Utf8),
95 ))
96 }
97 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
98 let args = ColumnarValue::values_to_arrays(&args.args)?;
99 if args.len() != 2 {
100 return internal_err!("wrong number of arguments to property_get()");
101 }
102
103 let names = args[1]
104 .as_any()
105 .downcast_ref::<StringArray>()
106 .ok_or_else(|| DataFusionError::Execution("downcasting names in PropertyGet".into()))?;
107
108 match args[0].data_type() {
110 DataType::List(_) => {
111 let prop_lists = args[0]
113 .as_any()
114 .downcast_ref::<GenericListArray<i32>>()
115 .ok_or_else(|| {
116 DataFusionError::Internal("error casting property list".into())
117 })?;
118
119 if prop_lists.len() != names.len() {
120 return internal_err!("arrays of different lengths in property_get()");
121 }
122
123 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
124 for i in 0..prop_lists.len() {
125 let name = names.value(i);
126 if let Some(value) = find_property_in_list(prop_lists.value(i), name)
127 .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?
128 {
129 dict_builder.append_value(value);
130 } else {
131 dict_builder.append_null();
132 }
133 }
134 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
135 }
136 DataType::Binary => {
137 let binary_array = args[0]
139 .as_any()
140 .downcast_ref::<GenericBinaryArray<i32>>()
141 .ok_or_else(|| {
142 DataFusionError::Internal("error casting to binary array".into())
143 })?;
144
145 if binary_array.len() != names.len() {
146 return internal_err!("arrays of different lengths in property_get()");
147 }
148
149 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
150 for i in 0..binary_array.len() {
151 if binary_array.is_null(i) {
152 dict_builder.append_null();
153 } else {
154 let jsonb_bytes = binary_array.value(i);
155 let name = names.value(i);
156 if let Some(value) = extract_from_jsonb(jsonb_bytes, name).map_err(|e| {
157 DataFusionError::Internal(format!("JSONB extraction error: {e:?}"))
158 })? {
159 dict_builder.append_value(value);
160 } else {
161 dict_builder.append_null();
162 }
163 }
164 }
165 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
166 }
167 DataType::Dictionary(_, value_type) => {
168 match value_type.as_ref() {
170 DataType::Binary => {
171 let dict_array = args[0]
173 .as_any()
174 .downcast_ref::<DictionaryArray<Int32Type>>()
175 .ok_or_else(|| {
176 DataFusionError::Internal("error casting dictionary array".into())
177 })?;
178
179 if dict_array.len() != names.len() {
180 return internal_err!("arrays of different lengths in property_get()");
181 }
182
183 let values_array = dict_array.values();
184 let binary_values = values_array
185 .as_any()
186 .downcast_ref::<GenericBinaryArray<i32>>()
187 .ok_or_else(|| {
188 DataFusionError::Internal(
189 "dictionary values are not a binary array".into(),
190 )
191 })?;
192
193 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
194 for i in 0..dict_array.len() {
195 if dict_array.is_null(i) {
196 dict_builder.append_null();
197 } else {
198 let key_index = dict_array.keys().value(i) as usize;
199 if key_index < binary_values.len() {
200 let jsonb_bytes = binary_values.value(key_index);
201 let name = names.value(i);
202 if let Some(value) = extract_from_jsonb(jsonb_bytes, name)
203 .map_err(|e| {
204 DataFusionError::Internal(format!(
205 "JSONB extraction error: {e:?}"
206 ))
207 })?
208 {
209 dict_builder.append_value(value);
210 } else {
211 dict_builder.append_null();
212 }
213 } else {
214 return internal_err!(
215 "Dictionary key index out of bounds in property_get"
216 );
217 }
218 }
219 }
220 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
221 }
222 DataType::List(_) => {
223 let dict_array = args[0]
224 .as_any()
225 .downcast_ref::<DictionaryArray<Int32Type>>()
226 .ok_or_else(|| {
227 DataFusionError::Internal("error casting dictionary array".into())
228 })?;
229
230 if dict_array.len() != names.len() {
231 return internal_err!("arrays of different lengths in property_get()");
232 }
233
234 let values_array = dict_array.values();
235 let list_values = values_array
236 .as_any()
237 .downcast_ref::<GenericListArray<i32>>()
238 .ok_or_else(|| {
239 DataFusionError::Internal(
240 "dictionary values are not a list array".into(),
241 )
242 })?;
243
244 let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
245 for i in 0..dict_array.len() {
246 let name = names.value(i);
247
248 if dict_array.is_null(i) {
249 dict_builder.append_null();
250 } else {
251 let key_index = dict_array.keys().value(i) as usize;
252 if key_index < list_values.len() {
253 let property_list = list_values.value(key_index);
254 if let Some(value) = find_property_in_list(property_list, name)
255 .map_err(|e| DataFusionError::Internal(format!("{e:?}")))?
256 {
257 dict_builder.append_value(value);
258 } else {
259 dict_builder.append_null();
260 }
261 } else {
262 return internal_err!(
263 "Dictionary key index out of bounds in property_get"
264 );
265 }
266 }
267 }
268 Ok(ColumnarValue::Array(Arc::new(dict_builder.finish())))
269 }
270 _ => internal_err!(
271 "property_get: unsupported dictionary value type, expected List or Binary"
272 ),
273 }
274 }
275 _ => internal_err!(
276 "property_get: unsupported input type, expected List, Binary, Dictionary<Int32, List>, or Dictionary<Int32, Binary>"
277 ),
278 }
279 }
280}