1use crate::{metadata::StreamMetadata, payload::parse_block, scope::ScopeDesc};
2use anyhow::{Context, Result};
3use micromegas_telemetry::block_wire_format::BlockPayload;
4use micromegas_tracing::prelude::*;
5use micromegas_transit::value::{Object, Value};
6use std::sync::Arc;
7
8fn on_async_event<F>(obj: &Object, mut fun: F) -> Result<bool>
10where
11 F: FnMut(Arc<Object>, u64, u64, u32, i64) -> Result<bool>,
12{
13 let span_id = obj.get::<u64>("span_id")?;
14 let parent_span_id = obj.get::<u64>("parent_span_id")?;
15 let depth = obj.get::<u32>("depth")?;
16 let time = obj.get::<i64>("time")?;
17 let span_desc = obj.get::<Arc<Object>>("span_desc")?;
18 fun(span_desc, span_id, parent_span_id, depth, time)
19}
20
21fn on_async_named_event<F>(obj: &Object, mut fun: F) -> Result<bool>
23where
24 F: FnMut(Arc<Object>, Arc<String>, u64, u64, u32, i64) -> Result<bool>,
25{
26 let span_id = obj.get::<u64>("span_id")?;
27 let parent_span_id = obj.get::<u64>("parent_span_id")?;
28 let depth = obj.get::<u32>("depth")?;
29 let time = obj.get::<i64>("time")?;
30 let span_location = obj.get::<Arc<Object>>("span_location")?;
31 let name = obj.get::<Arc<String>>("name")?;
32 fun(span_location, name, span_id, parent_span_id, depth, time)
33}
34
35pub trait AsyncBlockProcessor {
37 fn on_begin_async_scope(
38 &mut self,
39 block_id: &str,
40 scope: ScopeDesc,
41 ts: i64,
42 span_id: i64,
43 parent_span_id: i64,
44 depth: u32,
45 ) -> Result<bool>;
46 fn on_end_async_scope(
47 &mut self,
48 block_id: &str,
49 scope: ScopeDesc,
50 ts: i64,
51 span_id: i64,
52 parent_span_id: i64,
53 depth: u32,
54 ) -> Result<bool>;
55}
56
57#[span_fn]
59pub fn parse_async_block_payload<Proc: AsyncBlockProcessor>(
60 block_id: &str,
61 _object_offset: i64,
62 payload: &BlockPayload,
63 stream: &StreamMetadata,
64 processor: &mut Proc,
65) -> Result<bool> {
66 parse_block(stream, payload, |val| {
67 if let Value::Object(obj) = val {
68 match obj.type_name.as_str() {
69 "BeginAsyncSpanEvent" => {
70 on_async_event(&obj, |span_desc, span_id, parent_span_id, depth, ts| {
71 let name = span_desc.get::<Arc<String>>("name")?;
72 let filename = span_desc.get::<Arc<String>>("file")?;
73 let target = span_desc.get::<Arc<String>>("target")?;
74 let line = span_desc.get::<u32>("line")?;
75 let scope_desc = ScopeDesc::new(name, filename, target, line);
76 processor.on_begin_async_scope(
77 block_id,
78 scope_desc,
79 ts,
80 span_id as i64,
81 parent_span_id as i64,
82 depth,
83 )
84 })
85 .with_context(|| "reading BeginAsyncSpanEvent")
86 }
87 "EndAsyncSpanEvent" => {
88 on_async_event(&obj, |span_desc, span_id, parent_span_id, depth, ts| {
89 let name = span_desc.get::<Arc<String>>("name")?;
90 let filename = span_desc.get::<Arc<String>>("file")?;
91 let target = span_desc.get::<Arc<String>>("target")?;
92 let line = span_desc.get::<u32>("line")?;
93 let scope_desc = ScopeDesc::new(name, filename, target, line);
94 processor.on_end_async_scope(
95 block_id,
96 scope_desc,
97 ts,
98 span_id as i64,
99 parent_span_id as i64,
100 depth,
101 )
102 })
103 .with_context(|| "reading EndAsyncSpanEvent")
104 }
105 "BeginAsyncNamedSpanEvent" => on_async_named_event(
106 &obj,
107 |span_location, name, span_id, parent_span_id, depth, ts| {
108 let filename = span_location.get::<Arc<String>>("file")?;
109 let target = span_location.get::<Arc<String>>("target")?;
110 let line = span_location.get::<u32>("line")?;
111 let scope_desc = ScopeDesc::new(name, filename, target, line);
112 processor.on_begin_async_scope(
113 block_id,
114 scope_desc,
115 ts,
116 span_id as i64,
117 parent_span_id as i64,
118 depth,
119 )
120 },
121 )
122 .with_context(|| "reading BeginAsyncNamedSpanEvent"),
123 "EndAsyncNamedSpanEvent" => on_async_named_event(
124 &obj,
125 |span_location, name, span_id, parent_span_id, depth, ts| {
126 let filename = span_location.get::<Arc<String>>("file")?;
127 let target = span_location.get::<Arc<String>>("target")?;
128 let line = span_location.get::<u32>("line")?;
129 let scope_desc = ScopeDesc::new(name, filename, target, line);
130 processor.on_end_async_scope(
131 block_id,
132 scope_desc,
133 ts,
134 span_id as i64,
135 parent_span_id as i64,
136 depth,
137 )
138 },
139 )
140 .with_context(|| "reading EndAsyncNamedSpanEvent"),
141 _ => Ok(true),
142 }
143 } else {
144 Ok(true)
145 }
146 })
147}