micromegas_telemetry_sink/
composite_event_sink.rs1use micromegas_tracing::{
2 event::{BoxedEventSink, EventSink},
3 logs::{LogBlock, LogMetadata, LogStream},
4 metrics::{MetricsBlock, MetricsStream},
5 prelude::*,
6 property_set::Property,
7 spans::{ThreadBlock, ThreadStream},
8};
9use std::{fmt, sync::Arc};
10
11pub struct CompositeSink {
16 sinks: Vec<(LevelFilter, BoxedEventSink)>,
17 target_level_filters: Vec<(String, LevelFilter)>,
18}
19
20impl CompositeSink {
21 pub fn new(
23 sinks: Vec<(LevelFilter, BoxedEventSink)>,
24 target_max_level: Vec<(String, LevelFilter)>,
25 max_level_override: Option<LevelFilter>,
26 ) -> Self {
27 if let Some(max_level) = max_level_override {
28 micromegas_tracing::levels::set_max_level(max_level);
29 } else {
30 let mut max_level = LevelFilter::Off;
31 for (_, level_filter) in &target_max_level {
32 max_level = max_level.max(*level_filter);
33 }
34 for (level_filter, _) in &sinks {
35 max_level = max_level.max(*level_filter);
36 }
37 micromegas_tracing::levels::set_max_level(max_level);
38 }
39
40 let mut target_max_level = target_max_level;
41 target_max_level.sort_by_key(|(name, _)| name.len().wrapping_neg());
42
43 Self {
44 sinks,
45 target_level_filters: target_max_level,
46 }
47 }
48
49 fn target_max_level(&self, metadata: &LogMetadata) -> Option<LevelFilter> {
50 const GENERATION: u16 = 1;
51 match metadata.level_filter(GENERATION) {
53 micromegas_tracing::logs::FilterState::Outdated => {
54 let level_filter =
55 Self::find_max_match(metadata.target, &self.target_level_filters);
56 metadata.set_level_filter(GENERATION, level_filter);
57 level_filter
58 }
59 micromegas_tracing::logs::FilterState::NotSet => None,
60 micromegas_tracing::logs::FilterState::Set(level_filter) => Some(level_filter),
61 }
62 }
63
64 fn find_max_match(
66 target: &str,
67 level_filters: &[(String, LevelFilter)],
68 ) -> Option<LevelFilter> {
69 for (t, l) in level_filters.iter() {
70 if target.starts_with(t) {
71 return Some(*l);
72 }
73 }
74 None
75 }
76}
77
78impl EventSink for CompositeSink {
79 fn on_startup(&self, process_info: Arc<ProcessInfo>) {
80 if self.sinks.len() == 1 {
81 self.sinks[0].1.on_startup(process_info);
82 } else {
83 self.sinks
84 .iter()
85 .for_each(|(_, sink)| sink.on_startup(process_info.clone()));
86 }
87 }
88
89 fn on_shutdown(&self) {
90 self.sinks.iter().for_each(|(_, sink)| sink.on_shutdown());
91 }
92
93 fn on_log_enabled(&self, metadata: &LogMetadata) -> bool {
94 let target_max_level = self.target_max_level(metadata);
97 self.sinks.iter().any(|(max_level, sink)| {
98 metadata.level <= target_max_level.unwrap_or(*max_level)
99 && sink.on_log_enabled(metadata)
100 })
101 }
102
103 fn on_log(
104 &self,
105 metadata: &LogMetadata,
106 properties: &[Property],
107 time: i64,
108 args: fmt::Arguments<'_>,
109 ) {
110 let target_max_level = self.target_max_level(metadata);
111 self.sinks.iter().for_each(|(max_level, sink)| {
112 if metadata.level <= target_max_level.unwrap_or(*max_level)
113 && sink.on_log_enabled(metadata)
114 {
115 sink.on_log(metadata, properties, time, args);
116 }
117 });
118 }
119
120 fn on_init_log_stream(&self, log_stream: &LogStream) {
121 self.sinks
122 .iter()
123 .for_each(|(_, sink)| sink.on_init_log_stream(log_stream));
124 }
125
126 fn on_process_log_block(&self, old_event_block: Arc<LogBlock>) {
127 self.sinks
128 .iter()
129 .for_each(|(_, sink)| sink.on_process_log_block(old_event_block.clone()));
130 }
131
132 fn on_init_metrics_stream(&self, metrics_stream: &MetricsStream) {
133 self.sinks
134 .iter()
135 .for_each(|(_, sink)| sink.on_init_metrics_stream(metrics_stream));
136 }
137
138 fn on_process_metrics_block(&self, old_event_block: Arc<MetricsBlock>) {
139 self.sinks
140 .iter()
141 .for_each(|(_, sink)| sink.on_process_metrics_block(old_event_block.clone()));
142 }
143
144 fn on_init_thread_stream(&self, thread_stream: &ThreadStream) {
145 self.sinks
146 .iter()
147 .for_each(|(_, sink)| sink.on_init_thread_stream(thread_stream));
148 }
149
150 fn on_process_thread_block(&self, old_event_block: Arc<ThreadBlock>) {
151 self.sinks
152 .iter()
153 .for_each(|(_, sink)| sink.on_process_thread_block(old_event_block.clone()));
154 }
155
156 fn is_busy(&self) -> bool {
157 for (_, sink) in &self.sinks {
158 if sink.is_busy() {
159 return true;
160 }
161 }
162 false
163 }
164}