1use crate::{
2 metadata::{ProcessMetadata, StreamMetadata},
3 payload::{fetch_block_payload, parse_block},
4 properties::property_set::PropertySet,
5 time::ConvertTicks,
6};
7use anyhow::{Context, Result};
8use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
9use micromegas_tracing::prelude::*;
10use micromegas_transit::value::{Object, Value};
11use std::sync::Arc;
12
13pub struct Measure {
15 pub process: Arc<ProcessMetadata>,
16 pub stream_id: Arc<String>,
17 pub block_id: Arc<String>,
18 pub insert_time: i64, pub time: i64, pub target: Arc<String>,
21 pub name: Arc<String>,
22 pub unit: Arc<String>,
23 pub value: f64,
24 pub properties: PropertySet,
25}
26
27pub fn measure_from_value(
29 process: Arc<ProcessMetadata>,
30 stream_id: Arc<String>,
31 block_id: Arc<String>,
32 block_insert_time_ns: i64,
33 convert_ticks: &ConvertTicks,
34 val: &Value,
35) -> Result<Option<Measure>> {
36 if let Value::Object(obj) = val {
37 match obj.type_name.as_str() {
38 "FloatMetricEvent" => {
39 let ticks = obj
40 .get::<i64>("time")
41 .with_context(|| "reading time from FloatMetricEvent")?;
42 let value = obj
43 .get::<f64>("value")
44 .with_context(|| "reading value from FloatMetricEvent")?;
45 let desc = obj
46 .get::<Arc<Object>>("desc")
47 .with_context(|| "reading desc from FloatMetricEvent")?;
48 let target = desc
49 .get::<Arc<String>>("target")
50 .with_context(|| "reading target from FloatMetricEvent")?;
51 let name = desc
52 .get::<Arc<String>>("name")
53 .with_context(|| "reading name from FloatMetricEvent")?;
54 let unit = desc
55 .get::<Arc<String>>("unit")
56 .with_context(|| "reading unit from FloatMetricEvent")?;
57 Ok(Some(Measure {
58 process,
59 stream_id,
60 block_id,
61 insert_time: block_insert_time_ns,
62 time: convert_ticks.ticks_to_nanoseconds(ticks),
63 target,
64 name,
65 unit,
66 value,
67 properties: PropertySet::empty(),
68 }))
69 }
70 "IntegerMetricEvent" => {
71 let ticks = obj
72 .get::<i64>("time")
73 .with_context(|| "reading time from IntegerMetricEvent")?;
74 let time = convert_ticks.ticks_to_nanoseconds(ticks);
75 let value = obj
76 .get::<u64>("value")
77 .with_context(|| "reading value from IntegerMetricEvent")?;
78 let desc = obj
79 .get::<Arc<Object>>("desc")
80 .with_context(|| "reading desc from IntegerMetricEvent")?;
81 let target = desc
82 .get::<Arc<String>>("target")
83 .with_context(|| "reading target from IntegerMetricEvent")?;
84 let name = desc
85 .get::<Arc<String>>("name")
86 .with_context(|| "reading name from IntegerMetricEvent")?;
87 let unit = desc
88 .get::<Arc<String>>("unit")
89 .with_context(|| "reading unit from IntegerMetricEvent")?;
90 if *unit == "ticks" {
91 lazy_static::lazy_static! {
92 static ref SECONDS_METRIC_UNIT: Arc<String> = Arc::new( String::from("seconds"));
93 }
94 Ok(Some(Measure {
95 process,
96 stream_id,
97 block_id,
98 insert_time: block_insert_time_ns,
99 time,
100 target,
101 name,
102 unit: SECONDS_METRIC_UNIT.clone(),
103 value: convert_ticks.delta_ticks_to_ms(value as i64) / 1000.0,
104 properties: PropertySet::empty(),
105 }))
106 } else {
107 Ok(Some(Measure {
108 process,
109 stream_id,
110 block_id,
111 insert_time: block_insert_time_ns,
112 time,
113 target,
114 name,
115 unit,
116 value: value as f64,
117 properties: PropertySet::empty(),
118 }))
119 }
120 }
121 "TaggedIntegerMetricEvent" => {
122 let ticks = obj
123 .get::<i64>("time")
124 .with_context(|| "reading time from TaggedIntegerMetricEvent")?;
125 let time = convert_ticks.ticks_to_nanoseconds(ticks);
126 let value = obj
127 .get::<u64>("value")
128 .with_context(|| "reading value from TaggedIntegerMetricEvent")?;
129 let desc = obj
130 .get::<Arc<Object>>("desc")
131 .with_context(|| "reading desc from IntegerMetricEvent")?;
132 let mut target = desc
133 .get::<Arc<String>>("target")
134 .with_context(|| "reading target from IntegerMetricEvent")?;
135 let mut name = desc
136 .get::<Arc<String>>("name")
137 .with_context(|| "reading name from IntegerMetricEvent")?;
138 let mut unit = desc
139 .get::<Arc<String>>("unit")
140 .with_context(|| "reading unit from IntegerMetricEvent")?;
141 let properties = obj
142 .get::<Arc<Object>>("properties")
143 .with_context(|| "reading properties from TaggedIntegerMetricEvent")?;
144 for (prop_name, prop_value) in &properties.members {
145 match (prop_name.as_str(), prop_value) {
146 ("target", Value::String(value_str)) => {
147 target = value_str.clone();
148 }
149 ("name", Value::String(value_str)) => {
150 name = value_str.clone();
151 }
152 ("unit", Value::String(value_str)) => {
153 unit = value_str.clone();
154 }
155 (&_, _) => {}
156 }
157 }
158
159 if *unit == "ticks" {
160 lazy_static::lazy_static! {
161 static ref SECONDS_METRIC_UNIT: Arc<String> = Arc::new( String::from("seconds"));
162 }
163 Ok(Some(Measure {
164 process,
165 stream_id,
166 block_id,
167 insert_time: block_insert_time_ns,
168 time,
169 target,
170 name,
171 unit: SECONDS_METRIC_UNIT.clone(),
172 value: convert_ticks.delta_ticks_to_ms(value as i64) / 1000.0,
173 properties: properties.into(),
174 }))
175 } else {
176 Ok(Some(Measure {
177 process,
178 stream_id,
179 block_id,
180 insert_time: block_insert_time_ns,
181 time,
182 target,
183 name,
184 unit,
185 value: value as f64,
186 properties: properties.into(),
187 }))
188 }
189 }
190 "TaggedFloatMetricEvent" => {
191 let ticks = obj
192 .get::<i64>("time")
193 .with_context(|| "reading time from TaggedFloatMetricEvent")?;
194 let time = convert_ticks.ticks_to_nanoseconds(ticks);
195 let value = obj
196 .get::<f64>("value")
197 .with_context(|| "reading value from TaggedFloatMetricEvent")?;
198 let desc = obj
199 .get::<Arc<Object>>("desc")
200 .with_context(|| "reading desc from TaggedFloatMetricEvent")?;
201 let mut target = desc
202 .get::<Arc<String>>("target")
203 .with_context(|| "reading target from TaggedFloatMetricEvent")?;
204 let mut name = desc
205 .get::<Arc<String>>("name")
206 .with_context(|| "reading name from TaggedFloatMetricEvent")?;
207 let mut unit = desc
208 .get::<Arc<String>>("unit")
209 .with_context(|| "reading unit from TaggedFloatMetricEvent")?;
210 let properties = obj
211 .get::<Arc<Object>>("properties")
212 .with_context(|| "reading properties from TaggedFloatMetricEvent")?;
213 for (prop_name, prop_value) in &properties.members {
214 match (prop_name.as_str(), prop_value) {
215 ("target", Value::String(value_str)) => {
216 target = value_str.clone();
217 }
218 ("name", Value::String(value_str)) => {
219 name = value_str.clone();
220 }
221 ("unit", Value::String(value_str)) => {
222 unit = value_str.clone();
223 }
224 (&_, _) => {}
225 }
226 }
227 Ok(Some(Measure {
228 process,
229 stream_id,
230 block_id,
231 insert_time: block_insert_time_ns,
232 time,
233 target,
234 name,
235 unit,
236 value,
237 properties: properties.into(),
238 }))
239 }
240
241 _ => {
242 warn!("unknown metric event {:?}", obj);
243 Ok(None)
244 }
245 }
246 } else {
247 Ok(None)
248 }
249}
250
251#[span_fn]
253pub async fn for_each_measure_in_block<Predicate: FnMut(Measure) -> Result<bool>>(
254 blob_storage: Arc<BlobStorage>,
255 convert_ticks: &ConvertTicks,
256 process: Arc<ProcessMetadata>,
257 stream: &StreamMetadata,
258 block: &BlockMetadata,
259 mut fun: Predicate,
260) -> Result<bool> {
261 let payload = fetch_block_payload(
262 blob_storage,
263 stream.process_id,
264 stream.stream_id,
265 block.block_id,
266 )
267 .await?;
268 let stream_id = Arc::new(stream.stream_id.to_string());
269 let block_id = Arc::new(block.block_id.to_string());
270 let block_insert_time_ns = block.insert_time.timestamp_nanos_opt().unwrap_or_default();
271 let continue_iterating = parse_block(stream, &payload, |val| {
272 if let Some(measure) = measure_from_value(
273 process.clone(),
274 stream_id.clone(),
275 block_id.clone(),
276 block_insert_time_ns,
277 convert_ticks,
278 &val,
279 )
280 .with_context(|| "measure_from_value")?
281 && !fun(measure)?
282 {
283 return Ok(false); }
285 Ok(true) })
287 .with_context(|| format!("parse_block {}", block.block_id))?;
288 Ok(continue_iterating)
289}