micromegas_analytics/
thread_block_processor.rs1use crate::metadata::StreamMetadata;
2use crate::payload::{fetch_block_payload, parse_block};
3use crate::scope::ScopeDesc;
4use anyhow::{Context, Result};
5use micromegas_telemetry::blob_storage::BlobStorage;
6use micromegas_tracing::prelude::*;
7use micromegas_transit::value::{Object, Value};
8use std::sync::Arc;
9
10pub trait ThreadBlockProcessor {
12 fn on_begin_thread_scope(
14 &mut self,
15 block_id: &str,
16 event_id: i64,
17 scope: ScopeDesc,
18 ts: i64,
19 ) -> Result<bool>;
20 fn on_end_thread_scope(
21 &mut self,
22 block_id: &str,
23 event_id: i64,
24 scope: ScopeDesc,
25 ts: i64,
26 ) -> Result<bool>;
27}
28
29fn on_thread_event<F>(obj: &Object, mut fun: F) -> Result<bool>
30where
31 F: FnMut(Arc<Object>, i64) -> Result<bool>,
32{
33 let tick = obj.get::<i64>("time")?;
34 let scope = obj.get::<Arc<Object>>("thread_span_desc")?;
35 fun(scope, tick)
36}
37
38fn on_thread_named_event<F>(obj: &Object, mut fun: F) -> Result<bool>
39where
40 F: FnMut(Arc<Object>, Arc<String>, i64) -> Result<bool>,
41{
42 let tick = obj.get::<i64>("time")?;
43 let scope = obj.get::<Arc<Object>>("thread_span_location")?;
44 let name = obj.get::<Arc<String>>("name")?;
45 fun(scope, name, tick)
46}
47
48#[span_fn]
50pub fn parse_thread_block_payload<Proc: ThreadBlockProcessor>(
51 block_id: &str,
52 object_offset: i64,
53 payload: µmegas_telemetry::block_wire_format::BlockPayload,
54 stream: &StreamMetadata,
55 processor: &mut Proc,
56) -> Result<bool> {
57 let mut event_id = object_offset;
58 parse_block(stream, payload, |val| {
59 let res = if let Value::Object(obj) = val {
60 match obj.type_name.as_str() {
61 "BeginThreadSpanEvent" => on_thread_event(&obj, |scope, ts| {
62 let name = scope.get::<Arc<String>>("name")?;
63 let filename = scope.get::<Arc<String>>("file")?;
64 let target = scope.get::<Arc<String>>("target")?;
65 let line = scope.get::<u32>("line")?;
66 let scope_desc = ScopeDesc::new(name, filename, target, line);
67 processor.on_begin_thread_scope(block_id, event_id, scope_desc, ts)
68 })
69 .with_context(|| "reading BeginThreadSpanEvent"),
70 "EndThreadSpanEvent" => on_thread_event(&obj, |scope, ts| {
71 let name = scope.get::<Arc<String>>("name")?;
72 let filename = scope.get::<Arc<String>>("file")?;
73 let target = scope.get::<Arc<String>>("target")?;
74 let line = scope.get::<u32>("line")?;
75 let scope_desc = ScopeDesc::new(name, filename, target, line);
76 processor.on_end_thread_scope(block_id, event_id, scope_desc, ts)
77 })
78 .with_context(|| "reading EndThreadSpanEvent"),
79 "BeginThreadNamedSpanEvent" => on_thread_named_event(&obj, |scope, name, ts| {
80 let filename = scope.get::<Arc<String>>("file")?;
81 let target = scope.get::<Arc<String>>("target")?;
82 let line = scope.get::<u32>("line")?;
83 let scope_desc = ScopeDesc::new(name, filename, target, line);
84 processor.on_begin_thread_scope(block_id, event_id, scope_desc, ts)
85 })
86 .with_context(|| "reading BeginThreadNamedSpanEvent"),
87 "EndThreadNamedSpanEvent" => on_thread_named_event(&obj, |scope, name, ts| {
88 let filename = scope.get::<Arc<String>>("file")?;
89 let target = scope.get::<Arc<String>>("target")?;
90 let line = scope.get::<u32>("line")?;
91 let scope_desc = ScopeDesc::new(name, filename, target, line);
92 processor.on_end_thread_scope(block_id, event_id, scope_desc, ts)
93 })
94 .with_context(|| "reading EndThreadNamedSpanEvent"),
95 "BeginAsyncSpanEvent"
96 | "EndAsyncSpanEvent"
97 | "BeginAsyncNamedSpanEvent"
98 | "EndAsyncNamedSpanEvent" => {
99 Ok(true)
101 }
102 event_type => {
103 warn!("unknown event type {}", event_type);
104 Ok(true)
105 }
106 }
107 } else {
108 Ok(true) };
110 event_id += 1;
111 res
112 })
113}
114
115#[span_fn]
117pub async fn parse_thread_block<Proc: ThreadBlockProcessor>(
118 blob_storage: Arc<BlobStorage>,
119 stream: &StreamMetadata,
120 block_id: sqlx::types::Uuid,
121 object_offset: i64,
122 processor: &mut Proc,
123) -> Result<bool> {
124 let payload =
125 fetch_block_payload(blob_storage, stream.process_id, stream.stream_id, block_id).await?;
126 let block_id_str = block_id
127 .hyphenated()
128 .encode_lower(&mut sqlx::types::uuid::Uuid::encode_buffer())
129 .to_owned();
130 info!(
131 "parse_thread_block stream_id={} block_id={block_id_str}",
132 stream.stream_id
133 );
134 parse_thread_block_payload(&block_id_str, object_offset, &payload, stream, processor)
135}