micromegas_analytics/properties/properties_column_accessor.rs

1use anyhow::{Result, anyhow};
2use datafusion::arrow::array::{Array, ArrayRef, GenericListArray, RecordBatch};
3use datafusion::arrow::datatypes::DataType;
4use std::sync::Arc;
5
6use crate::arrow_properties::{read_property_list, serialize_properties_to_jsonb};
7use crate::dfext::binary_column_accessor::{BinaryColumnAccessor, create_binary_accessor};
8use std::collections::HashMap;
9
10/// Trait for accessing properties columns in a format-agnostic way.
11///
12/// This trait provides unified access to properties data regardless of the underlying
13/// Arrow format - either JSONB dictionary format (`Dictionary(Int32, Binary)`) or
14/// legacy struct array format (`GenericListArray<StructArray>`).
15///
16/// All methods return JSONB bytes for consistent processing downstream.
17pub trait PropertiesColumnAccessor: Send + std::fmt::Debug {
18    /// Get JSONB bytes for properties at the given index.
19    ///
20    /// For JSONB format: Returns the raw JSONB bytes directly.
21    /// For struct array format: Converts struct array to JSONB on-the-fly.
22    fn jsonb_value(&self, index: usize) -> Result<Vec<u8>>;
23
24    /// Get the number of rows in this column.
25    fn len(&self) -> usize;
26
27    /// Check if the value at the given index is null.
28    fn is_null(&self, index: usize) -> bool;
29
30    /// Check if this column is empty.
31    fn is_empty(&self) -> bool {
32        self.len() == 0
33    }
34}
35
36/// Accessor for JSONB format properties (both dictionary-encoded and plain binary)
37struct JsonbColumnAccessor {
38    binary_accessor: Box<dyn BinaryColumnAccessor + Send>,
39}
40
41impl JsonbColumnAccessor {
42    fn new(binary_accessor: Box<dyn BinaryColumnAccessor + Send>) -> Self {
43        Self { binary_accessor }
44    }
45}
46
47impl PropertiesColumnAccessor for JsonbColumnAccessor {
48    fn jsonb_value(&self, index: usize) -> Result<Vec<u8>> {
49        // For JSONB format, return the raw bytes directly
50        Ok(self.binary_accessor.value(index).to_vec())
51    }
52
53    fn len(&self) -> usize {
54        self.binary_accessor.len()
55    }
56
57    fn is_null(&self, index: usize) -> bool {
58        self.binary_accessor.is_null(index)
59    }
60}
61
62impl std::fmt::Debug for JsonbColumnAccessor {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("JsonbColumnAccessor")
65            .field("len", &self.binary_accessor.len())
66            .finish()
67    }
68}
69
70/// Accessor for legacy struct array format: `GenericListArray<StructArray>`
71#[derive(Debug)]
72struct StructArrayAccessor {
73    array: Arc<GenericListArray<i32>>,
74}
75
76impl StructArrayAccessor {
77    fn new(array: Arc<GenericListArray<i32>>) -> Self {
78        Self { array }
79    }
80}
81
82impl PropertiesColumnAccessor for StructArrayAccessor {
83    fn jsonb_value(&self, index: usize) -> Result<Vec<u8>> {
84        // Convert struct array to PropertySet, then to HashMap, then to JSONB
85        let property_list_array = self.array.value(index);
86        let properties = read_property_list(property_list_array)?;
87
88        // Convert Vec<Property> to HashMap<String, String>
89        let properties_map: HashMap<String, String> = properties
90            .into_iter()
91            .map(|prop| (prop.key_str().to_string(), prop.value_str().to_string()))
92            .collect();
93
94        // Serialize to JSONB bytes
95        serialize_properties_to_jsonb(&properties_map)
96    }
97
98    fn len(&self) -> usize {
99        self.array.len()
100    }
101
102    fn is_null(&self, index: usize) -> bool {
103        self.array.is_null(index)
104    }
105}
106
107/// Creates a properties column accessor that automatically detects the format.
108///
109/// Supports:
110/// - JSONB dictionary format: `Dictionary(Int32, Binary)` - used by current analytics tables
111/// - Plain binary format: `Binary` - JSONB data without dictionary encoding
112/// - Struct array format: `GenericListArray<StructArray>` - legacy format from replication
113///
114/// Returns a unified accessor that always provides JSONB bytes.
115pub fn create_properties_accessor(
116    array: &ArrayRef,
117) -> Result<Box<dyn PropertiesColumnAccessor + Send>> {
118    match array.data_type() {
119        // Modern JSONB dictionary format
120        DataType::Dictionary(key_type, value_type) => {
121            if matches!(key_type.as_ref(), DataType::Int32)
122                && matches!(value_type.as_ref(), DataType::Binary)
123            {
124                let binary_accessor = create_binary_accessor(array)?;
125                Ok(Box::new(JsonbColumnAccessor::new(binary_accessor)))
126            } else {
127                Err(anyhow!(
128                    "Unsupported dictionary format for properties: key={:?}, value={:?}",
129                    key_type,
130                    value_type
131                ))
132            }
133        }
134
135        // Legacy struct array format
136        DataType::List(field) => {
137            // Verify this is a list of structs with key/value fields
138            if let DataType::Struct(struct_fields) = field.data_type() {
139                let has_key = struct_fields.iter().any(|f| f.name() == "key");
140                let has_value = struct_fields.iter().any(|f| f.name() == "value");
141
142                if has_key && has_value {
143                    let list_array = array
144                        .as_any()
145                        .downcast_ref::<GenericListArray<i32>>()
146                        .ok_or_else(|| anyhow!("Failed to downcast to GenericListArray<i32>"))?
147                        .clone();
148                    Ok(Box::new(StructArrayAccessor::new(Arc::new(list_array))))
149                } else {
150                    Err(anyhow!(
151                        "List array does not contain struct with key/value fields"
152                    ))
153                }
154            } else {
155                Err(anyhow!(
156                    "List array does not contain struct elements: {:?}",
157                    field.data_type()
158                ))
159            }
160        }
161
162        // Direct binary format (less common, but possible)
163        DataType::Binary => {
164            let binary_accessor = create_binary_accessor(array)?;
165            Ok(Box::new(JsonbColumnAccessor::new(binary_accessor)))
166        }
167
168        _ => Err(anyhow!(
169            "Unsupported array type for properties accessor: {:?}",
170            array.data_type()
171        )),
172    }
173}
174
175/// Convenience function to get a properties column accessor by name from a RecordBatch.
176pub fn properties_column_by_name(
177    batch: &RecordBatch,
178    name: &str,
179) -> Result<Box<dyn PropertiesColumnAccessor + Send>> {
180    let column = batch
181        .column_by_name(name)
182        .ok_or_else(|| anyhow!("Column '{}' not found", name))?;
183    create_properties_accessor(column)
184}