micromegas_analytics/
thread_block_processor.rs

1use crate::metadata::StreamMetadata;
2use crate::payload::{fetch_block_payload, parse_block};
3use crate::scope::ScopeDesc;
4use anyhow::{Context, Result};
5use micromegas_telemetry::blob_storage::BlobStorage;
6use micromegas_tracing::prelude::*;
7use micromegas_transit::value::{Object, Value};
8use std::sync::Arc;
9
10/// A trait for processing thread event blocks.
11pub trait ThreadBlockProcessor {
12    // return true to continue
13    fn on_begin_thread_scope(
14        &mut self,
15        block_id: &str,
16        event_id: i64,
17        scope: ScopeDesc,
18        ts: i64,
19    ) -> Result<bool>;
20    fn on_end_thread_scope(
21        &mut self,
22        block_id: &str,
23        event_id: i64,
24        scope: ScopeDesc,
25        ts: i64,
26    ) -> Result<bool>;
27}
28
29fn on_thread_event<F>(obj: &Object, mut fun: F) -> Result<bool>
30where
31    F: FnMut(Arc<Object>, i64) -> Result<bool>,
32{
33    let tick = obj.get::<i64>("time")?;
34    let scope = obj.get::<Arc<Object>>("thread_span_desc")?;
35    fun(scope, tick)
36}
37
38fn on_thread_named_event<F>(obj: &Object, mut fun: F) -> Result<bool>
39where
40    F: FnMut(Arc<Object>, Arc<String>, i64) -> Result<bool>,
41{
42    let tick = obj.get::<i64>("time")?;
43    let scope = obj.get::<Arc<Object>>("thread_span_location")?;
44    let name = obj.get::<Arc<String>>("name")?;
45    fun(scope, name, tick)
46}
47
48/// Parses a thread event block from a payload.
49#[span_fn]
50pub fn parse_thread_block_payload<Proc: ThreadBlockProcessor>(
51    block_id: &str,
52    object_offset: i64,
53    payload: &micromegas_telemetry::block_wire_format::BlockPayload,
54    stream: &StreamMetadata,
55    processor: &mut Proc,
56) -> Result<bool> {
57    let mut event_id = object_offset;
58    parse_block(stream, payload, |val| {
59        let res = if let Value::Object(obj) = val {
60            match obj.type_name.as_str() {
61                "BeginThreadSpanEvent" => on_thread_event(&obj, |scope, ts| {
62                    let name = scope.get::<Arc<String>>("name")?;
63                    let filename = scope.get::<Arc<String>>("file")?;
64                    let target = scope.get::<Arc<String>>("target")?;
65                    let line = scope.get::<u32>("line")?;
66                    let scope_desc = ScopeDesc::new(name, filename, target, line);
67                    processor.on_begin_thread_scope(block_id, event_id, scope_desc, ts)
68                })
69                .with_context(|| "reading BeginThreadSpanEvent"),
70                "EndThreadSpanEvent" => on_thread_event(&obj, |scope, ts| {
71                    let name = scope.get::<Arc<String>>("name")?;
72                    let filename = scope.get::<Arc<String>>("file")?;
73                    let target = scope.get::<Arc<String>>("target")?;
74                    let line = scope.get::<u32>("line")?;
75                    let scope_desc = ScopeDesc::new(name, filename, target, line);
76                    processor.on_end_thread_scope(block_id, event_id, scope_desc, ts)
77                })
78                .with_context(|| "reading EndThreadSpanEvent"),
79                "BeginThreadNamedSpanEvent" => on_thread_named_event(&obj, |scope, name, ts| {
80                    let filename = scope.get::<Arc<String>>("file")?;
81                    let target = scope.get::<Arc<String>>("target")?;
82                    let line = scope.get::<u32>("line")?;
83                    let scope_desc = ScopeDesc::new(name, filename, target, line);
84                    processor.on_begin_thread_scope(block_id, event_id, scope_desc, ts)
85                })
86                .with_context(|| "reading BeginThreadNamedSpanEvent"),
87                "EndThreadNamedSpanEvent" => on_thread_named_event(&obj, |scope, name, ts| {
88                    let filename = scope.get::<Arc<String>>("file")?;
89                    let target = scope.get::<Arc<String>>("target")?;
90                    let line = scope.get::<u32>("line")?;
91                    let scope_desc = ScopeDesc::new(name, filename, target, line);
92                    processor.on_end_thread_scope(block_id, event_id, scope_desc, ts)
93                })
94                .with_context(|| "reading EndThreadNamedSpanEvent"),
95                "BeginAsyncSpanEvent"
96                | "EndAsyncSpanEvent"
97                | "BeginAsyncNamedSpanEvent"
98                | "EndAsyncNamedSpanEvent" => {
99                    // Ignore async span events as they are not relevant for thread spans view
100                    Ok(true)
101                }
102                event_type => {
103                    warn!("unknown event type {}", event_type);
104                    Ok(true)
105                }
106            }
107        } else {
108            Ok(true) // continue
109        };
110        event_id += 1;
111        res
112    })
113}
114
115/// Parses a thread event block.
116#[span_fn]
117pub async fn parse_thread_block<Proc: ThreadBlockProcessor>(
118    blob_storage: Arc<BlobStorage>,
119    stream: &StreamMetadata,
120    block_id: sqlx::types::Uuid,
121    object_offset: i64,
122    processor: &mut Proc,
123) -> Result<bool> {
124    let payload =
125        fetch_block_payload(blob_storage, stream.process_id, stream.stream_id, block_id).await?;
126    let block_id_str = block_id
127        .hyphenated()
128        .encode_lower(&mut sqlx::types::uuid::Uuid::encode_buffer())
129        .to_owned();
130    info!(
131        "parse_thread_block stream_id={} block_id={block_id_str}",
132        stream.stream_id
133    );
134    parse_thread_block_payload(&block_id_str, object_offset, &payload, stream, processor)
135}