micromegas_analytics/properties/property_set_jsonb_dictionary_builder.rs

1use crate::arrow_properties::serialize_property_set_to_jsonb;
2use crate::properties::property_set::PropertySet;
3use anyhow::Result;
4use datafusion::arrow::array::{BinaryArray, DictionaryArray, Int32Array};
5use datafusion::arrow::datatypes::Int32Type;
6use datafusion::common::DataFusionError;
7use micromegas_transit::value::Object;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11/// A wrapper around raw pointers that implements Send/Sync for use in HashMap keys.
12///
13/// This is safe because:
14/// 1. We only use the pointer for identity comparison (equality/hashing)
15/// 2. We never dereference the pointer
16/// 3. We keep the actual Arc<Object> alive in _property_refs
17/// 4. The cache is scoped to single block processing (no cross-thread sharing)
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
19struct ObjectPointer(*const Object);
20
21unsafe impl Send for ObjectPointer {}
22unsafe impl Sync for ObjectPointer {}
23
24impl ObjectPointer {
25    fn new(arc_obj: &Arc<Object>) -> Self {
26        Self(Arc::as_ptr(arc_obj))
27    }
28}
29
30/// Custom dictionary builder for PropertySet → JSONB encoding with pointer-based deduplication.
31///
32/// This builder eliminates redundant JSONB serialization and dictionary hash lookups
33/// for duplicate PropertySets by using PropertySet's `Arc<Object>` pointer addresses as keys.
34///
35/// Performance benefits over Arrow's BinaryDictionaryBuilder:
36/// - Eliminates content-based hashing: Arrow's builder hashes JSONB bytes for deduplication
37/// - Pointer-based deduplication: O(1) pointer comparison vs O(n) content hash
38/// - Serialization only when needed: Only serialize PropertySet on first encounter
39/// - Memory efficiency: Shared PropertySet references, single JSONB copy per unique set
40pub struct PropertySetJsonbDictionaryBuilder {
41    /// Maps `Arc<Object>` pointer to dictionary index (avoids content hashing)
42    pointer_to_index: HashMap<ObjectPointer, i32>,
43    /// Pre-serialized JSONB values in dictionary
44    jsonb_values: Vec<Vec<u8>>,
45    /// Dictionary keys (indices) for each appended entry
46    keys: Vec<Option<i32>>,
47    /// Keep PropertySet references alive for pointer safety
48    _property_refs: Vec<Arc<Object>>,
49}
50
51impl PropertySetJsonbDictionaryBuilder {
52    /// Create a new builder with the specified capacity hint
53    pub fn new(capacity: usize) -> Self {
54        Self {
55            pointer_to_index: HashMap::with_capacity(capacity),
56            jsonb_values: Vec::with_capacity(capacity),
57            keys: Vec::with_capacity(capacity),
58            _property_refs: Vec::with_capacity(capacity),
59        }
60    }
61
62    /// Append PropertySet using pointer-based deduplication
63    ///
64    /// For cache hits: reuses existing dictionary index (no serialization)
65    /// For cache misses: serializes once and stores in dictionary
66    pub fn append_property_set(&mut self, property_set: &PropertySet) -> Result<()> {
67        let arc_obj = property_set.as_arc_object();
68        let ptr = ObjectPointer::new(arc_obj);
69
70        match self.pointer_to_index.get(&ptr) {
71            Some(&index) => {
72                // Cache hit: reuse existing dictionary index (no serialization)
73                self.keys.push(Some(index));
74            }
75            None => {
76                // Cache miss: serialize once and store in dictionary
77                let jsonb_bytes = serialize_property_set_to_jsonb(property_set)?;
78                let new_index = self.jsonb_values.len() as i32;
79
80                self.jsonb_values.push(jsonb_bytes);
81                self.pointer_to_index.insert(ptr, new_index);
82                self.keys.push(Some(new_index));
83                // Keep Arc reference alive to ensure pointer validity
84                self._property_refs.push(Arc::clone(arc_obj));
85            }
86        }
87        Ok(())
88    }
89
90    /// Append a null value
91    pub fn append_null(&mut self) {
92        self.keys.push(None);
93    }
94
95    /// Finish building and return the DictionaryArray
96    ///
97    /// Output is identical to Arrow's BinaryDictionaryBuilder for compatibility
98    pub fn finish(self) -> Result<DictionaryArray<Int32Type>> {
99        let keys = Int32Array::from(self.keys);
100        // Convert Vec<Vec<u8>> to Vec<&[u8]> for BinaryArray::from_vec
101        let byte_slices: Vec<&[u8]> = self.jsonb_values.iter().map(|v| v.as_slice()).collect();
102        let values = Arc::new(BinaryArray::from_vec(byte_slices));
103        DictionaryArray::try_new(keys, values)
104            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None).into())
105    }
106
107    /// Get the current number of appended entries
108    pub fn len(&self) -> usize {
109        self.keys.len()
110    }
111
112    /// Check if the builder is empty
113    pub fn is_empty(&self) -> bool {
114        self.keys.is_empty()
115    }
116}