micromegas_telemetry_sink/
stream_block.rs

1use anyhow::Result;
2use micromegas_telemetry::{block_wire_format, compression::compress, wire_format::encode_cbor};
3use micromegas_tracing::{
4    event::{EventBlock, ExtractDeps, TracingBlock},
5    logs::LogBlock,
6    metrics::MetricsBlock,
7    prelude::*,
8    spans::ThreadBlock,
9};
10use micromegas_transit::HeterogeneousQueue;
11
12pub trait StreamBlock {
13    /// Encodes the stream block into a binary format.
14    ///
15    /// This function serializes the block data, compresses it, and then encodes it
16    /// into the wire format for transmission.
17    ///
18    /// # Arguments
19    ///
20    /// * `process_info` - Information about the current process, used for time calibration.
21    fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>>;
22}
23
24fn encode_block<Q>(block: &EventBlock<Q>, process_info: &ProcessInfo) -> Result<Vec<u8>>
25where
26    Q: HeterogeneousQueue + ExtractDeps,
27    <Q as ExtractDeps>::DepsQueue: HeterogeneousQueue,
28{
29    let block_id = uuid::Uuid::new_v4();
30    trace!("encoding block_id={block_id}");
31    let end = block.end.as_ref().unwrap();
32
33    let payload = block_wire_format::BlockPayload {
34        dependencies: compress(block.events.extract().as_bytes())?,
35        objects: compress(block.events.as_bytes())?,
36    };
37
38    let block = block_wire_format::Block {
39        block_id,
40        stream_id: block.stream_id,
41        process_id: block.process_id,
42        begin_time: block
43            .begin
44            .time
45            .to_rfc3339_opts(chrono::SecondsFormat::Nanos, false),
46        begin_ticks: block.begin.ticks - process_info.start_ticks,
47        end_time: end
48            .time
49            .to_rfc3339_opts(chrono::SecondsFormat::Nanos, false),
50        end_ticks: end.ticks - process_info.start_ticks,
51        payload,
52        nb_objects: block.nb_objects() as i32,
53        object_offset: block.object_offset() as i64,
54    };
55    encode_cbor(&block)
56}
57
58impl StreamBlock for LogBlock {
59    fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>> {
60        encode_block(self, process_info)
61    }
62}
63
64impl StreamBlock for MetricsBlock {
65    fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>> {
66        encode_block(self, process_info)
67    }
68}
69
70impl StreamBlock for ThreadBlock {
71    fn encode_bin(&self, process_info: &ProcessInfo) -> Result<Vec<u8>> {
72        encode_block(self, process_info)
73    }
74}