micromegas_analytics/
arrow_properties.rs1use crate::properties::property_set::PropertySet;
2use anyhow::{Context, Result};
3use datafusion::arrow::array::{
4 Array, ArrayRef, AsArray, BinaryDictionaryBuilder, ListBuilder, StringBuilder, StructArray,
5 StructBuilder,
6};
7use datafusion::arrow::datatypes::Int32Type;
8use jsonb::Value;
9use micromegas_telemetry::property::Property;
10use std::borrow::Cow;
11use std::collections::{BTreeMap, HashMap};
12use std::sync::Arc;
13
14pub fn read_property_list(value: ArrayRef) -> Result<Vec<Property>> {
18 if value.is_empty() {
19 return Ok(vec![]);
20 }
21 let properties: &StructArray = value
22 .as_struct_opt()
23 .with_context(|| format!("property list in not a struct array: {:?}", value.as_any()))?;
24 let (key_index, _key_field) = properties
25 .fields()
26 .find("key")
27 .with_context(|| "getting key field")?;
28 let (value_index, _value_field) = properties
29 .fields()
30 .find("value")
31 .with_context(|| "getting value field")?;
32 let mut properties_vec = vec![];
33 for i in 0..properties.len() {
34 let key = properties.column(key_index).as_string::<i32>().value(i);
35 let value = properties.column(value_index).as_string::<i32>().value(i);
36 properties_vec.push(Property::new(Arc::new(key.into()), Arc::new(value.into())));
37 }
38 Ok(properties_vec)
39}
40
41pub fn add_properties_to_builder(
45 properties: &HashMap<String, String>,
46 property_list_builder: &mut ListBuilder<StructBuilder>,
47) -> Result<()> {
48 let properties_builder = property_list_builder.values();
49 for (k, v) in properties.iter() {
50 let key_builder = properties_builder
51 .field_builder::<StringBuilder>(0)
52 .with_context(|| "getting key field builder")?;
53 key_builder.append_value(k);
54 let value_builder = properties_builder
55 .field_builder::<StringBuilder>(1)
56 .with_context(|| "getting value field builder")?;
57 value_builder.append_value(v);
58 properties_builder.append(true);
59 }
60 property_list_builder.append(true);
61 Ok(())
62}
63
64pub fn add_property_set_to_builder(
68 properties: &PropertySet,
69 property_list_builder: &mut ListBuilder<StructBuilder>,
70) -> Result<()> {
71 let properties_builder = property_list_builder.values();
72 properties.for_each_property(|prop| {
73 let key_builder = properties_builder
74 .field_builder::<StringBuilder>(0)
75 .with_context(|| "getting key field builder")?;
76 key_builder.append_value(prop.key_str());
77 let value_builder = properties_builder
78 .field_builder::<StringBuilder>(1)
79 .with_context(|| "getting value field builder")?;
80 value_builder.append_value(prop.value_str());
81 properties_builder.append(true);
82 Ok(())
83 })?;
84 property_list_builder.append(true);
85 Ok(())
86}
87
88pub fn add_properties_to_jsonb_builder(
92 properties: &HashMap<String, String>,
93 jsonb_builder: &mut BinaryDictionaryBuilder<Int32Type>,
94) -> Result<()> {
95 let jsonb_bytes = serialize_properties_to_jsonb(properties)?;
96 jsonb_builder.append_value(&jsonb_bytes);
97 Ok(())
98}
99
100pub fn add_property_set_to_jsonb_builder(
104 properties: &PropertySet,
105 jsonb_builder: &mut BinaryDictionaryBuilder<Int32Type>,
106) -> Result<()> {
107 let jsonb_bytes = serialize_property_set_to_jsonb(properties)?;
108 jsonb_builder.append_value(&jsonb_bytes);
109 Ok(())
110}
111
112pub fn serialize_property_set_to_jsonb(properties: &PropertySet) -> Result<Vec<u8>> {
117 let mut btree_map = BTreeMap::new();
118
119 properties.for_each_property(|prop| {
120 btree_map.insert(
121 prop.key_str().to_string(),
122 Value::String(Cow::Owned(prop.value_str().to_string())),
123 );
124 Ok(())
125 })?;
126
127 let jsonb_value = Value::Object(btree_map);
128 let mut jsonb_bytes = Vec::new();
129 jsonb_value.write_to_vec(&mut jsonb_bytes);
130
131 Ok(jsonb_bytes)
132}
133
134pub fn serialize_properties_to_jsonb(properties: &HashMap<String, String>) -> Result<Vec<u8>> {
139 let btree_map: BTreeMap<String, Value> = properties
141 .iter()
142 .map(|(k, v)| (k.clone(), Value::String(Cow::Borrowed(v))))
143 .collect();
144
145 let jsonb_value = Value::Object(btree_map);
146 let mut jsonb_bytes = Vec::new();
147 jsonb_value.write_to_vec(&mut jsonb_bytes);
148
149 Ok(jsonb_bytes)
150}