micromegas_analytics/
async_block_processing.rs

1use crate::{metadata::StreamMetadata, payload::parse_block, scope::ScopeDesc};
2use anyhow::{Context, Result};
3use micromegas_telemetry::block_wire_format::BlockPayload;
4use micromegas_tracing::prelude::*;
5use micromegas_transit::value::{Object, Value};
6use std::sync::Arc;
7
8/// Helper function to extract async event fields
9fn on_async_event<F>(obj: &Object, mut fun: F) -> Result<bool>
10where
11    F: FnMut(Arc<Object>, u64, u64, u32, i64) -> Result<bool>,
12{
13    let span_id = obj.get::<u64>("span_id")?;
14    let parent_span_id = obj.get::<u64>("parent_span_id")?;
15    let depth = obj.get::<u32>("depth")?;
16    let time = obj.get::<i64>("time")?;
17    let span_desc = obj.get::<Arc<Object>>("span_desc")?;
18    fun(span_desc, span_id, parent_span_id, depth, time)
19}
20
21/// Helper function to extract async named event fields
22fn on_async_named_event<F>(obj: &Object, mut fun: F) -> Result<bool>
23where
24    F: FnMut(Arc<Object>, Arc<String>, u64, u64, u32, i64) -> Result<bool>,
25{
26    let span_id = obj.get::<u64>("span_id")?;
27    let parent_span_id = obj.get::<u64>("parent_span_id")?;
28    let depth = obj.get::<u32>("depth")?;
29    let time = obj.get::<i64>("time")?;
30    let span_location = obj.get::<Arc<Object>>("span_location")?;
31    let name = obj.get::<Arc<String>>("name")?;
32    fun(span_location, name, span_id, parent_span_id, depth, time)
33}
34
/// Trait for processing async event blocks.
///
/// `parse_async_block_payload` calls these methods once per matching event
/// object found in a block. Errors returned by an implementation are
/// propagated to the caller of `parse_async_block_payload`.
pub trait AsyncBlockProcessor {
    /// Called for each `BeginAsyncSpanEvent` or `BeginAsyncNamedSpanEvent`.
    ///
    /// * `block_id` - the id that was passed to `parse_async_block_payload`.
    /// * `scope` - built from the event's name, file, target and line.
    /// * `ts` - the event's `time` member.
    /// * `span_id` / `parent_span_id` - the event's `u64` ids converted with
    ///   `as i64`.
    /// * `depth` - the event's `depth` member.
    ///
    /// The returned `bool` is handed back to the block parser.
    /// NOTE(review): presumably `true` means "keep parsing" - confirm against
    /// `parse_block`.
    fn on_begin_async_scope(
        &mut self,
        block_id: &str,
        scope: ScopeDesc,
        ts: i64,
        span_id: i64,
        parent_span_id: i64,
        depth: u32,
    ) -> Result<bool>;
    /// Called for each `EndAsyncSpanEvent` or `EndAsyncNamedSpanEvent`.
    ///
    /// Parameters and return value have the same meaning as for
    /// `on_begin_async_scope`.
    fn on_end_async_scope(
        &mut self,
        block_id: &str,
        scope: ScopeDesc,
        ts: i64,
        span_id: i64,
        parent_span_id: i64,
        depth: u32,
    ) -> Result<bool>;
}
56
57/// Parses async span events from a thread event block payload.
58#[span_fn]
59pub fn parse_async_block_payload<Proc: AsyncBlockProcessor>(
60    block_id: &str,
61    _object_offset: i64,
62    payload: &BlockPayload,
63    stream: &StreamMetadata,
64    processor: &mut Proc,
65) -> Result<bool> {
66    parse_block(stream, payload, |val| {
67        if let Value::Object(obj) = val {
68            match obj.type_name.as_str() {
69                "BeginAsyncSpanEvent" => {
70                    on_async_event(&obj, |span_desc, span_id, parent_span_id, depth, ts| {
71                        let name = span_desc.get::<Arc<String>>("name")?;
72                        let filename = span_desc.get::<Arc<String>>("file")?;
73                        let target = span_desc.get::<Arc<String>>("target")?;
74                        let line = span_desc.get::<u32>("line")?;
75                        let scope_desc = ScopeDesc::new(name, filename, target, line);
76                        processor.on_begin_async_scope(
77                            block_id,
78                            scope_desc,
79                            ts,
80                            span_id as i64,
81                            parent_span_id as i64,
82                            depth,
83                        )
84                    })
85                    .with_context(|| "reading BeginAsyncSpanEvent")
86                }
87                "EndAsyncSpanEvent" => {
88                    on_async_event(&obj, |span_desc, span_id, parent_span_id, depth, ts| {
89                        let name = span_desc.get::<Arc<String>>("name")?;
90                        let filename = span_desc.get::<Arc<String>>("file")?;
91                        let target = span_desc.get::<Arc<String>>("target")?;
92                        let line = span_desc.get::<u32>("line")?;
93                        let scope_desc = ScopeDesc::new(name, filename, target, line);
94                        processor.on_end_async_scope(
95                            block_id,
96                            scope_desc,
97                            ts,
98                            span_id as i64,
99                            parent_span_id as i64,
100                            depth,
101                        )
102                    })
103                    .with_context(|| "reading EndAsyncSpanEvent")
104                }
105                "BeginAsyncNamedSpanEvent" => on_async_named_event(
106                    &obj,
107                    |span_location, name, span_id, parent_span_id, depth, ts| {
108                        let filename = span_location.get::<Arc<String>>("file")?;
109                        let target = span_location.get::<Arc<String>>("target")?;
110                        let line = span_location.get::<u32>("line")?;
111                        let scope_desc = ScopeDesc::new(name, filename, target, line);
112                        processor.on_begin_async_scope(
113                            block_id,
114                            scope_desc,
115                            ts,
116                            span_id as i64,
117                            parent_span_id as i64,
118                            depth,
119                        )
120                    },
121                )
122                .with_context(|| "reading BeginAsyncNamedSpanEvent"),
123                "EndAsyncNamedSpanEvent" => on_async_named_event(
124                    &obj,
125                    |span_location, name, span_id, parent_span_id, depth, ts| {
126                        let filename = span_location.get::<Arc<String>>("file")?;
127                        let target = span_location.get::<Arc<String>>("target")?;
128                        let line = span_location.get::<u32>("line")?;
129                        let scope_desc = ScopeDesc::new(name, filename, target, line);
130                        processor.on_end_async_scope(
131                            block_id,
132                            scope_desc,
133                            ts,
134                            span_id as i64,
135                            parent_span_id as i64,
136                            depth,
137                        )
138                    },
139                )
140                .with_context(|| "reading EndAsyncNamedSpanEvent"),
141                _ => Ok(true),
142            }
143        } else {
144            Ok(true)
145        }
146    })
147}