// micromegas_analytics/net_block_processing.rs
use crate::metadata::StreamMetadata;
2use crate::payload::{fetch_block_payload, parse_block};
3use anyhow::{Context, Result};
4use micromegas_telemetry::blob_storage::BlobStorage;
5use micromegas_telemetry::block_wire_format::BlockPayload;
6use micromegas_tracing::prelude::*;
7use micromegas_transit::value::{Object, Value};
8use std::sync::Arc;
9
10pub trait NetBlockProcessor {
15 fn on_connection_begin(
16 &mut self,
17 event_id: i64,
18 time: i64,
19 connection_name: Arc<String>,
20 is_outgoing: bool,
21 ) -> Result<bool>;
22
23 fn on_connection_end(&mut self, event_id: i64, time: i64, bit_size: i64) -> Result<bool>;
24
25 fn on_object_begin(
26 &mut self,
27 event_id: i64,
28 time: i64,
29 object_name: Arc<String>,
30 ) -> Result<bool>;
31
32 fn on_object_end(&mut self, event_id: i64, time: i64, bit_size: i64) -> Result<bool>;
33
34 fn on_property(
35 &mut self,
36 event_id: i64,
37 time: i64,
38 property_name: Arc<String>,
39 bit_size: i64,
40 ) -> Result<bool>;
41
42 fn on_rpc_begin(
43 &mut self,
44 event_id: i64,
45 time: i64,
46 function_name: Arc<String>,
47 ) -> Result<bool>;
48
49 fn on_rpc_end(&mut self, event_id: i64, time: i64, bit_size: i64) -> Result<bool>;
50}
51
52fn read_time(obj: &Object) -> Result<i64> {
53 obj.get::<i64>("time")
54}
55
56fn read_bit_size(obj: &Object) -> Result<i64> {
57 Ok(obj.get::<u32>("bit_size")? as i64)
58}
59
60#[span_fn]
62pub fn parse_net_block_payload<Proc: NetBlockProcessor>(
63 object_offset: i64,
64 payload: &BlockPayload,
65 stream: &StreamMetadata,
66 processor: &mut Proc,
67) -> Result<bool> {
68 let mut event_id = object_offset;
69 parse_block(stream, payload, |val| {
70 let res = if let Value::Object(obj) = val {
71 match obj.type_name.as_str() {
72 "NetConnectionBeginEvent" => {
73 let time = read_time(&obj).with_context(|| "NetConnectionBeginEvent.time")?;
74 let connection_name = obj
75 .get::<Arc<String>>("connection_name")
76 .with_context(|| "NetConnectionBeginEvent.connection_name")?;
77 let is_outgoing = obj
78 .get::<u8>("is_outgoing")
79 .with_context(|| "NetConnectionBeginEvent.is_outgoing")?
80 != 0;
81 processor.on_connection_begin(event_id, time, connection_name, is_outgoing)
82 }
83 "NetConnectionEndEvent" => {
84 let time = read_time(&obj).with_context(|| "NetConnectionEndEvent.time")?;
85 let bit_size =
86 read_bit_size(&obj).with_context(|| "NetConnectionEndEvent.bit_size")?;
87 processor.on_connection_end(event_id, time, bit_size)
88 }
89 "NetObjectBeginEvent" => {
90 let time = read_time(&obj).with_context(|| "NetObjectBeginEvent.time")?;
91 let object_name = obj
92 .get::<Arc<String>>("object_name")
93 .with_context(|| "NetObjectBeginEvent.object_name")?;
94 processor.on_object_begin(event_id, time, object_name)
95 }
96 "NetObjectEndEvent" => {
97 let time = read_time(&obj).with_context(|| "NetObjectEndEvent.time")?;
98 let bit_size =
99 read_bit_size(&obj).with_context(|| "NetObjectEndEvent.bit_size")?;
100 processor.on_object_end(event_id, time, bit_size)
101 }
102 "NetPropertyEvent" => {
103 let time = read_time(&obj).with_context(|| "NetPropertyEvent.time")?;
104 let property_name = obj
105 .get::<Arc<String>>("property_name")
106 .with_context(|| "NetPropertyEvent.property_name")?;
107 let bit_size =
108 read_bit_size(&obj).with_context(|| "NetPropertyEvent.bit_size")?;
109 processor.on_property(event_id, time, property_name, bit_size)
110 }
111 "NetRPCBeginEvent" => {
112 let time = read_time(&obj).with_context(|| "NetRPCBeginEvent.time")?;
113 let function_name = obj
114 .get::<Arc<String>>("function_name")
115 .with_context(|| "NetRPCBeginEvent.function_name")?;
116 processor.on_rpc_begin(event_id, time, function_name)
117 }
118 "NetRPCEndEvent" => {
119 let time = read_time(&obj).with_context(|| "NetRPCEndEvent.time")?;
120 let bit_size =
121 read_bit_size(&obj).with_context(|| "NetRPCEndEvent.bit_size")?;
122 processor.on_rpc_end(event_id, time, bit_size)
123 }
124 event_type => {
125 warn!("unknown event type in net block: {}", event_type);
126 Ok(true)
127 }
128 }
129 } else {
130 Ok(true)
131 };
132 event_id += 1;
133 res
134 })
135}
136
137#[span_fn]
139pub async fn parse_net_block<Proc: NetBlockProcessor>(
140 blob_storage: Arc<BlobStorage>,
141 stream: &StreamMetadata,
142 block_id: sqlx::types::Uuid,
143 object_offset: i64,
144 processor: &mut Proc,
145) -> Result<bool> {
146 let payload =
147 fetch_block_payload(blob_storage, stream.process_id, stream.stream_id, block_id).await?;
148 parse_net_block_payload(object_offset, &payload, stream, processor)
149}