micromegas_analytics/
net_block_processing.rs

1use crate::metadata::StreamMetadata;
2use crate::payload::{fetch_block_payload, parse_block};
3use anyhow::{Context, Result};
4use micromegas_telemetry::blob_storage::BlobStorage;
5use micromegas_telemetry::block_wire_format::BlockPayload;
6use micromegas_tracing::prelude::*;
7use micromegas_transit::value::{Object, Value};
8use std::sync::Arc;
9
10/// A trait for processing network tracing event blocks.
11///
12/// Implementors receive one callback per decoded net event. Returning `Ok(true)`
13/// continues iteration; returning `Ok(false)` stops parsing the current block.
14pub trait NetBlockProcessor {
15    fn on_connection_begin(
16        &mut self,
17        event_id: i64,
18        time: i64,
19        connection_name: Arc<String>,
20        is_outgoing: bool,
21    ) -> Result<bool>;
22
23    fn on_connection_end(&mut self, event_id: i64, time: i64, bit_size: i64) -> Result<bool>;
24
25    fn on_object_begin(
26        &mut self,
27        event_id: i64,
28        time: i64,
29        object_name: Arc<String>,
30    ) -> Result<bool>;
31
32    fn on_object_end(&mut self, event_id: i64, time: i64, bit_size: i64) -> Result<bool>;
33
34    fn on_property(
35        &mut self,
36        event_id: i64,
37        time: i64,
38        property_name: Arc<String>,
39        bit_size: i64,
40    ) -> Result<bool>;
41
42    fn on_rpc_begin(
43        &mut self,
44        event_id: i64,
45        time: i64,
46        function_name: Arc<String>,
47    ) -> Result<bool>;
48
49    fn on_rpc_end(&mut self, event_id: i64, time: i64, bit_size: i64) -> Result<bool>;
50}
51
52fn read_time(obj: &Object) -> Result<i64> {
53    obj.get::<i64>("time")
54}
55
56fn read_bit_size(obj: &Object) -> Result<i64> {
57    Ok(obj.get::<u32>("bit_size")? as i64)
58}
59
60/// Parses a net event block payload and calls the appropriate processor callback for each event.
61#[span_fn]
62pub fn parse_net_block_payload<Proc: NetBlockProcessor>(
63    object_offset: i64,
64    payload: &BlockPayload,
65    stream: &StreamMetadata,
66    processor: &mut Proc,
67) -> Result<bool> {
68    let mut event_id = object_offset;
69    parse_block(stream, payload, |val| {
70        let res = if let Value::Object(obj) = val {
71            match obj.type_name.as_str() {
72                "NetConnectionBeginEvent" => {
73                    let time = read_time(&obj).with_context(|| "NetConnectionBeginEvent.time")?;
74                    let connection_name = obj
75                        .get::<Arc<String>>("connection_name")
76                        .with_context(|| "NetConnectionBeginEvent.connection_name")?;
77                    let is_outgoing = obj
78                        .get::<u8>("is_outgoing")
79                        .with_context(|| "NetConnectionBeginEvent.is_outgoing")?
80                        != 0;
81                    processor.on_connection_begin(event_id, time, connection_name, is_outgoing)
82                }
83                "NetConnectionEndEvent" => {
84                    let time = read_time(&obj).with_context(|| "NetConnectionEndEvent.time")?;
85                    let bit_size =
86                        read_bit_size(&obj).with_context(|| "NetConnectionEndEvent.bit_size")?;
87                    processor.on_connection_end(event_id, time, bit_size)
88                }
89                "NetObjectBeginEvent" => {
90                    let time = read_time(&obj).with_context(|| "NetObjectBeginEvent.time")?;
91                    let object_name = obj
92                        .get::<Arc<String>>("object_name")
93                        .with_context(|| "NetObjectBeginEvent.object_name")?;
94                    processor.on_object_begin(event_id, time, object_name)
95                }
96                "NetObjectEndEvent" => {
97                    let time = read_time(&obj).with_context(|| "NetObjectEndEvent.time")?;
98                    let bit_size =
99                        read_bit_size(&obj).with_context(|| "NetObjectEndEvent.bit_size")?;
100                    processor.on_object_end(event_id, time, bit_size)
101                }
102                "NetPropertyEvent" => {
103                    let time = read_time(&obj).with_context(|| "NetPropertyEvent.time")?;
104                    let property_name = obj
105                        .get::<Arc<String>>("property_name")
106                        .with_context(|| "NetPropertyEvent.property_name")?;
107                    let bit_size =
108                        read_bit_size(&obj).with_context(|| "NetPropertyEvent.bit_size")?;
109                    processor.on_property(event_id, time, property_name, bit_size)
110                }
111                "NetRPCBeginEvent" => {
112                    let time = read_time(&obj).with_context(|| "NetRPCBeginEvent.time")?;
113                    let function_name = obj
114                        .get::<Arc<String>>("function_name")
115                        .with_context(|| "NetRPCBeginEvent.function_name")?;
116                    processor.on_rpc_begin(event_id, time, function_name)
117                }
118                "NetRPCEndEvent" => {
119                    let time = read_time(&obj).with_context(|| "NetRPCEndEvent.time")?;
120                    let bit_size =
121                        read_bit_size(&obj).with_context(|| "NetRPCEndEvent.bit_size")?;
122                    processor.on_rpc_end(event_id, time, bit_size)
123                }
124                event_type => {
125                    warn!("unknown event type in net block: {}", event_type);
126                    Ok(true)
127                }
128            }
129        } else {
130            Ok(true)
131        };
132        event_id += 1;
133        res
134    })
135}
136
137/// Fetches and parses a net event block.
138#[span_fn]
139pub async fn parse_net_block<Proc: NetBlockProcessor>(
140    blob_storage: Arc<BlobStorage>,
141    stream: &StreamMetadata,
142    block_id: sqlx::types::Uuid,
143    object_offset: i64,
144    processor: &mut Proc,
145) -> Result<bool> {
146    let payload =
147        fetch_block_payload(blob_storage, stream.process_id, stream.stream_id, block_id).await?;
148    parse_net_block_payload(object_offset, &payload, stream, processor)
149}