micromegas_analytics/
log_entry.rs

1use crate::{
2    metadata::{ProcessMetadata, StreamMetadata},
3    payload::{fetch_block_payload, parse_block},
4    properties::property_set::PropertySet,
5    time::ConvertTicks,
6};
7use anyhow::{Context, Result};
8use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
9use micromegas_tracing::prelude::*;
10use micromegas_transit::value::{Object, Value};
11use std::sync::Arc;
12
13/// A single log entry.
14#[derive(Debug)]
15pub struct LogEntry {
16    pub process: Arc<ProcessMetadata>,
17    pub stream_id: Arc<String>,
18    pub block_id: Arc<String>,
19    pub insert_time: i64,
20    pub time: i64,
21    pub level: i32,
22    pub target: Arc<String>,
23    pub msg: Arc<String>,
24    pub properties: PropertySet,
25}
26
27/// Creates a `LogEntry` from a `Value`.
28#[span_fn]
29pub fn log_entry_from_value(
30    convert_ticks: &ConvertTicks,
31    process: Arc<ProcessMetadata>,
32    stream_id: Arc<String>,
33    block_id: Arc<String>,
34    block_insert_time_ns: i64,
35    val: &Value,
36) -> Result<Option<LogEntry>> {
37    if let Value::Object(obj) = val {
38        match obj.type_name.as_str() {
39            "LogStaticStrEvent" => {
40                let ticks = obj
41                    .get::<i64>("time")
42                    .with_context(|| "reading time from LogStaticStrEvent")?;
43                let desc = obj
44                    .get::<Arc<Object>>("desc")
45                    .with_context(|| "reading desc from LogStaticStrEvent")?;
46                let level = desc
47                    .get::<u32>("level")
48                    .with_context(|| "reading level from LogStaticStrEvent")?;
49                let target = desc
50                    .get::<Arc<String>>("target")
51                    .with_context(|| "reading target from LogStaticStrEvent")?;
52                let msg = desc
53                    .get::<Arc<String>>("fmt_str")
54                    .with_context(|| "reading fmt_str from LogStaticStrEvent")?;
55                Ok(Some(LogEntry {
56                    process,
57                    stream_id,
58                    block_id,
59                    insert_time: block_insert_time_ns,
60                    time: convert_ticks.ticks_to_nanoseconds(ticks),
61                    level: level as i32,
62                    target,
63                    msg,
64                    properties: PropertySet::empty(),
65                }))
66            }
67            "LogStringEvent" | "LogStringEventV2" => {
68                let ticks = obj
69                    .get::<i64>("time")
70                    .with_context(|| "reading time from LogStringEvent")?;
71                let desc = obj
72                    .get::<Arc<Object>>("desc")
73                    .with_context(|| "reading desc from LogStringEvent")?;
74                let level = desc
75                    .get::<u32>("level")
76                    .with_context(|| "reading level from LogStringEvent")?;
77                let target = desc
78                    .get::<Arc<String>>("target")
79                    .with_context(|| "reading target from LogStringEvent")?;
80                let msg = obj
81                    .get::<Arc<String>>("msg")
82                    .with_context(|| "reading msg from LogStringEvent")?;
83                Ok(Some(LogEntry {
84                    process,
85                    stream_id,
86                    block_id,
87                    insert_time: block_insert_time_ns,
88                    time: convert_ticks.ticks_to_nanoseconds(ticks),
89                    level: level as i32,
90                    target,
91                    msg,
92                    properties: PropertySet::empty(),
93                }))
94            }
95            "LogStaticStrInteropEvent" | "LogStringInteropEventV2" | "LogStringInteropEventV3" => {
96                let ticks = obj
97                    .get::<i64>("time")
98                    .with_context(|| format!("reading time from {}", obj.type_name.as_str()))?;
99                let level = obj
100                    .get::<u32>("level")
101                    .with_context(|| format!("reading level from {}", obj.type_name.as_str()))?;
102                let target = obj
103                    .get::<Arc<String>>("target")
104                    .with_context(|| format!("reading target from {}", obj.type_name.as_str()))?;
105                let msg = obj
106                    .get::<Arc<String>>("msg")
107                    .with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
108                Ok(Some(LogEntry {
109                    process,
110                    stream_id,
111                    block_id,
112                    insert_time: block_insert_time_ns,
113                    time: convert_ticks.ticks_to_nanoseconds(ticks),
114                    level: level as i32,
115                    target,
116                    msg,
117                    properties: PropertySet::empty(),
118                }))
119            }
120            "TaggedLogInteropEvent" => {
121                let ticks = obj
122                    .get::<i64>("time")
123                    .with_context(|| format!("reading time from {}", obj.type_name.as_str()))?;
124                let level = obj
125                    .get::<u32>("level")
126                    .with_context(|| format!("reading level from {}", obj.type_name.as_str()))?;
127                let target = obj
128                    .get::<Arc<String>>("target")
129                    .with_context(|| format!("reading target from {}", obj.type_name.as_str()))?;
130                let msg = obj
131                    .get::<Arc<String>>("msg")
132                    .with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
133                let properties = obj.get::<Arc<Object>>("properties").with_context(|| {
134                    format!("reading properties from {}", obj.type_name.as_str())
135                })?;
136                let time = convert_ticks.ticks_to_nanoseconds(ticks);
137                Ok(Some(LogEntry {
138                    process,
139                    stream_id,
140                    block_id,
141                    insert_time: block_insert_time_ns,
142                    time,
143                    level: level as i32,
144                    target,
145                    msg,
146                    properties: properties.into(),
147                }))
148            }
149            "TaggedLogString" => {
150                let ticks = obj
151                    .get::<i64>("time")
152                    .with_context(|| format!("reading time from {}", obj.type_name.as_str()))?;
153                let msg = obj
154                    .get::<Arc<String>>("msg")
155                    .with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
156                let desc = obj
157                    .get::<Arc<Object>>("desc")
158                    .with_context(|| format!("reading desc from {}", obj.type_name.as_str()))?;
159                let mut level = desc
160                    .get::<u32>("level")
161                    .with_context(|| format!("reading level from {}", obj.type_name.as_str()))?;
162                let mut target = desc
163                    .get::<Arc<String>>("target")
164                    .with_context(|| format!("reading target from {}", obj.type_name.as_str()))?;
165                let properties = obj.get::<Arc<Object>>("properties").with_context(|| {
166                    format!("reading properties from {}", obj.type_name.as_str())
167                })?;
168                for (prop_name, prop_value) in &properties.members {
169                    match (prop_name.as_str(), prop_value) {
170                        ("target", Value::String(value_str)) => {
171                            target = value_str.clone();
172                        }
173                        ("level", Value::String(level_str)) => {
174                            level = Level::parse(level_str).with_context(|| "parsing log level")?
175                                as u32;
176                        }
177                        (&_, _) => {}
178                    }
179                }
180                Ok(Some(LogEntry {
181                    process,
182                    stream_id,
183                    block_id,
184                    insert_time: block_insert_time_ns,
185                    time: convert_ticks.ticks_to_nanoseconds(ticks),
186                    level: level as i32,
187                    target,
188                    msg,
189                    properties: properties.into(),
190                }))
191            }
192
193            _ => {
194                warn!("unknown log event {:?}", obj);
195                Ok(None)
196            }
197        }
198    } else {
199        Ok(None)
200    }
201}
202
203/// Iterates over all log entries in a block.
204#[span_fn]
205pub async fn for_each_log_entry_in_block<Predicate: FnMut(LogEntry) -> Result<bool>>(
206    blob_storage: Arc<BlobStorage>,
207    convert_ticks: &ConvertTicks,
208    process: Arc<ProcessMetadata>,
209    stream: &StreamMetadata,
210    block: &BlockMetadata,
211    mut fun: Predicate,
212) -> Result<bool> {
213    let payload = fetch_block_payload(
214        blob_storage,
215        stream.process_id,
216        stream.stream_id,
217        block.block_id,
218    )
219    .await?;
220    let stream_id = Arc::new(stream.stream_id.to_string());
221    let block_id = Arc::new(block.block_id.to_string());
222    let block_insert_time_ns = block.insert_time.timestamp_nanos_opt().unwrap_or_default();
223    let continue_iterating = parse_block(stream, &payload, |val| {
224        if let Some(log_entry) = log_entry_from_value(
225            convert_ticks,
226            process.clone(),
227            stream_id.clone(),
228            block_id.clone(),
229            block_insert_time_ns,
230            &val,
231        )
232        .with_context(|| "log_entry_from_value")?
233            && !fun(log_entry)?
234        {
235            return Ok(false); //do not continue
236        }
237        Ok(true) //continue
238    })
239    .with_context(|| format!("parse_block {}", block.block_id))?;
240    Ok(continue_iterating)
241}