micromegas_analytics/dfext/
string_column_accessor.rs1use anyhow::{Result, anyhow};
2use datafusion::arrow::array::{
3 Array, ArrayRef, DictionaryArray, RecordBatch, StringArray, types::ArrowDictionaryKeyType,
4};
5use datafusion::arrow::datatypes::{DataType, Int8Type, Int16Type, Int32Type, Int64Type};
6use std::sync::Arc;
7
8pub trait StringColumnAccessor: Send {
9 fn value(&self, index: usize) -> Result<&str>;
10
11 fn len(&self) -> usize;
12
13 fn is_null(&self, index: usize) -> bool;
14
15 fn is_empty(&self) -> bool {
16 self.len() == 0
17 }
18}
19
20struct StringArrayAccessor {
21 array: Arc<StringArray>,
22}
23
24impl StringArrayAccessor {
25 fn new(array: Arc<StringArray>) -> Self {
26 Self { array }
27 }
28}
29
30impl StringColumnAccessor for StringArrayAccessor {
31 fn value(&self, index: usize) -> Result<&str> {
32 Ok(self.array.value(index))
33 }
34
35 fn len(&self) -> usize {
36 self.array.len()
37 }
38
39 fn is_null(&self, index: usize) -> bool {
40 self.array.is_null(index)
41 }
42}
43
44struct DictionaryStringAccessor<K: ArrowDictionaryKeyType> {
45 array: Arc<DictionaryArray<K>>,
46 values: Arc<StringArray>,
47}
48
49impl<K: ArrowDictionaryKeyType> DictionaryStringAccessor<K> {
50 fn new(array: Arc<DictionaryArray<K>>) -> Result<Self> {
51 let values = array
52 .values()
53 .as_any()
54 .downcast_ref::<StringArray>()
55 .ok_or_else(|| anyhow!("Dictionary values are not StringArray"))?
56 .clone();
57
58 Ok(Self {
59 array,
60 values: Arc::new(values),
61 })
62 }
63}
64
65impl<K: ArrowDictionaryKeyType> StringColumnAccessor for DictionaryStringAccessor<K>
66where
67 K::Native: TryInto<usize>,
68{
69 fn value(&self, index: usize) -> Result<&str> {
70 let key = self.array.keys().value(index);
71 let key_usize = key
72 .try_into()
73 .map_err(|_| anyhow!("Dictionary key out of usize range"))?;
74 Ok(self.values.value(key_usize))
75 }
76
77 fn len(&self) -> usize {
78 self.array.len()
79 }
80
81 fn is_null(&self, index: usize) -> bool {
82 self.array.is_null(index)
83 }
84}
85
86pub fn create_string_accessor(array: &ArrayRef) -> Result<Box<dyn StringColumnAccessor + Send>> {
87 match array.data_type() {
88 DataType::Utf8 => {
89 let string_array = array
90 .as_any()
91 .downcast_ref::<StringArray>()
92 .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?
93 .clone();
94 Ok(Box::new(StringArrayAccessor::new(Arc::new(string_array))))
95 }
96 DataType::Dictionary(key_type, value_type) => {
97 if !matches!(value_type.as_ref(), DataType::Utf8) {
98 return Err(anyhow!("Dictionary values must be Utf8"));
99 }
100
101 match key_type.as_ref() {
102 DataType::Int8 => {
103 let dict_array = array
104 .as_any()
105 .downcast_ref::<DictionaryArray<Int8Type>>()
106 .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int8>"))?
107 .clone();
108 Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
109 dict_array,
110 ))?))
111 }
112 DataType::Int16 => {
113 let dict_array = array
114 .as_any()
115 .downcast_ref::<DictionaryArray<Int16Type>>()
116 .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int16>"))?
117 .clone();
118 Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
119 dict_array,
120 ))?))
121 }
122 DataType::Int32 => {
123 let dict_array = array
124 .as_any()
125 .downcast_ref::<DictionaryArray<Int32Type>>()
126 .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int32>"))?
127 .clone();
128 Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
129 dict_array,
130 ))?))
131 }
132 DataType::Int64 => {
133 let dict_array = array
134 .as_any()
135 .downcast_ref::<DictionaryArray<Int64Type>>()
136 .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int64>"))?
137 .clone();
138 Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
139 dict_array,
140 ))?))
141 }
142 _ => Err(anyhow!("Unsupported dictionary key type: {:?}", key_type)),
143 }
144 }
145 _ => Err(anyhow!(
146 "Unsupported array type for string accessor: {:?}",
147 array.data_type()
148 )),
149 }
150}
151
152pub fn string_column_by_name(
153 batch: &RecordBatch,
154 name: &str,
155) -> Result<Box<dyn StringColumnAccessor + Send>> {
156 let column = batch
157 .column_by_name(name)
158 .ok_or_else(|| anyhow!("Column '{}' not found", name))?;
159 create_string_accessor(column)
160}