1use crate::{
2 metadata::{ProcessMetadata, StreamMetadata},
3 payload::{fetch_block_payload, parse_block},
4 properties::property_set::PropertySet,
5 time::ConvertTicks,
6};
7use anyhow::{Context, Result};
8use micromegas_telemetry::{blob_storage::BlobStorage, types::block::BlockMetadata};
9use micromegas_tracing::prelude::*;
10use micromegas_transit::value::{Object, Value};
11use std::sync::Arc;
12
13#[derive(Debug)]
15pub struct LogEntry {
16 pub process: Arc<ProcessMetadata>,
17 pub stream_id: Arc<String>,
18 pub block_id: Arc<String>,
19 pub insert_time: i64,
20 pub time: i64,
21 pub level: i32,
22 pub target: Arc<String>,
23 pub msg: Arc<String>,
24 pub properties: PropertySet,
25}
26
27#[span_fn]
29pub fn log_entry_from_value(
30 convert_ticks: &ConvertTicks,
31 process: Arc<ProcessMetadata>,
32 stream_id: Arc<String>,
33 block_id: Arc<String>,
34 block_insert_time_ns: i64,
35 val: &Value,
36) -> Result<Option<LogEntry>> {
37 if let Value::Object(obj) = val {
38 match obj.type_name.as_str() {
39 "LogStaticStrEvent" => {
40 let ticks = obj
41 .get::<i64>("time")
42 .with_context(|| "reading time from LogStaticStrEvent")?;
43 let desc = obj
44 .get::<Arc<Object>>("desc")
45 .with_context(|| "reading desc from LogStaticStrEvent")?;
46 let level = desc
47 .get::<u32>("level")
48 .with_context(|| "reading level from LogStaticStrEvent")?;
49 let target = desc
50 .get::<Arc<String>>("target")
51 .with_context(|| "reading target from LogStaticStrEvent")?;
52 let msg = desc
53 .get::<Arc<String>>("fmt_str")
54 .with_context(|| "reading fmt_str from LogStaticStrEvent")?;
55 Ok(Some(LogEntry {
56 process,
57 stream_id,
58 block_id,
59 insert_time: block_insert_time_ns,
60 time: convert_ticks.ticks_to_nanoseconds(ticks),
61 level: level as i32,
62 target,
63 msg,
64 properties: PropertySet::empty(),
65 }))
66 }
67 "LogStringEvent" | "LogStringEventV2" => {
68 let ticks = obj
69 .get::<i64>("time")
70 .with_context(|| "reading time from LogStringEvent")?;
71 let desc = obj
72 .get::<Arc<Object>>("desc")
73 .with_context(|| "reading desc from LogStringEvent")?;
74 let level = desc
75 .get::<u32>("level")
76 .with_context(|| "reading level from LogStringEvent")?;
77 let target = desc
78 .get::<Arc<String>>("target")
79 .with_context(|| "reading target from LogStringEvent")?;
80 let msg = obj
81 .get::<Arc<String>>("msg")
82 .with_context(|| "reading msg from LogStringEvent")?;
83 Ok(Some(LogEntry {
84 process,
85 stream_id,
86 block_id,
87 insert_time: block_insert_time_ns,
88 time: convert_ticks.ticks_to_nanoseconds(ticks),
89 level: level as i32,
90 target,
91 msg,
92 properties: PropertySet::empty(),
93 }))
94 }
95 "LogStaticStrInteropEvent" | "LogStringInteropEventV2" | "LogStringInteropEventV3" => {
96 let ticks = obj
97 .get::<i64>("time")
98 .with_context(|| format!("reading time from {}", obj.type_name.as_str()))?;
99 let level = obj
100 .get::<u32>("level")
101 .with_context(|| format!("reading level from {}", obj.type_name.as_str()))?;
102 let target = obj
103 .get::<Arc<String>>("target")
104 .with_context(|| format!("reading target from {}", obj.type_name.as_str()))?;
105 let msg = obj
106 .get::<Arc<String>>("msg")
107 .with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
108 Ok(Some(LogEntry {
109 process,
110 stream_id,
111 block_id,
112 insert_time: block_insert_time_ns,
113 time: convert_ticks.ticks_to_nanoseconds(ticks),
114 level: level as i32,
115 target,
116 msg,
117 properties: PropertySet::empty(),
118 }))
119 }
120 "TaggedLogInteropEvent" => {
121 let ticks = obj
122 .get::<i64>("time")
123 .with_context(|| format!("reading time from {}", obj.type_name.as_str()))?;
124 let level = obj
125 .get::<u32>("level")
126 .with_context(|| format!("reading level from {}", obj.type_name.as_str()))?;
127 let target = obj
128 .get::<Arc<String>>("target")
129 .with_context(|| format!("reading target from {}", obj.type_name.as_str()))?;
130 let msg = obj
131 .get::<Arc<String>>("msg")
132 .with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
133 let properties = obj.get::<Arc<Object>>("properties").with_context(|| {
134 format!("reading properties from {}", obj.type_name.as_str())
135 })?;
136 let time = convert_ticks.ticks_to_nanoseconds(ticks);
137 Ok(Some(LogEntry {
138 process,
139 stream_id,
140 block_id,
141 insert_time: block_insert_time_ns,
142 time,
143 level: level as i32,
144 target,
145 msg,
146 properties: properties.into(),
147 }))
148 }
149 "TaggedLogString" => {
150 let ticks = obj
151 .get::<i64>("time")
152 .with_context(|| format!("reading time from {}", obj.type_name.as_str()))?;
153 let msg = obj
154 .get::<Arc<String>>("msg")
155 .with_context(|| format!("reading msg from {}", obj.type_name.as_str()))?;
156 let desc = obj
157 .get::<Arc<Object>>("desc")
158 .with_context(|| format!("reading desc from {}", obj.type_name.as_str()))?;
159 let mut level = desc
160 .get::<u32>("level")
161 .with_context(|| format!("reading level from {}", obj.type_name.as_str()))?;
162 let mut target = desc
163 .get::<Arc<String>>("target")
164 .with_context(|| format!("reading target from {}", obj.type_name.as_str()))?;
165 let properties = obj.get::<Arc<Object>>("properties").with_context(|| {
166 format!("reading properties from {}", obj.type_name.as_str())
167 })?;
168 for (prop_name, prop_value) in &properties.members {
169 match (prop_name.as_str(), prop_value) {
170 ("target", Value::String(value_str)) => {
171 target = value_str.clone();
172 }
173 ("level", Value::String(level_str)) => {
174 level = Level::parse(level_str).with_context(|| "parsing log level")?
175 as u32;
176 }
177 (&_, _) => {}
178 }
179 }
180 Ok(Some(LogEntry {
181 process,
182 stream_id,
183 block_id,
184 insert_time: block_insert_time_ns,
185 time: convert_ticks.ticks_to_nanoseconds(ticks),
186 level: level as i32,
187 target,
188 msg,
189 properties: properties.into(),
190 }))
191 }
192
193 _ => {
194 warn!("unknown log event {:?}", obj);
195 Ok(None)
196 }
197 }
198 } else {
199 Ok(None)
200 }
201}
202
203#[span_fn]
205pub async fn for_each_log_entry_in_block<Predicate: FnMut(LogEntry) -> Result<bool>>(
206 blob_storage: Arc<BlobStorage>,
207 convert_ticks: &ConvertTicks,
208 process: Arc<ProcessMetadata>,
209 stream: &StreamMetadata,
210 block: &BlockMetadata,
211 mut fun: Predicate,
212) -> Result<bool> {
213 let payload = fetch_block_payload(
214 blob_storage,
215 stream.process_id,
216 stream.stream_id,
217 block.block_id,
218 )
219 .await?;
220 let stream_id = Arc::new(stream.stream_id.to_string());
221 let block_id = Arc::new(block.block_id.to_string());
222 let block_insert_time_ns = block.insert_time.timestamp_nanos_opt().unwrap_or_default();
223 let continue_iterating = parse_block(stream, &payload, |val| {
224 if let Some(log_entry) = log_entry_from_value(
225 convert_ticks,
226 process.clone(),
227 stream_id.clone(),
228 block_id.clone(),
229 block_insert_time_ns,
230 &val,
231 )
232 .with_context(|| "log_entry_from_value")?
233 && !fun(log_entry)?
234 {
235 return Ok(false); }
237 Ok(true) })
239 .with_context(|| format!("parse_block {}", block.block_id))?;
240 Ok(continue_iterating)
241}