micromegas_telemetry_sink/
stream_block.rs1use anyhow::Result;
2use micromegas_telemetry::{block_wire_format, compression::compress, wire_format::encode_cbor};
3use micromegas_tracing::{
4 event::{EventBlock, ExtractDeps, TracingBlock},
5 logs::LogBlock,
6 metrics::MetricsBlock,
7 prelude::*,
8 spans::ThreadBlock,
9};
10use micromegas_transit::HeterogeneousQueue;
11
12pub trait StreamBlock {
13 fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>>;
22}
23
24fn encode_block<Q>(block: &EventBlock<Q>, process_info: &ProcessInfo) -> Result<Vec<u8>>
25where
26 Q: HeterogeneousQueue + ExtractDeps,
27 <Q as ExtractDeps>::DepsQueue: HeterogeneousQueue,
28{
29 let block_id = uuid::Uuid::new_v4();
30 trace!("encoding block_id={block_id}");
31 let end = block.end.as_ref().unwrap();
32
33 let payload = block_wire_format::BlockPayload {
34 dependencies: compress(block.events.extract().as_bytes())?,
35 objects: compress(block.events.as_bytes())?,
36 };
37
38 let block = block_wire_format::Block {
39 block_id,
40 stream_id: block.stream_id,
41 process_id: block.process_id,
42 begin_time: block
43 .begin
44 .time
45 .to_rfc3339_opts(chrono::SecondsFormat::Nanos, false),
46 begin_ticks: block.begin.ticks - process_info.start_ticks,
47 end_time: end
48 .time
49 .to_rfc3339_opts(chrono::SecondsFormat::Nanos, false),
50 end_ticks: end.ticks - process_info.start_ticks,
51 payload,
52 nb_objects: block.nb_objects() as i32,
53 object_offset: block.object_offset() as i64,
54 };
55 encode_cbor(&block)
56}
57
58impl StreamBlock for LogBlock {
59 fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>> {
60 encode_block(self, process_info)
61 }
62}
63
64impl StreamBlock for MetricsBlock {
65 fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>> {
66 encode_block(self, process_info)
67 }
68}
69
70impl StreamBlock for ThreadBlock {
71 fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>> {
72 encode_block(self, process_info)
73 }
74}