micromegas_datafusion_extensions/
binary_column_accessor.rs

1use anyhow::{Result, anyhow};
2use datafusion::arrow::array::{Array, ArrayRef, BinaryArray, DictionaryArray, RecordBatch};
3use datafusion::arrow::datatypes::{DataType, Int32Type};
4use std::sync::Arc;
5
/// Read-only, row-indexed view over a column of byte strings.
///
/// This lets callers read a plain `Binary` column and a dictionary-encoded column with
/// `Binary` values through the same interface. Implementors must be `Send`.
pub trait BinaryColumnAccessor: Send {
    /// Number of rows in the column.
    fn len(&self) -> usize;

    /// Returns `true` when the column has no rows.
    ///
    /// The default implementation is defined in terms of [`len`](Self::len).
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Returns whether the row at `index` is null.
    fn is_null(&self, index: usize) -> bool;

    /// Returns the raw bytes stored at row `index`.
    ///
    /// The trait does not specify what is returned for a null row; check
    /// [`is_null`](Self::is_null) first when that matters.
    fn value(&self, index: usize) -> &[u8];
}
17
18struct BinaryArrayAccessor {
19    array: Arc<BinaryArray>,
20}
21
22impl BinaryArrayAccessor {
23    fn new(array: Arc<BinaryArray>) -> Self {
24        Self { array }
25    }
26}
27
28impl BinaryColumnAccessor for BinaryArrayAccessor {
29    fn value(&self, index: usize) -> &[u8] {
30        self.array.value(index)
31    }
32
33    fn len(&self) -> usize {
34        self.array.len()
35    }
36
37    fn is_null(&self, index: usize) -> bool {
38        self.array.is_null(index)
39    }
40}
41
42struct DictionaryBinaryAccessor {
43    array: Arc<DictionaryArray<Int32Type>>,
44    values: Arc<BinaryArray>,
45}
46
47impl DictionaryBinaryAccessor {
48    fn new(array: Arc<DictionaryArray<Int32Type>>) -> Result<Self> {
49        let values = array
50            .values()
51            .as_any()
52            .downcast_ref::<BinaryArray>()
53            .ok_or_else(|| anyhow!("Dictionary values are not BinaryArray"))?
54            .clone();
55
56        Ok(Self {
57            array,
58            values: Arc::new(values),
59        })
60    }
61}
62
63impl BinaryColumnAccessor for DictionaryBinaryAccessor {
64    fn value(&self, index: usize) -> &[u8] {
65        let key = self.array.keys().value(index);
66        self.values.value(key as usize)
67    }
68
69    fn len(&self) -> usize {
70        self.array.len()
71    }
72
73    fn is_null(&self, index: usize) -> bool {
74        self.array.is_null(index)
75    }
76}
77
78pub fn create_binary_accessor(array: &ArrayRef) -> Result<Box<dyn BinaryColumnAccessor + Send>> {
79    match array.data_type() {
80        DataType::Binary => {
81            let binary_array = array
82                .as_any()
83                .downcast_ref::<BinaryArray>()
84                .ok_or_else(|| anyhow!("Failed to downcast to BinaryArray"))?
85                .clone();
86            Ok(Box::new(BinaryArrayAccessor::new(Arc::new(binary_array))))
87        }
88        DataType::Dictionary(key_type, value_type) => {
89            if !matches!(value_type.as_ref(), DataType::Binary) {
90                return Err(anyhow!("Dictionary values must be Binary"));
91            }
92
93            match key_type.as_ref() {
94                DataType::Int32 => {
95                    let dict_array = array
96                        .as_any()
97                        .downcast_ref::<DictionaryArray<Int32Type>>()
98                        .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int32>"))?
99                        .clone();
100                    Ok(Box::new(DictionaryBinaryAccessor::new(Arc::new(
101                        dict_array,
102                    ))?))
103                }
104                _ => Err(anyhow!("Unsupported dictionary key type: {:?}", key_type)),
105            }
106        }
107        _ => Err(anyhow!(
108            "Unsupported array type for binary accessor: {:?}",
109            array.data_type()
110        )),
111    }
112}
113
114pub fn binary_column_by_name(
115    batch: &RecordBatch,
116    name: &str,
117) -> Result<Box<dyn BinaryColumnAccessor + Send>> {
118    let column = batch
119        .column_by_name(name)
120        .ok_or_else(|| anyhow!("Column '{}' not found", name))?;
121    create_binary_accessor(column)
122}