micromegas_telemetry_sink/
composite_event_sink.rs

1use micromegas_tracing::{
2    event::{BoxedEventSink, EventSink},
3    logs::{LogBlock, LogMetadata, LogStream},
4    metrics::{MetricsBlock, MetricsStream},
5    prelude::*,
6    property_set::Property,
7    spans::{ThreadBlock, ThreadStream},
8};
9use std::{fmt, sync::Arc};
10
11/// An `EventSink` that dispatches events to multiple other `EventSink`s.
12///
13/// This allows for fanning out telemetry events to different sinks, each with its own
14/// filtering and processing logic.
15pub struct CompositeSink {
16    sinks: Vec<(LevelFilter, BoxedEventSink)>,
17    target_level_filters: Vec<(String, LevelFilter)>,
18}
19
20impl CompositeSink {
21    /// Creates a new `CompositeSink`.
22    pub fn new(
23        sinks: Vec<(LevelFilter, BoxedEventSink)>,
24        target_max_level: Vec<(String, LevelFilter)>,
25        max_level_override: Option<LevelFilter>,
26    ) -> Self {
27        if let Some(max_level) = max_level_override {
28            micromegas_tracing::levels::set_max_level(max_level);
29        } else {
30            let mut max_level = LevelFilter::Off;
31            for (_, level_filter) in &target_max_level {
32                max_level = max_level.max(*level_filter);
33            }
34            for (level_filter, _) in &sinks {
35                max_level = max_level.max(*level_filter);
36            }
37            micromegas_tracing::levels::set_max_level(max_level);
38        }
39
40        let mut target_max_level = target_max_level;
41        target_max_level.sort_by_key(|(name, _)| name.len().wrapping_neg());
42
43        Self {
44            sinks,
45            target_level_filters: target_max_level,
46        }
47    }
48
49    fn target_max_level(&self, metadata: &LogMetadata) -> Option<LevelFilter> {
50        const GENERATION: u16 = 1;
51        // At this point we would have already tested the max level on the macro
52        match metadata.level_filter(GENERATION) {
53            micromegas_tracing::logs::FilterState::Outdated => {
54                let level_filter =
55                    Self::find_max_match(metadata.target, &self.target_level_filters);
56                metadata.set_level_filter(GENERATION, level_filter);
57                level_filter
58            }
59            micromegas_tracing::logs::FilterState::NotSet => None,
60            micromegas_tracing::logs::FilterState::Set(level_filter) => Some(level_filter),
61        }
62    }
63
64    /// This needs to be optimized
65    fn find_max_match(
66        target: &str,
67        level_filters: &[(String, LevelFilter)],
68    ) -> Option<LevelFilter> {
69        for (t, l) in level_filters.iter() {
70            if target.starts_with(t) {
71                return Some(*l);
72            }
73        }
74        None
75    }
76}
77
78impl EventSink for CompositeSink {
79    fn on_startup(&self, process_info: Arc<ProcessInfo>) {
80        if self.sinks.len() == 1 {
81            self.sinks[0].1.on_startup(process_info);
82        } else {
83            self.sinks
84                .iter()
85                .for_each(|(_, sink)| sink.on_startup(process_info.clone()));
86        }
87    }
88
89    fn on_shutdown(&self) {
90        self.sinks.iter().for_each(|(_, sink)| sink.on_shutdown());
91    }
92
93    fn on_log_enabled(&self, metadata: &LogMetadata) -> bool {
94        // The log is enabled if any of the sinks are enabled
95        // If the sinks vec is empty `false` will be returned
96        let target_max_level = self.target_max_level(metadata);
97        self.sinks.iter().any(|(max_level, sink)| {
98            metadata.level <= target_max_level.unwrap_or(*max_level)
99                && sink.on_log_enabled(metadata)
100        })
101    }
102
103    fn on_log(
104        &self,
105        metadata: &LogMetadata,
106        properties: &[Property],
107        time: i64,
108        args: fmt::Arguments<'_>,
109    ) {
110        let target_max_level = self.target_max_level(metadata);
111        self.sinks.iter().for_each(|(max_level, sink)| {
112            if metadata.level <= target_max_level.unwrap_or(*max_level)
113                && sink.on_log_enabled(metadata)
114            {
115                sink.on_log(metadata, properties, time, args);
116            }
117        });
118    }
119
120    fn on_init_log_stream(&self, log_stream: &LogStream) {
121        self.sinks
122            .iter()
123            .for_each(|(_, sink)| sink.on_init_log_stream(log_stream));
124    }
125
126    fn on_process_log_block(&self, old_event_block: Arc<LogBlock>) {
127        self.sinks
128            .iter()
129            .for_each(|(_, sink)| sink.on_process_log_block(old_event_block.clone()));
130    }
131
132    fn on_init_metrics_stream(&self, metrics_stream: &MetricsStream) {
133        self.sinks
134            .iter()
135            .for_each(|(_, sink)| sink.on_init_metrics_stream(metrics_stream));
136    }
137
138    fn on_process_metrics_block(&self, old_event_block: Arc<MetricsBlock>) {
139        self.sinks
140            .iter()
141            .for_each(|(_, sink)| sink.on_process_metrics_block(old_event_block.clone()));
142    }
143
144    fn on_init_thread_stream(&self, thread_stream: &ThreadStream) {
145        self.sinks
146            .iter()
147            .for_each(|(_, sink)| sink.on_init_thread_stream(thread_stream));
148    }
149
150    fn on_process_thread_block(&self, old_event_block: Arc<ThreadBlock>) {
151        self.sinks
152            .iter()
153            .for_each(|(_, sink)| sink.on_process_thread_block(old_event_block.clone()));
154    }
155
156    fn is_busy(&self) -> bool {
157        for (_, sink) in &self.sinks {
158            if sink.is_busy() {
159                return true;
160            }
161        }
162        false
163    }
164}