micromegas_tracing/event/
in_memory_sink.rs1use super::{EventSink, StreamDesc, TracingBlock};
2use crate::{
3 logs::{LogBlock, LogMetadata, LogStream},
4 metrics::{MetricsBlock, MetricsStream},
5 prelude::*,
6 property_set::Property,
7 spans::{ThreadBlock, ThreadStream},
8};
9use std::{
10 fmt,
11 sync::{Arc, Mutex},
12};
13
14pub struct MemSinkState {
15 pub process_info: Option<Arc<ProcessInfo>>,
16 pub log_stream_desc: Option<Arc<StreamDesc>>,
17 pub metrics_stream_desc: Option<Arc<StreamDesc>>,
18 pub thread_stream_descs: Vec<Arc<StreamDesc>>,
19 pub thread_blocks: Vec<Arc<ThreadBlock>>,
20 pub log_blocks: Vec<Arc<LogBlock>>,
21 pub metrics_blocks: Vec<Arc<MetricsBlock>>,
22}
23
24pub struct InMemorySink {
26 pub state: Mutex<MemSinkState>,
27}
28
29impl InMemorySink {
30 pub fn new() -> Self {
31 let state = MemSinkState {
32 process_info: None,
33 log_stream_desc: None,
34 metrics_stream_desc: None,
35 thread_stream_descs: vec![],
36 thread_blocks: vec![],
37 log_blocks: vec![],
38 metrics_blocks: vec![],
39 };
40 Self {
41 state: Mutex::new(state),
42 }
43 }
44}
45
46impl Default for InMemorySink {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl EventSink for InMemorySink {
53 fn on_startup(&self, process_info: Arc<ProcessInfo>) {
54 self.state.lock().unwrap().process_info = Some(process_info);
55 }
56
57 fn on_shutdown(&self) {}
58
59 fn on_log_enabled(&self, _metadata: &LogMetadata) -> bool {
60 true }
62
63 fn on_log(
64 &self,
65 _desc: &LogMetadata,
66 _properties: &[Property],
67 _time: i64,
68 _args: fmt::Arguments<'_>,
69 ) {
70 }
73
74 fn on_init_log_stream(&self, log_stream: &LogStream) {
75 self.state.lock().unwrap().log_stream_desc = Some(log_stream.desc());
76 }
77
78 fn on_process_log_block(&self, log_block: Arc<LogBlock>) {
79 self.state.lock().unwrap().log_blocks.push(log_block);
80 }
81
82 fn on_init_metrics_stream(&self, metrics_stream: &MetricsStream) {
83 self.state.lock().unwrap().metrics_stream_desc = Some(metrics_stream.desc());
84 }
85
86 fn on_process_metrics_block(&self, metrics_block: Arc<MetricsBlock>) {
87 self.state
88 .lock()
89 .unwrap()
90 .metrics_blocks
91 .push(metrics_block);
92 }
93
94 fn on_init_thread_stream(&self, thread_stream: &ThreadStream) {
95 self.state
96 .lock()
97 .unwrap()
98 .thread_stream_descs
99 .push(thread_stream.desc());
100 }
101
102 fn on_process_thread_block(&self, thread_block: Arc<ThreadBlock>) {
103 self.state.lock().unwrap().thread_blocks.push(thread_block);
104 }
105
106 fn is_busy(&self) -> bool {
107 false }
109}
110
111impl InMemorySink {
112 pub fn thread_block_count(&self) -> usize {
114 self.state.lock().unwrap().thread_blocks.len()
115 }
116
117 pub fn log_block_count(&self) -> usize {
119 self.state.lock().unwrap().log_blocks.len()
120 }
121
122 pub fn metrics_block_count(&self) -> usize {
124 self.state.lock().unwrap().metrics_blocks.len()
125 }
126
127 pub fn total_thread_events(&self) -> usize {
129 self.state
130 .lock()
131 .unwrap()
132 .thread_blocks
133 .iter()
134 .map(|block| block.nb_objects())
135 .sum()
136 }
137
138 pub fn total_log_events(&self) -> usize {
140 self.state
141 .lock()
142 .unwrap()
143 .log_blocks
144 .iter()
145 .map(|block| block.nb_objects())
146 .sum()
147 }
148
149 pub fn total_metrics_events(&self) -> usize {
151 self.state
152 .lock()
153 .unwrap()
154 .metrics_blocks
155 .iter()
156 .map(|block| block.nb_objects())
157 .sum()
158 }
159}