micromegas_datafusion_extensions/
binary_column_accessor.rs1use anyhow::{Result, anyhow};
2use datafusion::arrow::array::{Array, ArrayRef, BinaryArray, DictionaryArray, RecordBatch};
3use datafusion::arrow::datatypes::{DataType, Int32Type};
4use std::sync::Arc;
5
/// Uniform, read-only view over a column of binary values.
///
/// Implementors hide the physical layout of the column so callers can read
/// rows by index without knowing how the bytes are stored.
pub trait BinaryColumnAccessor: Send {
    /// Number of rows in the column.
    fn len(&self) -> usize;

    /// Returns `true` when the column has no rows.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Returns `true` when the row at `index` is null.
    fn is_null(&self, index: usize) -> bool;

    /// Returns the bytes stored in the row at `index`.
    ///
    /// What happens for an out-of-range `index` is decided by the implementor.
    fn value(&self, index: usize) -> &[u8];
}
17
18struct BinaryArrayAccessor {
19 array: Arc<BinaryArray>,
20}
21
22impl BinaryArrayAccessor {
23 fn new(array: Arc<BinaryArray>) -> Self {
24 Self { array }
25 }
26}
27
28impl BinaryColumnAccessor for BinaryArrayAccessor {
29 fn value(&self, index: usize) -> &[u8] {
30 self.array.value(index)
31 }
32
33 fn len(&self) -> usize {
34 self.array.len()
35 }
36
37 fn is_null(&self, index: usize) -> bool {
38 self.array.is_null(index)
39 }
40}
41
42struct DictionaryBinaryAccessor {
43 array: Arc<DictionaryArray<Int32Type>>,
44 values: Arc<BinaryArray>,
45}
46
47impl DictionaryBinaryAccessor {
48 fn new(array: Arc<DictionaryArray<Int32Type>>) -> Result<Self> {
49 let values = array
50 .values()
51 .as_any()
52 .downcast_ref::<BinaryArray>()
53 .ok_or_else(|| anyhow!("Dictionary values are not BinaryArray"))?
54 .clone();
55
56 Ok(Self {
57 array,
58 values: Arc::new(values),
59 })
60 }
61}
62
63impl BinaryColumnAccessor for DictionaryBinaryAccessor {
64 fn value(&self, index: usize) -> &[u8] {
65 let key = self.array.keys().value(index);
66 self.values.value(key as usize)
67 }
68
69 fn len(&self) -> usize {
70 self.array.len()
71 }
72
73 fn is_null(&self, index: usize) -> bool {
74 self.array.is_null(index)
75 }
76}
77
78pub fn create_binary_accessor(array: &ArrayRef) -> Result<Box<dyn BinaryColumnAccessor + Send>> {
79 match array.data_type() {
80 DataType::Binary => {
81 let binary_array = array
82 .as_any()
83 .downcast_ref::<BinaryArray>()
84 .ok_or_else(|| anyhow!("Failed to downcast to BinaryArray"))?
85 .clone();
86 Ok(Box::new(BinaryArrayAccessor::new(Arc::new(binary_array))))
87 }
88 DataType::Dictionary(key_type, value_type) => {
89 if !matches!(value_type.as_ref(), DataType::Binary) {
90 return Err(anyhow!("Dictionary values must be Binary"));
91 }
92
93 match key_type.as_ref() {
94 DataType::Int32 => {
95 let dict_array = array
96 .as_any()
97 .downcast_ref::<DictionaryArray<Int32Type>>()
98 .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int32>"))?
99 .clone();
100 Ok(Box::new(DictionaryBinaryAccessor::new(Arc::new(
101 dict_array,
102 ))?))
103 }
104 _ => Err(anyhow!("Unsupported dictionary key type: {:?}", key_type)),
105 }
106 }
107 _ => Err(anyhow!(
108 "Unsupported array type for binary accessor: {:?}",
109 array.data_type()
110 )),
111 }
112}
113
114pub fn binary_column_by_name(
115 batch: &RecordBatch,
116 name: &str,
117) -> Result<Box<dyn BinaryColumnAccessor + Send>> {
118 let column = batch
119 .column_by_name(name)
120 .ok_or_else(|| anyhow!("Column '{}' not found", name))?;
121 create_binary_accessor(column)
122}