micromegas_analytics/
measure.rs

1use crate::{
2    metadata::{ProcessMetadata, StreamMetadata},
3    payload::{fetch_block_payload, parse_block},
4    properties::property_set::PropertySet,
5    time::ConvertTicks,
6};
7use anyhow::{Context, Result};
8use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
9use micromegas_tracing::prelude::*;
10use micromegas_transit::value::{Object, Value};
11use std::sync::Arc;
12
13/// Represents a single metric measurement.
14pub struct Measure {
15    pub process: Arc<ProcessMetadata>,
16    pub stream_id: Arc<String>,
17    pub block_id: Arc<String>,
18    pub insert_time: i64, // nanoseconds
19    pub time: i64,        // nanoseconds
20    pub target: Arc<String>,
21    pub name: Arc<String>,
22    pub unit: Arc<String>,
23    pub value: f64,
24    pub properties: PropertySet,
25}
26
27/// Creates a `Measure` from a `Value`.
28pub fn measure_from_value(
29    process: Arc<ProcessMetadata>,
30    stream_id: Arc<String>,
31    block_id: Arc<String>,
32    block_insert_time_ns: i64,
33    convert_ticks: &ConvertTicks,
34    val: &Value,
35) -> Result<Option<Measure>> {
36    if let Value::Object(obj) = val {
37        match obj.type_name.as_str() {
38            "FloatMetricEvent" => {
39                let ticks = obj
40                    .get::<i64>("time")
41                    .with_context(|| "reading time from FloatMetricEvent")?;
42                let value = obj
43                    .get::<f64>("value")
44                    .with_context(|| "reading value from FloatMetricEvent")?;
45                let desc = obj
46                    .get::<Arc<Object>>("desc")
47                    .with_context(|| "reading desc from FloatMetricEvent")?;
48                let target = desc
49                    .get::<Arc<String>>("target")
50                    .with_context(|| "reading target from FloatMetricEvent")?;
51                let name = desc
52                    .get::<Arc<String>>("name")
53                    .with_context(|| "reading name from FloatMetricEvent")?;
54                let unit = desc
55                    .get::<Arc<String>>("unit")
56                    .with_context(|| "reading unit from FloatMetricEvent")?;
57                Ok(Some(Measure {
58                    process,
59                    stream_id,
60                    block_id,
61                    insert_time: block_insert_time_ns,
62                    time: convert_ticks.ticks_to_nanoseconds(ticks),
63                    target,
64                    name,
65                    unit,
66                    value,
67                    properties: PropertySet::empty(),
68                }))
69            }
70            "IntegerMetricEvent" => {
71                let ticks = obj
72                    .get::<i64>("time")
73                    .with_context(|| "reading time from IntegerMetricEvent")?;
74                let time = convert_ticks.ticks_to_nanoseconds(ticks);
75                let value = obj
76                    .get::<u64>("value")
77                    .with_context(|| "reading value from IntegerMetricEvent")?;
78                let desc = obj
79                    .get::<Arc<Object>>("desc")
80                    .with_context(|| "reading desc from IntegerMetricEvent")?;
81                let target = desc
82                    .get::<Arc<String>>("target")
83                    .with_context(|| "reading target from IntegerMetricEvent")?;
84                let name = desc
85                    .get::<Arc<String>>("name")
86                    .with_context(|| "reading name from IntegerMetricEvent")?;
87                let unit = desc
88                    .get::<Arc<String>>("unit")
89                    .with_context(|| "reading unit from IntegerMetricEvent")?;
90                if *unit == "ticks" {
91                    lazy_static::lazy_static! {
92                        static ref SECONDS_METRIC_UNIT: Arc<String> = Arc::new( String::from("seconds"));
93                    }
94                    Ok(Some(Measure {
95                        process,
96                        stream_id,
97                        block_id,
98                        insert_time: block_insert_time_ns,
99                        time,
100                        target,
101                        name,
102                        unit: SECONDS_METRIC_UNIT.clone(),
103                        value: convert_ticks.delta_ticks_to_ms(value as i64) / 1000.0,
104                        properties: PropertySet::empty(),
105                    }))
106                } else {
107                    Ok(Some(Measure {
108                        process,
109                        stream_id,
110                        block_id,
111                        insert_time: block_insert_time_ns,
112                        time,
113                        target,
114                        name,
115                        unit,
116                        value: value as f64,
117                        properties: PropertySet::empty(),
118                    }))
119                }
120            }
121            "TaggedIntegerMetricEvent" => {
122                let ticks = obj
123                    .get::<i64>("time")
124                    .with_context(|| "reading time from TaggedIntegerMetricEvent")?;
125                let time = convert_ticks.ticks_to_nanoseconds(ticks);
126                let value = obj
127                    .get::<u64>("value")
128                    .with_context(|| "reading value from TaggedIntegerMetricEvent")?;
129                let desc = obj
130                    .get::<Arc<Object>>("desc")
131                    .with_context(|| "reading desc from IntegerMetricEvent")?;
132                let mut target = desc
133                    .get::<Arc<String>>("target")
134                    .with_context(|| "reading target from IntegerMetricEvent")?;
135                let mut name = desc
136                    .get::<Arc<String>>("name")
137                    .with_context(|| "reading name from IntegerMetricEvent")?;
138                let mut unit = desc
139                    .get::<Arc<String>>("unit")
140                    .with_context(|| "reading unit from IntegerMetricEvent")?;
141                let properties = obj
142                    .get::<Arc<Object>>("properties")
143                    .with_context(|| "reading properties from TaggedIntegerMetricEvent")?;
144                for (prop_name, prop_value) in &properties.members {
145                    match (prop_name.as_str(), prop_value) {
146                        ("target", Value::String(value_str)) => {
147                            target = value_str.clone();
148                        }
149                        ("name", Value::String(value_str)) => {
150                            name = value_str.clone();
151                        }
152                        ("unit", Value::String(value_str)) => {
153                            unit = value_str.clone();
154                        }
155                        (&_, _) => {}
156                    }
157                }
158
159                if *unit == "ticks" {
160                    lazy_static::lazy_static! {
161                        static ref SECONDS_METRIC_UNIT: Arc<String> = Arc::new( String::from("seconds"));
162                    }
163                    Ok(Some(Measure {
164                        process,
165                        stream_id,
166                        block_id,
167                        insert_time: block_insert_time_ns,
168                        time,
169                        target,
170                        name,
171                        unit: SECONDS_METRIC_UNIT.clone(),
172                        value: convert_ticks.delta_ticks_to_ms(value as i64) / 1000.0,
173                        properties: properties.into(),
174                    }))
175                } else {
176                    Ok(Some(Measure {
177                        process,
178                        stream_id,
179                        block_id,
180                        insert_time: block_insert_time_ns,
181                        time,
182                        target,
183                        name,
184                        unit,
185                        value: value as f64,
186                        properties: properties.into(),
187                    }))
188                }
189            }
190            "TaggedFloatMetricEvent" => {
191                let ticks = obj
192                    .get::<i64>("time")
193                    .with_context(|| "reading time from TaggedFloatMetricEvent")?;
194                let time = convert_ticks.ticks_to_nanoseconds(ticks);
195                let value = obj
196                    .get::<f64>("value")
197                    .with_context(|| "reading value from TaggedFloatMetricEvent")?;
198                let desc = obj
199                    .get::<Arc<Object>>("desc")
200                    .with_context(|| "reading desc from TaggedFloatMetricEvent")?;
201                let mut target = desc
202                    .get::<Arc<String>>("target")
203                    .with_context(|| "reading target from TaggedFloatMetricEvent")?;
204                let mut name = desc
205                    .get::<Arc<String>>("name")
206                    .with_context(|| "reading name from TaggedFloatMetricEvent")?;
207                let mut unit = desc
208                    .get::<Arc<String>>("unit")
209                    .with_context(|| "reading unit from TaggedFloatMetricEvent")?;
210                let properties = obj
211                    .get::<Arc<Object>>("properties")
212                    .with_context(|| "reading properties from TaggedFloatMetricEvent")?;
213                for (prop_name, prop_value) in &properties.members {
214                    match (prop_name.as_str(), prop_value) {
215                        ("target", Value::String(value_str)) => {
216                            target = value_str.clone();
217                        }
218                        ("name", Value::String(value_str)) => {
219                            name = value_str.clone();
220                        }
221                        ("unit", Value::String(value_str)) => {
222                            unit = value_str.clone();
223                        }
224                        (&_, _) => {}
225                    }
226                }
227                Ok(Some(Measure {
228                    process,
229                    stream_id,
230                    block_id,
231                    insert_time: block_insert_time_ns,
232                    time,
233                    target,
234                    name,
235                    unit,
236                    value,
237                    properties: properties.into(),
238                }))
239            }
240
241            _ => {
242                warn!("unknown metric event {:?}", obj);
243                Ok(None)
244            }
245        }
246    } else {
247        Ok(None)
248    }
249}
250
251/// Iterates over each metric measurement in a block.
252#[span_fn]
253pub async fn for_each_measure_in_block<Predicate: FnMut(Measure) -> Result<bool>>(
254    blob_storage: Arc<BlobStorage>,
255    convert_ticks: &ConvertTicks,
256    process: Arc<ProcessMetadata>,
257    stream: &StreamMetadata,
258    block: &BlockMetadata,
259    mut fun: Predicate,
260) -> Result<bool> {
261    let payload = fetch_block_payload(
262        blob_storage,
263        stream.process_id,
264        stream.stream_id,
265        block.block_id,
266    )
267    .await?;
268    let stream_id = Arc::new(stream.stream_id.to_string());
269    let block_id = Arc::new(block.block_id.to_string());
270    let block_insert_time_ns = block.insert_time.timestamp_nanos_opt().unwrap_or_default();
271    let continue_iterating = parse_block(stream, &payload, |val| {
272        if let Some(measure) = measure_from_value(
273            process.clone(),
274            stream_id.clone(),
275            block_id.clone(),
276            block_insert_time_ns,
277            convert_ticks,
278            &val,
279        )
280        .with_context(|| "measure_from_value")?
281            && !fun(measure)?
282        {
283            return Ok(false); //do not continue
284        }
285        Ok(true) //continue
286    })
287    .with_context(|| format!("parse_block {}", block.block_id))?;
288    Ok(continue_iterating)
289}