micromegas_telemetry_sink/
lib.rs

1//! Telemetry HTTP sink library
2//!
3//! Provides logging, metrics, memory and performance profiling
4
5// crate-specific lint exceptions:
6#![allow(
7    unsafe_code,
8    missing_docs,
9    clippy::missing_errors_doc,
10    clippy::new_without_default
11)]
12
13// ---- Native-only modules ----
14#[cfg(not(target_arch = "wasm32"))]
15pub mod api_key_decorator;
16#[cfg(not(target_arch = "wasm32"))]
17pub mod composite_event_sink;
18#[cfg(not(target_arch = "wasm32"))]
19pub mod http_event_sink;
20#[cfg(not(target_arch = "wasm32"))]
21pub mod local_event_sink;
22#[cfg(not(target_arch = "wasm32"))]
23pub mod log_interop;
24#[cfg(not(target_arch = "wasm32"))]
25pub mod oidc_client_credentials_decorator;
26#[cfg(not(target_arch = "wasm32"))]
27pub mod request_decorator;
28#[cfg(not(target_arch = "wasm32"))]
29pub mod stream_block;
30#[cfg(not(target_arch = "wasm32"))]
31pub mod stream_info;
32#[cfg(not(target_arch = "wasm32"))]
33pub mod system_monitor;
34#[cfg(not(target_arch = "wasm32"))]
35pub mod tracing_interop;
36
37// ---- Wasm-only modules ----
38#[cfg(target_arch = "wasm32")]
39mod console_event_sink;
40#[cfg(target_arch = "wasm32")]
41pub use console_event_sink::*;
42
43// ---- Native implementation ----
44#[cfg(not(target_arch = "wasm32"))]
45mod native {
46    use std::any::TypeId;
47    use std::collections::HashMap;
48    use std::str::FromStr;
49    use std::sync::{Arc, Mutex, Weak};
50
51    use crate::log_interop::install_log_interop;
52    use crate::request_decorator::RequestDecorator;
53    use crate::tracing_interop::install_tracing_interop;
54    use micromegas_tracing::event::BoxedEventSink;
55    use micromegas_tracing::info;
56    use micromegas_tracing::{
57        event::EventSink,
58        guards::{TracingSystemGuard, TracingThreadGuard},
59        prelude::*,
60    };
61
62    use crate::composite_event_sink::CompositeSink;
63    use crate::http_event_sink::HttpEventSink;
64    use crate::local_event_sink::LocalEventSink;
65    use crate::system_monitor::spawn_system_monitor;
66
    /// Re-export of the `tokio_retry2` crate under the name `tokio_retry`.
    ///
    /// Its `strategy::ExponentialBackoff` type is what the builder's retry settings
    /// (`with_telemetry_metadata_retry`, `with_telemetry_blocks_retry`) accept.
    pub mod tokio_retry {
        pub use tokio_retry2::*;
    }
70
    /// Re-export of the `reqwest` crate.
    ///
    /// NOTE(review): presumably exposed so that downstream code (e.g. custom request
    /// decorators) can use the same `reqwest` version as this crate — confirm.
    pub mod reqwest {
        pub use reqwest::*;
    }
74
75    pub struct TelemetryGuardBuilder {
76        logs_buffer_size: usize,
77        metrics_buffer_size: usize,
78        threads_buffer_size: usize,
79        target_max_levels: HashMap<String, String>,
80        max_queue_size: isize,
81        max_level_override: Option<LevelFilter>,
82        interop_max_level_override: Option<LevelFilter>,
83        install_log_capture: bool,
84        install_tracing_capture: bool,
85        local_sink_enabled: bool,
86        local_sink_max_level: LevelFilter,
87        telemetry_sink_url: Option<String>,
88        telemetry_sink_max_level: LevelFilter,
89        telemetry_metadata_retry:
90            Option<core::iter::Take<tokio_retry::strategy::ExponentialBackoff>>,
91        telemetry_blocks_retry: Option<core::iter::Take<tokio_retry::strategy::ExponentialBackoff>>,
92        telemetry_make_request_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
93        extra_sinks: HashMap<TypeId, (LevelFilter, BoxedEventSink)>,
94        system_metrics_enabled: bool,
95        default_system_properties_enabled: bool,
96        process_properties: HashMap<String, String>,
97    }
98
99    impl Default for TelemetryGuardBuilder {
100        fn default() -> Self {
101            Self {
102                logs_buffer_size: 10 * 1024 * 1024,
103                metrics_buffer_size: 1024 * 1024,
104                threads_buffer_size: 10 * 1024 * 1024,
105                local_sink_enabled: true,
106                local_sink_max_level: LevelFilter::Info,
107                telemetry_sink_url: None,
108                telemetry_sink_max_level: LevelFilter::Debug,
109                telemetry_metadata_retry: None,
110                telemetry_blocks_retry: None,
111                telemetry_make_request_decorator: Box::new(|| {
112                    Arc::new(crate::request_decorator::TrivialRequestDecorator {})
113                }),
114                target_max_levels: HashMap::default(),
115                max_queue_size: 16, //todo: change to nb_threads * 2
116                max_level_override: None,
117                interop_max_level_override: None,
118                install_log_capture: false,
119                install_tracing_capture: true,
120                extra_sinks: HashMap::default(),
121                system_metrics_enabled: true,
122                default_system_properties_enabled: true,
123                process_properties: HashMap::default(),
124            }
125        }
126    }
127
128    impl TelemetryGuardBuilder {
129        // Only one sink per type ?
130        #[must_use]
131        pub fn add_sink<Sink>(mut self, max_level: LevelFilter, sink: Sink) -> Self
132        where
133            Sink: EventSink + 'static,
134        {
135            let type_id = TypeId::of::<Sink>();
136
137            self.extra_sinks
138                .entry(type_id)
139                .or_insert_with(|| (max_level, Box::new(sink)));
140
141            self
142        }
143
144        /// Programmatic override
145        #[must_use]
146        pub fn with_max_level_override(mut self, level_filter: LevelFilter) -> Self {
147            self.max_level_override = Some(level_filter);
148            self
149        }
150
151        #[must_use]
152        pub fn with_local_sink_enabled(mut self, enabled: bool) -> Self {
153            self.local_sink_enabled = enabled;
154            self
155        }
156
157        #[must_use]
158        pub fn with_interop_max_level_override(mut self, level_filter: LevelFilter) -> Self {
159            self.interop_max_level_override = Some(level_filter);
160            self
161        }
162
163        #[must_use]
164        pub fn with_install_log_capture(mut self, enabled: bool) -> Self {
165            self.install_log_capture = enabled;
166            self
167        }
168
169        #[must_use]
170        pub fn with_install_tracing_capture(mut self, enabled: bool) -> Self {
171            self.install_tracing_capture = enabled;
172            self
173        }
174
175        #[must_use]
176        pub fn with_local_sink_max_level(mut self, level_filter: LevelFilter) -> Self {
177            self.local_sink_max_level = level_filter;
178            self
179        }
180
181        #[must_use]
182        pub fn with_ctrlc_handling(self) -> Self {
183            ctrlc::set_handler(move || {
184                info!("Ctrl+C was hit!");
185                micromegas_tracing::guards::shutdown_telemetry();
186                std::process::exit(1);
187            })
188            .expect("Error setting Ctrl+C handler");
189            self
190        }
191
192        #[must_use]
193        pub fn with_telemetry_metadata_retry(
194            mut self,
195            retry_strategy: core::iter::Take<tokio_retry::strategy::ExponentialBackoff>,
196        ) -> Self {
197            self.telemetry_metadata_retry = Some(retry_strategy);
198            self
199        }
200
201        #[must_use]
202        pub fn with_telemetry_blocks_retry(
203            mut self,
204            retry_strategy: core::iter::Take<tokio_retry::strategy::ExponentialBackoff>,
205        ) -> Self {
206            self.telemetry_blocks_retry = Some(retry_strategy);
207            self
208        }
209
210        #[must_use]
211        pub fn with_request_decorator(
212            mut self,
213            make_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
214        ) -> Self {
215            self.telemetry_make_request_decorator = make_decorator;
216            self
217        }
218
219        /// Automatically configure authentication from environment variables.
220        ///
221        /// Checks for authentication configuration in this order:
222        /// 1. API key authentication via `MICROMEGAS_INGESTION_API_KEY`
223        /// 2. OIDC client credentials via `MICROMEGAS_OIDC_TOKEN_ENDPOINT`,
224        ///    `MICROMEGAS_OIDC_CLIENT_ID`, and `MICROMEGAS_OIDC_CLIENT_SECRET`
225        /// 3. Falls back to no authentication (TrivialRequestDecorator)
226        ///
227        /// # Example
228        ///
229        /// ```rust,no_run
230        /// use micromegas_telemetry_sink::TelemetryGuardBuilder;
231        ///
232        /// // Set environment variable
233        /// unsafe {
234        ///     std::env::set_var("MICROMEGAS_INGESTION_API_KEY", "secret-key-123");
235        /// }
236        ///
237        /// // Builder automatically configures API key authentication
238        /// let _guard = TelemetryGuardBuilder::default()
239        ///     .with_auth_from_env()
240        ///     .build()
241        ///     .expect("Failed to build telemetry guard");
242        /// ```
243        #[must_use]
244        pub fn with_auth_from_env(mut self) -> Self {
245            use crate::api_key_decorator::ApiKeyRequestDecorator;
246            use crate::oidc_client_credentials_decorator::OidcClientCredentialsDecorator;
247
248            // Try API key authentication first
249            if let Ok(decorator) = ApiKeyRequestDecorator::from_env() {
250                info!("Configured telemetry sink with API key authentication");
251                self.telemetry_make_request_decorator = Box::new(move || Arc::new(decorator));
252                return self;
253            }
254
255            // Try OIDC client credentials authentication
256            if let Ok(decorator) = OidcClientCredentialsDecorator::from_env() {
257                info!("Configured telemetry sink with OIDC client credentials authentication");
258                self.telemetry_make_request_decorator = Box::new(move || Arc::new(decorator));
259                return self;
260            }
261
262            // No authentication configured - use trivial decorator (no-op)
263            info!(
264                "Telemetry sink authentication not configured - sending unauthenticated requests"
265            );
266            self
267        }
268
269        #[must_use]
270        pub fn with_system_metrics_enabled(mut self, enabled: bool) -> Self {
271            self.system_metrics_enabled = enabled;
272            self
273        }
274
275        #[must_use]
276        pub fn with_default_system_properties_enabled(mut self, enabled: bool) -> Self {
277            self.default_system_properties_enabled = enabled;
278            self
279        }
280
281        /// Set the URL of telemetry sink.
282        ///
283        /// If not explicitly set, the URL will be read from the `MICROMEGAS_TELEMETRY_URL` environment
284        /// variable.
285        #[must_use]
286        pub fn with_telemetry_sink_url(mut self, url: String) -> Self {
287            self.telemetry_sink_url = Some(url);
288            self
289        }
290
291        /// Add a single property to the process info.
292        ///
293        /// # Warning
294        ///
295        /// This will override any existing properties.
296        #[must_use]
297        pub fn with_process_property(mut self, key: String, value: String) -> Self {
298            self.process_properties.insert(key, value);
299            self
300        }
301
302        /// Add multiple properties to the process info.
303        ///
304        /// # Warning
305        ///
306        /// This will override any existing properties.
307        #[must_use]
308        pub fn with_process_properties(
309            mut self,
310            process_properties: HashMap<String, String>,
311        ) -> Self {
312            self.process_properties.extend(process_properties);
313            self
314        }
315
316        fn populate_default_system_properties(&mut self) {
317            let props = &mut self.process_properties;
318            // Process identity (duplicates ProcessInfo fields into properties)
319            type DefaultProperties = Vec<(&'static str, Box<dyn FnOnce() -> String>)>;
320            let defaults: DefaultProperties = vec![
321                (
322                    "exe",
323                    Box::new(|| {
324                        std::env::current_exe()
325                            .unwrap_or_default()
326                            .to_string_lossy()
327                            .into_owned()
328                    }),
329                ),
330                ("username", Box::new(whoami::username)),
331                ("realname", Box::new(whoami::realname)),
332                ("computer", Box::new(whoami::devicename)),
333                ("distro", Box::new(whoami::distro)),
334                (
335                    "cpu_brand",
336                    Box::new(|| {
337                        #[cfg(target_arch = "x86_64")]
338                        {
339                            raw_cpuid::CpuId::new()
340                                .get_processor_brand_string()
341                                .map_or_else(|| "unknown".to_owned(), |b| b.as_str().to_owned())
342                        }
343                        #[cfg(not(target_arch = "x86_64"))]
344                        {
345                            String::from(std::env::consts::ARCH)
346                        }
347                    }),
348                ),
349                (
350                    "physical_core_count",
351                    Box::new(|| {
352                        sysinfo::System::physical_core_count()
353                            .map(|c: usize| c.to_string())
354                            .unwrap_or_default()
355                    }),
356                ),
357                (
358                    "logical_cpu_count",
359                    Box::new(|| {
360                        use sysinfo::{CpuRefreshKind, RefreshKind};
361                        let system = sysinfo::System::new_with_specifics(
362                            RefreshKind::nothing().with_cpu(CpuRefreshKind::nothing()),
363                        );
364                        system.cpus().len().to_string()
365                    }),
366                ),
367                (
368                    "total_memory",
369                    Box::new(|| {
370                        use sysinfo::{MemoryRefreshKind, RefreshKind};
371                        let system = sysinfo::System::new_with_specifics(
372                            RefreshKind::nothing()
373                                .with_memory(MemoryRefreshKind::nothing().with_ram()),
374                        );
375                        system.total_memory().to_string()
376                    }),
377                ),
378            ];
379            for (key, make_value) in defaults {
380                props.entry(key.to_string()).or_insert_with(make_value);
381            }
382        }
383
384        pub fn build(mut self) -> anyhow::Result<TelemetryGuard> {
385            if self.default_system_properties_enabled {
386                self.populate_default_system_properties();
387            }
388            let target_max_level: Vec<_> = self
389                .target_max_levels
390                .into_iter()
391                .filter(|(key, _val)| key != "MAX_LEVEL")
392                .map(|(key, val)| {
393                    (
394                        key,
395                        LevelFilter::from_str(val.as_str()).unwrap_or(LevelFilter::Off),
396                    )
397                })
398                .collect();
399
400            let guard = {
401                lazy_static::lazy_static! {
402                    static ref GLOBAL_WEAK_GUARD: Mutex<Weak<TracingSystemGuard>> = Mutex::new(Weak::new());
403                }
404                let mut weak_guard = GLOBAL_WEAK_GUARD.lock().unwrap();
405                let weak = &mut *weak_guard;
406
407                if let Some(arc) = weak.upgrade() {
408                    arc
409                } else {
410                    let mut sinks: Vec<(LevelFilter, BoxedEventSink)> = vec![];
411                    let telemetry_sink_url = self
412                        .telemetry_sink_url
413                        .or_else(|| std::env::var("MICROMEGAS_TELEMETRY_URL").ok())
414                        .filter(|url| !url.trim().is_empty());
415
416                    if let Some(url) = telemetry_sink_url {
417                        let metadata_retry = self.telemetry_metadata_retry.unwrap_or_else(|| {
418                            tokio_retry::strategy::ExponentialBackoff::from_millis(10).take(3)
419                        });
420                        let blocks_retry = self.telemetry_blocks_retry.unwrap_or_else(|| {
421                            tokio_retry::strategy::ExponentialBackoff::from_millis(10).take(3)
422                        });
423                        sinks.push((
424                            self.telemetry_sink_max_level,
425                            Box::new(HttpEventSink::new(
426                                &url,
427                                self.max_queue_size,
428                                metadata_retry,
429                                blocks_retry,
430                                self.telemetry_make_request_decorator,
431                            )),
432                        ));
433                    }
434                    if self.local_sink_enabled {
435                        sinks.push((self.local_sink_max_level, Box::new(LocalEventSink::new())));
436                    }
437                    let mut extra_sinks = self.extra_sinks.into_values().collect();
438                    sinks.append(&mut extra_sinks);
439
440                    let sink: BoxedEventSink = Box::new(CompositeSink::new(
441                        sinks,
442                        target_max_level,
443                        self.max_level_override,
444                    ));
445
446                    // the composite sink inits micromegas_tracing::levels::set_max_level, which install_log_interop needs
447                    if self.install_log_capture {
448                        install_log_interop(self.interop_max_level_override);
449                    }
450                    if self.install_tracing_capture {
451                        install_tracing_interop(self.interop_max_level_override);
452                    }
453
454                    let arc = Arc::<TracingSystemGuard>::new(TracingSystemGuard::new(
455                        self.logs_buffer_size,
456                        self.metrics_buffer_size,
457                        self.threads_buffer_size,
458                        sink.into(),
459                        self.process_properties,
460                        std::env::var("MICROMEGAS_ENABLE_CPU_TRACING")
461                            .map(|v| v == "true")
462                            .unwrap_or(false), // Default to disabled for minimal overhead
463                    )?);
464
465                    if self.system_metrics_enabled {
466                        spawn_system_monitor();
467                    }
468
469                    *weak = Arc::<TracingSystemGuard>::downgrade(&arc);
470                    arc
471                }
472            };
473            // order here is important
474            Ok(TelemetryGuard {
475                _guard: guard,
476                _thread_guard: TracingThreadGuard::new(),
477            })
478        }
479    }
480
    /// Keeps telemetry alive for as long as it is held.
    ///
    /// Holds a strong reference to the process-wide `TracingSystemGuard` (shared by every
    /// guard built while it is alive) and a `TracingThreadGuard` created by `build` on the
    /// calling thread.
    pub struct TelemetryGuard {
        // note we rely here on the drop order being the same as the declaration order:
        // the thread guard is dropped before this reference to the system guard is released
        _thread_guard: TracingThreadGuard,
        _guard: Arc<TracingSystemGuard>,
    }
486
487    impl TelemetryGuard {
488        pub fn new() -> anyhow::Result<Self> {
489            TelemetryGuardBuilder::default().build()
490        }
491    }
492}
493
494#[cfg(not(target_arch = "wasm32"))]
495pub use native::*;
496
497// ---- Wasm implementation ----
#[cfg(target_arch = "wasm32")]
mod wasm {
    use std::collections::HashMap;
    use std::sync::Arc;

    use micromegas_tracing::guards::TracingSystemGuard;

    use crate::ConsoleEventSink;

    /// Keeps the wasm tracing system alive; dropping it releases the
    /// `TracingSystemGuard` it holds.
    pub struct TelemetryGuard {
        _guard: Arc<TracingSystemGuard>,
    }

    /// Initializes telemetry for wasm targets, sending events to [`ConsoleEventSink`].
    ///
    /// The three buffer sizes are all 0, no process properties are attached and CPU
    /// tracing is disabled.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `TracingSystemGuard::new`.
    pub fn init_telemetry() -> anyhow::Result<TelemetryGuard> {
        let (logs_buffer_size, metrics_buffer_size, threads_buffer_size) = (0, 0, 0);
        let system = TracingSystemGuard::new(
            logs_buffer_size,
            metrics_buffer_size,
            threads_buffer_size,
            Arc::new(ConsoleEventSink),
            HashMap::new(),
            false,
        )?;
        Ok(TelemetryGuard {
            _guard: Arc::new(system),
        })
    }
}
523
524#[cfg(target_arch = "wasm32")]
525pub use wasm::*;