micromegas_tracing/event/
in_memory_sink.rs

1use super::{EventSink, StreamDesc, TracingBlock};
2use crate::{
3    logs::{LogBlock, LogMetadata, LogStream},
4    metrics::{MetricsBlock, MetricsStream},
5    prelude::*,
6    property_set::Property,
7    spans::{ThreadBlock, ThreadStream},
8};
9use std::{
10    fmt,
11    sync::{Arc, Mutex},
12};
13
14pub struct MemSinkState {
15    pub process_info: Option<Arc<ProcessInfo>>,
16    pub log_stream_desc: Option<Arc<StreamDesc>>,
17    pub metrics_stream_desc: Option<Arc<StreamDesc>>,
18    pub thread_stream_descs: Vec<Arc<StreamDesc>>,
19    pub thread_blocks: Vec<Arc<ThreadBlock>>,
20    pub log_blocks: Vec<Arc<LogBlock>>,
21    pub metrics_blocks: Vec<Arc<MetricsBlock>>,
22}
23
24/// for tests where we want to inspect the collected data
25pub struct InMemorySink {
26    pub state: Mutex<MemSinkState>,
27}
28
29impl InMemorySink {
30    pub fn new() -> Self {
31        let state = MemSinkState {
32            process_info: None,
33            log_stream_desc: None,
34            metrics_stream_desc: None,
35            thread_stream_descs: vec![],
36            thread_blocks: vec![],
37            log_blocks: vec![],
38            metrics_blocks: vec![],
39        };
40        Self {
41            state: Mutex::new(state),
42        }
43    }
44}
45
46impl Default for InMemorySink {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl EventSink for InMemorySink {
53    fn on_startup(&self, process_info: Arc<ProcessInfo>) {
54        self.state.lock().unwrap().process_info = Some(process_info);
55    }
56
57    fn on_shutdown(&self) {}
58
59    fn on_log_enabled(&self, _metadata: &LogMetadata) -> bool {
60        true // Enable all log events for testing
61    }
62
63    fn on_log(
64        &self,
65        _desc: &LogMetadata,
66        _properties: &[Property],
67        _time: i64,
68        _args: fmt::Arguments<'_>,
69    ) {
70        // For testing, we primarily collect events through blocks
71        // Individual log events are handled via on_process_log_block
72    }
73
74    fn on_init_log_stream(&self, log_stream: &LogStream) {
75        self.state.lock().unwrap().log_stream_desc = Some(log_stream.desc());
76    }
77
78    fn on_process_log_block(&self, log_block: Arc<LogBlock>) {
79        self.state.lock().unwrap().log_blocks.push(log_block);
80    }
81
82    fn on_init_metrics_stream(&self, metrics_stream: &MetricsStream) {
83        self.state.lock().unwrap().metrics_stream_desc = Some(metrics_stream.desc());
84    }
85
86    fn on_process_metrics_block(&self, metrics_block: Arc<MetricsBlock>) {
87        self.state
88            .lock()
89            .unwrap()
90            .metrics_blocks
91            .push(metrics_block);
92    }
93
94    fn on_init_thread_stream(&self, thread_stream: &ThreadStream) {
95        self.state
96            .lock()
97            .unwrap()
98            .thread_stream_descs
99            .push(thread_stream.desc());
100    }
101
102    fn on_process_thread_block(&self, thread_block: Arc<ThreadBlock>) {
103        self.state.lock().unwrap().thread_blocks.push(thread_block);
104    }
105
106    fn is_busy(&self) -> bool {
107        false // For testing, never report as busy
108    }
109}
110
111impl InMemorySink {
112    /// Get the total number of thread blocks collected
113    pub fn thread_block_count(&self) -> usize {
114        self.state.lock().unwrap().thread_blocks.len()
115    }
116
117    /// Get the total number of log blocks collected
118    pub fn log_block_count(&self) -> usize {
119        self.state.lock().unwrap().log_blocks.len()
120    }
121
122    /// Get the total number of metrics blocks collected
123    pub fn metrics_block_count(&self) -> usize {
124        self.state.lock().unwrap().metrics_blocks.len()
125    }
126
127    /// Get the total number of events across all thread blocks
128    pub fn total_thread_events(&self) -> usize {
129        self.state
130            .lock()
131            .unwrap()
132            .thread_blocks
133            .iter()
134            .map(|block| block.nb_objects())
135            .sum()
136    }
137
138    /// Get the total number of events across all log blocks
139    pub fn total_log_events(&self) -> usize {
140        self.state
141            .lock()
142            .unwrap()
143            .log_blocks
144            .iter()
145            .map(|block| block.nb_objects())
146            .sum()
147    }
148
149    /// Get the total number of events across all metrics blocks
150    pub fn total_metrics_events(&self) -> usize {
151        self.state
152            .lock()
153            .unwrap()
154            .metrics_blocks
155            .iter()
156            .map(|block| block.nb_objects())
157            .sum()
158    }
159}