micromegas_analytics/dfext/
string_column_accessor.rs

use anyhow::{Result, anyhow};
use datafusion::arrow::array::{
    Array, ArrayRef, DictionaryArray, RecordBatch, StringArray, types::ArrowDictionaryKeyType,
};
use datafusion::arrow::datatypes::{
    DataType, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type,
    UInt64Type,
};
use std::sync::Arc;
7
8pub trait StringColumnAccessor: Send {
9    fn value(&self, index: usize) -> Result<&str>;
10
11    fn len(&self) -> usize;
12
13    fn is_null(&self, index: usize) -> bool;
14
15    fn is_empty(&self) -> bool {
16        self.len() == 0
17    }
18}
19
20struct StringArrayAccessor {
21    array: Arc<StringArray>,
22}
23
24impl StringArrayAccessor {
25    fn new(array: Arc<StringArray>) -> Self {
26        Self { array }
27    }
28}
29
30impl StringColumnAccessor for StringArrayAccessor {
31    fn value(&self, index: usize) -> Result<&str> {
32        Ok(self.array.value(index))
33    }
34
35    fn len(&self) -> usize {
36        self.array.len()
37    }
38
39    fn is_null(&self, index: usize) -> bool {
40        self.array.is_null(index)
41    }
42}
43
44struct DictionaryStringAccessor<K: ArrowDictionaryKeyType> {
45    array: Arc<DictionaryArray<K>>,
46    values: Arc<StringArray>,
47}
48
49impl<K: ArrowDictionaryKeyType> DictionaryStringAccessor<K> {
50    fn new(array: Arc<DictionaryArray<K>>) -> Result<Self> {
51        let values = array
52            .values()
53            .as_any()
54            .downcast_ref::<StringArray>()
55            .ok_or_else(|| anyhow!("Dictionary values are not StringArray"))?
56            .clone();
57
58        Ok(Self {
59            array,
60            values: Arc::new(values),
61        })
62    }
63}
64
65impl<K: ArrowDictionaryKeyType> StringColumnAccessor for DictionaryStringAccessor<K>
66where
67    K::Native: TryInto<usize>,
68{
69    fn value(&self, index: usize) -> Result<&str> {
70        let key = self.array.keys().value(index);
71        let key_usize = key
72            .try_into()
73            .map_err(|_| anyhow!("Dictionary key out of usize range"))?;
74        Ok(self.values.value(key_usize))
75    }
76
77    fn len(&self) -> usize {
78        self.array.len()
79    }
80
81    fn is_null(&self, index: usize) -> bool {
82        self.array.is_null(index)
83    }
84}
85
86pub fn create_string_accessor(array: &ArrayRef) -> Result<Box<dyn StringColumnAccessor + Send>> {
87    match array.data_type() {
88        DataType::Utf8 => {
89            let string_array = array
90                .as_any()
91                .downcast_ref::<StringArray>()
92                .ok_or_else(|| anyhow!("Failed to downcast to StringArray"))?
93                .clone();
94            Ok(Box::new(StringArrayAccessor::new(Arc::new(string_array))))
95        }
96        DataType::Dictionary(key_type, value_type) => {
97            if !matches!(value_type.as_ref(), DataType::Utf8) {
98                return Err(anyhow!("Dictionary values must be Utf8"));
99            }
100
101            match key_type.as_ref() {
102                DataType::Int8 => {
103                    let dict_array = array
104                        .as_any()
105                        .downcast_ref::<DictionaryArray<Int8Type>>()
106                        .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int8>"))?
107                        .clone();
108                    Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
109                        dict_array,
110                    ))?))
111                }
112                DataType::Int16 => {
113                    let dict_array = array
114                        .as_any()
115                        .downcast_ref::<DictionaryArray<Int16Type>>()
116                        .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int16>"))?
117                        .clone();
118                    Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
119                        dict_array,
120                    ))?))
121                }
122                DataType::Int32 => {
123                    let dict_array = array
124                        .as_any()
125                        .downcast_ref::<DictionaryArray<Int32Type>>()
126                        .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int32>"))?
127                        .clone();
128                    Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
129                        dict_array,
130                    ))?))
131                }
132                DataType::Int64 => {
133                    let dict_array = array
134                        .as_any()
135                        .downcast_ref::<DictionaryArray<Int64Type>>()
136                        .ok_or_else(|| anyhow!("Failed to downcast to DictionaryArray<Int64>"))?
137                        .clone();
138                    Ok(Box::new(DictionaryStringAccessor::new(Arc::new(
139                        dict_array,
140                    ))?))
141                }
142                _ => Err(anyhow!("Unsupported dictionary key type: {:?}", key_type)),
143            }
144        }
145        _ => Err(anyhow!(
146            "Unsupported array type for string accessor: {:?}",
147            array.data_type()
148        )),
149    }
150}
151
152pub fn string_column_by_name(
153    batch: &RecordBatch,
154    name: &str,
155) -> Result<Box<dyn StringColumnAccessor + Send>> {
156    let column = batch
157        .column_by_name(name)
158        .ok_or_else(|| anyhow!("Column '{}' not found", name))?;
159    create_string_accessor(column)
160}