micromegas_analytics/properties/property_set_jsonb_dictionary_builder.rs
1use crate::arrow_properties::serialize_property_set_to_jsonb;
2use crate::properties::property_set::PropertySet;
3use anyhow::Result;
4use datafusion::arrow::array::{BinaryArray, DictionaryArray, Int32Array};
5use datafusion::arrow::datatypes::Int32Type;
6use datafusion::common::DataFusionError;
7use micromegas_transit::value::Object;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11/// A wrapper around raw pointers that implements Send/Sync for use in HashMap keys.
12///
13/// This is safe because:
14/// 1. We only use the pointer for identity comparison (equality/hashing)
15/// 2. We never dereference the pointer
16/// 3. We keep the actual Arc<Object> alive in _property_refs
17/// 4. The cache is scoped to single block processing (no cross-thread sharing)
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
19struct ObjectPointer(*const Object);
20
21unsafe impl Send for ObjectPointer {}
22unsafe impl Sync for ObjectPointer {}
23
24impl ObjectPointer {
25 fn new(arc_obj: &Arc<Object>) -> Self {
26 Self(Arc::as_ptr(arc_obj))
27 }
28}
29
30/// Custom dictionary builder for PropertySet → JSONB encoding with pointer-based deduplication.
31///
32/// This builder eliminates redundant JSONB serialization and dictionary hash lookups
33/// for duplicate PropertySets by using PropertySet's `Arc<Object>` pointer addresses as keys.
34///
35/// Performance benefits over Arrow's BinaryDictionaryBuilder:
36/// - Eliminates content-based hashing: Arrow's builder hashes JSONB bytes for deduplication
37/// - Pointer-based deduplication: O(1) pointer comparison vs O(n) content hash
38/// - Serialization only when needed: Only serialize PropertySet on first encounter
39/// - Memory efficiency: Shared PropertySet references, single JSONB copy per unique set
40pub struct PropertySetJsonbDictionaryBuilder {
41 /// Maps `Arc<Object>` pointer to dictionary index (avoids content hashing)
42 pointer_to_index: HashMap<ObjectPointer, i32>,
43 /// Pre-serialized JSONB values in dictionary
44 jsonb_values: Vec<Vec<u8>>,
45 /// Dictionary keys (indices) for each appended entry
46 keys: Vec<Option<i32>>,
47 /// Keep PropertySet references alive for pointer safety
48 _property_refs: Vec<Arc<Object>>,
49}
50
51impl PropertySetJsonbDictionaryBuilder {
52 /// Create a new builder with the specified capacity hint
53 pub fn new(capacity: usize) -> Self {
54 Self {
55 pointer_to_index: HashMap::with_capacity(capacity),
56 jsonb_values: Vec::with_capacity(capacity),
57 keys: Vec::with_capacity(capacity),
58 _property_refs: Vec::with_capacity(capacity),
59 }
60 }
61
62 /// Append PropertySet using pointer-based deduplication
63 ///
64 /// For cache hits: reuses existing dictionary index (no serialization)
65 /// For cache misses: serializes once and stores in dictionary
66 pub fn append_property_set(&mut self, property_set: &PropertySet) -> Result<()> {
67 let arc_obj = property_set.as_arc_object();
68 let ptr = ObjectPointer::new(arc_obj);
69
70 match self.pointer_to_index.get(&ptr) {
71 Some(&index) => {
72 // Cache hit: reuse existing dictionary index (no serialization)
73 self.keys.push(Some(index));
74 }
75 None => {
76 // Cache miss: serialize once and store in dictionary
77 let jsonb_bytes = serialize_property_set_to_jsonb(property_set)?;
78 let new_index = self.jsonb_values.len() as i32;
79
80 self.jsonb_values.push(jsonb_bytes);
81 self.pointer_to_index.insert(ptr, new_index);
82 self.keys.push(Some(new_index));
83 // Keep Arc reference alive to ensure pointer validity
84 self._property_refs.push(Arc::clone(arc_obj));
85 }
86 }
87 Ok(())
88 }
89
90 /// Append a null value
91 pub fn append_null(&mut self) {
92 self.keys.push(None);
93 }
94
95 /// Finish building and return the DictionaryArray
96 ///
97 /// Output is identical to Arrow's BinaryDictionaryBuilder for compatibility
98 pub fn finish(self) -> Result<DictionaryArray<Int32Type>> {
99 let keys = Int32Array::from(self.keys);
100 // Convert Vec<Vec<u8>> to Vec<&[u8]> for BinaryArray::from_vec
101 let byte_slices: Vec<&[u8]> = self.jsonb_values.iter().map(|v| v.as_slice()).collect();
102 let values = Arc::new(BinaryArray::from_vec(byte_slices));
103 DictionaryArray::try_new(keys, values)
104 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None).into())
105 }
106
107 /// Get the current number of appended entries
108 pub fn len(&self) -> usize {
109 self.keys.len()
110 }
111
112 /// Check if the builder is empty
113 pub fn is_empty(&self) -> bool {
114 self.keys.is_empty()
115 }
116}