micromegas_datafusion_extensions/jsonb/array_elements.rs

1use async_trait::async_trait;
2use datafusion::arrow::array::{Array, ArrayRef, BinaryArray, DictionaryArray, GenericBinaryArray};
3use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
4use datafusion::arrow::record_batch::RecordBatch;
5use datafusion::catalog::Session;
6use datafusion::catalog::TableFunctionImpl;
7use datafusion::catalog::TableProvider;
8use datafusion::datasource::TableType;
9use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
10use datafusion::error::DataFusionError;
11use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
12use datafusion::physical_plan::ExecutionPlan;
13use datafusion::prelude::Expr;
14use datafusion::scalar::ScalarValue;
15use jsonb::RawJsonb;
16use std::any::Any;
17use std::sync::Arc;
18
/// A DataFusion `TableFunctionImpl` that expands a JSONB array into rows with a single `value` column.
///
/// Each element of the input array becomes one output row; the `value` column
/// holds the element re-encoded as standalone JSONB bytes (`DataType::Binary`).
///
/// Usage:
/// ```sql
/// SELECT jsonb_as_string(elem.value)
/// FROM jsonb_array_elements(jsonb_parse('[1, 2, 3]')) as elem
/// ```
#[derive(Debug)]
pub struct JsonbArrayElementsTableFunction {}
28
29impl JsonbArrayElementsTableFunction {
30    pub fn new() -> Self {
31        Self {}
32    }
33}
34
35impl Default for JsonbArrayElementsTableFunction {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
/// The source of JSONB data — either a literal value or a subquery/expression to evaluate.
#[derive(Debug, Clone)]
enum JsonbSource {
    /// A JSONB value known at plan time (e.g. a literal argument).
    Literal(ScalarValue),
    /// A logical plan executed at scan time; its single column supplies the JSONB values.
    Subquery(Arc<LogicalPlan>),
}
47
48impl TableFunctionImpl for JsonbArrayElementsTableFunction {
49    fn call(&self, args: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
50        if args.len() != 1 {
51            return Err(DataFusionError::Plan(
52                "jsonb_array_elements requires exactly one argument (a JSONB array)".into(),
53            ));
54        }
55
56        let source = match &args[0] {
57            Expr::Literal(scalar, _metadata) => JsonbSource::Literal(scalar.clone()),
58            Expr::ScalarSubquery(subquery) => JsonbSource::Subquery(subquery.subquery.clone()),
59            other => {
60                let plan = LogicalPlanBuilder::empty(true)
61                    .project(vec![other.clone()])?
62                    .build()?;
63                JsonbSource::Subquery(Arc::new(plan))
64            }
65        };
66
67        Ok(Arc::new(JsonbArrayElementsTableProvider { source }))
68    }
69}
70
71fn output_schema() -> SchemaRef {
72    Arc::new(Schema::new(vec![Field::new(
73        "value",
74        DataType::Binary,
75        false,
76    )]))
77}
78
79/// Extract element values from a JSONB array.
80fn extract_elements_from_jsonb(jsonb_bytes: &[u8]) -> Result<Vec<Vec<u8>>, DataFusionError> {
81    let jsonb = RawJsonb::new(jsonb_bytes);
82    match jsonb.array_values() {
83        Ok(Some(values)) => Ok(values.into_iter().map(|v| v.as_ref().to_vec()).collect()),
84        Ok(None) => Err(DataFusionError::Execution(
85            "jsonb_array_elements: input is not a JSONB array".into(),
86        )),
87        Err(e) => Err(DataFusionError::External(e.into())),
88    }
89}
90
91fn empty_exec(projection: Option<&Vec<usize>>) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
92    let batch = RecordBatch::new_empty(output_schema());
93    let source = MemorySourceConfig::try_new(
94        &[vec![batch]],
95        output_schema(),
96        projection.map(|v| v.to_owned()),
97    )?;
98    Ok(DataSourceExec::from_data_source(source))
99}
100
101fn elements_to_batch(elements: &[Vec<u8>]) -> Result<RecordBatch, DataFusionError> {
102    if elements.is_empty() {
103        return Ok(RecordBatch::new_empty(output_schema()));
104    }
105
106    let values: Vec<&[u8]> = elements.iter().map(|v| v.as_slice()).collect();
107    let value_array: ArrayRef = Arc::new(BinaryArray::from(values));
108
109    RecordBatch::try_new(output_schema(), vec![value_array])
110        .map_err(|e| DataFusionError::External(e.into()))
111}
112
113fn scalar_to_elements(scalar: &ScalarValue) -> Result<Vec<Vec<u8>>, DataFusionError> {
114    match scalar {
115        ScalarValue::Binary(Some(bytes)) => extract_elements_from_jsonb(bytes),
116        ScalarValue::Binary(None) => Ok(vec![]),
117        ScalarValue::Dictionary(_, inner) => scalar_to_elements(inner.as_ref()),
118        _ => Err(DataFusionError::Plan(format!(
119            "jsonb_array_elements argument must be Binary (JSONB), got: {:?}",
120            scalar.data_type()
121        ))),
122    }
123}
124
125/// Extract JSONB bytes from all rows of a column, handling both plain Binary
126/// and Dictionary<Int32, Binary> encodings.
127fn extract_all_jsonb_bytes_from_column(column: &ArrayRef) -> Result<Vec<Vec<u8>>, DataFusionError> {
128    match column.data_type() {
129        DataType::Binary => {
130            let binary_array = column
131                .as_any()
132                .downcast_ref::<GenericBinaryArray<i32>>()
133                .ok_or_else(|| {
134                    DataFusionError::Execution("failed to cast column to BinaryArray".into())
135                })?;
136            Ok((0..binary_array.len())
137                .filter(|&i| !binary_array.is_null(i))
138                .map(|i| binary_array.value(i).to_vec())
139                .collect())
140        }
141        DataType::Dictionary(_, value_type) if matches!(value_type.as_ref(), DataType::Binary) => {
142            let dict_array = column
143                .as_any()
144                .downcast_ref::<DictionaryArray<Int32Type>>()
145                .ok_or_else(|| {
146                    DataFusionError::Execution(
147                        "failed to cast column to DictionaryArray<Int32, Binary>".into(),
148                    )
149                })?;
150            let binary_values = dict_array
151                .values()
152                .as_any()
153                .downcast_ref::<GenericBinaryArray<i32>>()
154                .ok_or_else(|| {
155                    DataFusionError::Execution("dictionary values are not a binary array".into())
156                })?;
157            Ok((0..dict_array.len())
158                .filter(|&i| !dict_array.is_null(i))
159                .map(|i| {
160                    let key_index = dict_array.keys().value(i) as usize;
161                    binary_values.value(key_index).to_vec()
162                })
163                .collect())
164        }
165        other => Err(DataFusionError::Execution(format!(
166            "jsonb_array_elements subquery must return a Binary or Dictionary<Int32, Binary> column, got: {other:?}"
167        ))),
168    }
169}
170
/// Table provider for expanding JSONB arrays into value rows.
///
/// Produced by [`JsonbArrayElementsTableFunction::call`]; materializes its
/// rows eagerly in `scan`.
#[derive(Debug)]
pub struct JsonbArrayElementsTableProvider {
    // Where the JSONB array comes from: a captured literal or a plan to run at scan time.
    source: JsonbSource,
}
176
177impl JsonbArrayElementsTableProvider {
178    /// Creates a new provider from a JSONB scalar value (for testing).
179    pub fn from_scalar(scalar: ScalarValue) -> Result<Self, DataFusionError> {
180        if !matches!(&scalar, ScalarValue::Binary(Some(_))) {
181            return Err(DataFusionError::Plan(format!(
182                "jsonb_array_elements argument must be Binary (JSONB), got: {:?}",
183                scalar.data_type()
184            )));
185        }
186        Ok(Self {
187            source: JsonbSource::Literal(scalar),
188        })
189    }
190}
191
#[async_trait]
impl TableProvider for JsonbArrayElementsTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Single-column schema: `value: Binary` (JSONB-encoded element).
    fn schema(&self) -> SchemaRef {
        output_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    /// Materializes the JSONB array elements into an in-memory execution plan.
    ///
    /// For a literal source the elements are decoded directly. For a subquery
    /// source the plan is executed to completion here, and each row of its
    /// single Binary column is expanded into element rows. `limit` and
    /// `projection` are applied to the resulting batch; `_filters` are ignored.
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        let elements = match &self.source {
            JsonbSource::Literal(scalar) => scalar_to_elements(scalar)?,
            JsonbSource::Subquery(plan) => {
                // Eagerly run the subquery; its output feeds the expansion below.
                let physical_plan = state.create_physical_plan(plan).await?;
                let task_ctx = state.task_ctx();
                let batches = datafusion::physical_plan::collect(physical_plan, task_ctx).await?;

                let mut all_elements = Vec::new();
                // No input rows at all — short-circuit with an empty plan.
                if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
                    return empty_exec(projection);
                }
                for batch in &batches {
                    if batch.num_columns() != 1 {
                        return Err(DataFusionError::Execution(format!(
                            "jsonb_array_elements subquery must return exactly one column, got {}",
                            batch.num_columns()
                        )));
                    }
                    // Each row holds one JSONB array; flatten all of their elements.
                    for jsonb_bytes in extract_all_jsonb_bytes_from_column(batch.column(0))? {
                        all_elements.extend(extract_elements_from_jsonb(&jsonb_bytes)?);
                    }
                }
                all_elements
            }
        };

        let mut record_batch = elements_to_batch(&elements)?;

        // Apply limit if specified
        if let Some(n) = limit
            && n < record_batch.num_rows()
        {
            record_batch = record_batch.slice(0, n);
        }

        let source = MemorySourceConfig::try_new(
            &[vec![record_batch]],
            self.schema(),
            projection.map(|v| v.to_owned()),
        )?;
        Ok(DataSourceExec::from_data_source(source))
    }
}
255}