// micromegas_datafusion_extensions/jsonb/array_elements.rs
use async_trait::async_trait;
2use datafusion::arrow::array::{Array, ArrayRef, BinaryArray, DictionaryArray, GenericBinaryArray};
3use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
4use datafusion::arrow::record_batch::RecordBatch;
5use datafusion::catalog::Session;
6use datafusion::catalog::TableFunctionImpl;
7use datafusion::catalog::TableProvider;
8use datafusion::datasource::TableType;
9use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
10use datafusion::error::DataFusionError;
11use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
12use datafusion::physical_plan::ExecutionPlan;
13use datafusion::prelude::Expr;
14use datafusion::scalar::ScalarValue;
15use jsonb::RawJsonb;
16use std::any::Any;
17use std::sync::Arc;
18
/// Table function `jsonb_array_elements(jsonb)`: expands a JSONB array
/// argument into a one-column table with one row per array element.
#[derive(Debug)]
pub struct JsonbArrayElementsTableFunction {}
28
29impl JsonbArrayElementsTableFunction {
30 pub fn new() -> Self {
31 Self {}
32 }
33}
34
35impl Default for JsonbArrayElementsTableFunction {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
/// Where the JSONB input for the table function comes from.
#[derive(Debug, Clone)]
enum JsonbSource {
    /// A literal scalar value supplied directly in the call.
    Literal(ScalarValue),
    /// A logical plan (subquery or wrapped expression) evaluated at scan time.
    Subquery(Arc<LogicalPlan>),
}
47
48impl TableFunctionImpl for JsonbArrayElementsTableFunction {
49 fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
50 if args.len() != 1 {
51 return Err(DataFusionError::Plan(
52 "jsonb_array_elements requires exactly one argument (a JSONB array)".into(),
53 ));
54 }
55
56 let source = match &args[0] {
57 Expr::Literal(scalar, _metadata) => JsonbSource::Literal(scalar.clone()),
58 Expr::ScalarSubquery(subquery) => JsonbSource::Subquery(subquery.subquery.clone()),
59 other => {
60 let plan = LogicalPlanBuilder::empty(true)
61 .project(vec![other.clone()])?
62 .build()?;
63 JsonbSource::Subquery(Arc::new(plan))
64 }
65 };
66
67 Ok(Arc::new(JsonbArrayElementsTableProvider { source }))
68 }
69}
70
71fn output_schema() -> SchemaRef {
72 Arc::new(Schema::new(vec![Field::new(
73 "value",
74 DataType::Binary,
75 false,
76 )]))
77}
78
79fn extract_elements_from_jsonb(jsonb_bytes: &[u8]) -> Result<Vec<Vec<u8>>, DataFusionError> {
81 let jsonb = RawJsonb::new(jsonb_bytes);
82 match jsonb.array_values() {
83 Ok(Some(values)) => Ok(values.into_iter().map(|v| v.as_ref().to_vec()).collect()),
84 Ok(None) => Err(DataFusionError::Execution(
85 "jsonb_array_elements: input is not a JSONB array".into(),
86 )),
87 Err(e) => Err(DataFusionError::External(e.into())),
88 }
89}
90
91fn empty_exec(projection: Option<&Vec<usize>>) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
92 let batch = RecordBatch::new_empty(output_schema());
93 let source = MemorySourceConfig::try_new(
94 &[vec![batch]],
95 output_schema(),
96 projection.map(|v| v.to_owned()),
97 )?;
98 Ok(DataSourceExec::from_data_source(source))
99}
100
101fn elements_to_batch(elements: &[Vec<u8>]) -> Result<RecordBatch, DataFusionError> {
102 if elements.is_empty() {
103 return Ok(RecordBatch::new_empty(output_schema()));
104 }
105
106 let values: Vec<&[u8]> = elements.iter().map(|v| v.as_slice()).collect();
107 let value_array: ArrayRef = Arc::new(BinaryArray::from(values));
108
109 RecordBatch::try_new(output_schema(), vec![value_array])
110 .map_err(|e| DataFusionError::External(e.into()))
111}
112
113fn scalar_to_elements(scalar: &ScalarValue) -> Result<Vec<Vec<u8>>, DataFusionError> {
114 match scalar {
115 ScalarValue::Binary(Some(bytes)) => extract_elements_from_jsonb(bytes),
116 ScalarValue::Binary(None) => Ok(vec![]),
117 ScalarValue::Dictionary(_, inner) => scalar_to_elements(inner.as_ref()),
118 _ => Err(DataFusionError::Plan(format!(
119 "jsonb_array_elements argument must be Binary (JSONB), got: {:?}",
120 scalar.data_type()
121 ))),
122 }
123}
124
125fn extract_all_jsonb_bytes_from_column(column: &ArrayRef) -> Result<Vec<Vec<u8>>, DataFusionError> {
128 match column.data_type() {
129 DataType::Binary => {
130 let binary_array = column
131 .as_any()
132 .downcast_ref::<GenericBinaryArray<i32>>()
133 .ok_or_else(|| {
134 DataFusionError::Execution("failed to cast column to BinaryArray".into())
135 })?;
136 Ok((0..binary_array.len())
137 .filter(|&i| !binary_array.is_null(i))
138 .map(|i| binary_array.value(i).to_vec())
139 .collect())
140 }
141 DataType::Dictionary(_, value_type) if matches!(value_type.as_ref(), DataType::Binary) => {
142 let dict_array = column
143 .as_any()
144 .downcast_ref::<DictionaryArray<Int32Type>>()
145 .ok_or_else(|| {
146 DataFusionError::Execution(
147 "failed to cast column to DictionaryArray<Int32, Binary>".into(),
148 )
149 })?;
150 let binary_values = dict_array
151 .values()
152 .as_any()
153 .downcast_ref::<GenericBinaryArray<i32>>()
154 .ok_or_else(|| {
155 DataFusionError::Execution("dictionary values are not a binary array".into())
156 })?;
157 Ok((0..dict_array.len())
158 .filter(|&i| !dict_array.is_null(i))
159 .map(|i| {
160 let key_index = dict_array.keys().value(i) as usize;
161 binary_values.value(key_index).to_vec()
162 })
163 .collect())
164 }
165 other => Err(DataFusionError::Execution(format!(
166 "jsonb_array_elements subquery must return a Binary or Dictionary<Int32, Binary> column, got: {other:?}"
167 ))),
168 }
169}
170
/// Table provider that expands a JSONB array (from a literal or a subquery)
/// into one row per element when scanned.
#[derive(Debug)]
pub struct JsonbArrayElementsTableProvider {
    // Where the JSONB input comes from; resolved during `scan`.
    source: JsonbSource,
}
176
177impl JsonbArrayElementsTableProvider {
178 pub fn from_scalar(scalar: ScalarValue) -> Result<Self, DataFusionError> {
180 if !matches!(&scalar, ScalarValue::Binary(Some(_))) {
181 return Err(DataFusionError::Plan(format!(
182 "jsonb_array_elements argument must be Binary (JSONB), got: {:?}",
183 scalar.data_type()
184 )));
185 }
186 Ok(Self {
187 source: JsonbSource::Literal(scalar),
188 })
189 }
190}
191
192#[async_trait]
193impl TableProvider for JsonbArrayElementsTableProvider {
194 fn as_any(&self) -> &dyn Any {
195 self
196 }
197
198 fn schema(&self) -> SchemaRef {
199 output_schema()
200 }
201
202 fn table_type(&self) -> TableType {
203 TableType::Temporary
204 }
205
206 async fn scan(
207 &self,
208 state: &dyn Session,
209 projection: Option<&Vec<usize>>,
210 _filters: &[Expr],
211 limit: Option<usize>,
212 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
213 let elements = match &self.source {
214 JsonbSource::Literal(scalar) => scalar_to_elements(scalar)?,
215 JsonbSource::Subquery(plan) => {
216 let physical_plan = state.create_physical_plan(plan).await?;
217 let task_ctx = state.task_ctx();
218 let batches = datafusion::physical_plan::collect(physical_plan, task_ctx).await?;
219
220 let mut all_elements = Vec::new();
221 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
222 return empty_exec(projection);
223 }
224 for batch in &batches {
225 if batch.num_columns() != 1 {
226 return Err(DataFusionError::Execution(format!(
227 "jsonb_array_elements subquery must return exactly one column, got {}",
228 batch.num_columns()
229 )));
230 }
231 for jsonb_bytes in extract_all_jsonb_bytes_from_column(batch.column(0))? {
232 all_elements.extend(extract_elements_from_jsonb(&jsonb_bytes)?);
233 }
234 }
235 all_elements
236 }
237 };
238
239 let mut record_batch = elements_to_batch(&elements)?;
240
241 if let Some(n) = limit
243 && n < record_batch.num_rows()
244 {
245 record_batch = record_batch.slice(0, n);
246 }
247
248 let source = MemorySourceConfig::try_new(
249 &[vec![record_batch]],
250 self.schema(),
251 projection.map(|v| v.to_owned()),
252 )?;
253 Ok(DataSourceExec::from_data_source(source))
254 }
255}