micromegas_telemetry_sink/
lib.rs1#![allow(
7 unsafe_code,
8 missing_docs,
9 clippy::missing_errors_doc,
10 clippy::new_without_default
11)]
12
13#[cfg(not(target_arch = "wasm32"))]
15pub mod api_key_decorator;
16#[cfg(not(target_arch = "wasm32"))]
17pub mod composite_event_sink;
18#[cfg(not(target_arch = "wasm32"))]
19pub mod http_event_sink;
20#[cfg(not(target_arch = "wasm32"))]
21pub mod local_event_sink;
22#[cfg(not(target_arch = "wasm32"))]
23pub mod log_interop;
24#[cfg(not(target_arch = "wasm32"))]
25pub mod oidc_client_credentials_decorator;
26#[cfg(not(target_arch = "wasm32"))]
27pub mod request_decorator;
28#[cfg(not(target_arch = "wasm32"))]
29pub mod stream_block;
30#[cfg(not(target_arch = "wasm32"))]
31pub mod stream_info;
32#[cfg(not(target_arch = "wasm32"))]
33pub mod system_monitor;
34#[cfg(not(target_arch = "wasm32"))]
35pub mod tracing_interop;
36
37#[cfg(target_arch = "wasm32")]
39mod console_event_sink;
40#[cfg(target_arch = "wasm32")]
41pub use console_event_sink::*;
42
43#[cfg(not(target_arch = "wasm32"))]
45mod native {
46 use std::any::TypeId;
47 use std::collections::HashMap;
48 use std::str::FromStr;
49 use std::sync::{Arc, Mutex, Weak};
50
51 use crate::log_interop::install_log_interop;
52 use crate::request_decorator::RequestDecorator;
53 use crate::tracing_interop::install_tracing_interop;
54 use micromegas_tracing::event::BoxedEventSink;
55 use micromegas_tracing::info;
56 use micromegas_tracing::{
57 event::EventSink,
58 guards::{TracingSystemGuard, TracingThreadGuard},
59 prelude::*,
60 };
61
62 use crate::composite_event_sink::CompositeSink;
63 use crate::http_event_sink::HttpEventSink;
64 use crate::local_event_sink::LocalEventSink;
65 use crate::system_monitor::spawn_system_monitor;
66
67 pub mod tokio_retry {
68 pub use tokio_retry2::*;
69 }
70
71 pub mod reqwest {
72 pub use reqwest::*;
73 }
74
75 pub struct TelemetryGuardBuilder {
76 logs_buffer_size: usize,
77 metrics_buffer_size: usize,
78 threads_buffer_size: usize,
79 target_max_levels: HashMap<String, String>,
80 max_queue_size: isize,
81 max_level_override: Option<LevelFilter>,
82 interop_max_level_override: Option<LevelFilter>,
83 install_log_capture: bool,
84 install_tracing_capture: bool,
85 local_sink_enabled: bool,
86 local_sink_max_level: LevelFilter,
87 telemetry_sink_url: Option<String>,
88 telemetry_sink_max_level: LevelFilter,
89 telemetry_metadata_retry:
90 Option<core::iter::Take<tokio_retry::strategy::ExponentialBackoff>>,
91 telemetry_blocks_retry: Option<core::iter::Take<tokio_retry::strategy::ExponentialBackoff>>,
92 telemetry_make_request_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
93 extra_sinks: HashMap<TypeId, (LevelFilter, BoxedEventSink)>,
94 system_metrics_enabled: bool,
95 default_system_properties_enabled: bool,
96 process_properties: HashMap<String, String>,
97 }
98
99 impl Default for TelemetryGuardBuilder {
100 fn default() -> Self {
101 Self {
102 logs_buffer_size: 10 * 1024 * 1024,
103 metrics_buffer_size: 1024 * 1024,
104 threads_buffer_size: 10 * 1024 * 1024,
105 local_sink_enabled: true,
106 local_sink_max_level: LevelFilter::Info,
107 telemetry_sink_url: None,
108 telemetry_sink_max_level: LevelFilter::Debug,
109 telemetry_metadata_retry: None,
110 telemetry_blocks_retry: None,
111 telemetry_make_request_decorator: Box::new(|| {
112 Arc::new(crate::request_decorator::TrivialRequestDecorator {})
113 }),
114 target_max_levels: HashMap::default(),
115 max_queue_size: 16, max_level_override: None,
117 interop_max_level_override: None,
118 install_log_capture: false,
119 install_tracing_capture: true,
120 extra_sinks: HashMap::default(),
121 system_metrics_enabled: true,
122 default_system_properties_enabled: true,
123 process_properties: HashMap::default(),
124 }
125 }
126 }
127
128 impl TelemetryGuardBuilder {
129 #[must_use]
131 pub fn add_sink<Sink>(mut self, max_level: LevelFilter, sink: Sink) -> Self
132 where
133 Sink: EventSink + 'static,
134 {
135 let type_id = TypeId::of::<Sink>();
136
137 self.extra_sinks
138 .entry(type_id)
139 .or_insert_with(|| (max_level, Box::new(sink)));
140
141 self
142 }
143
144 #[must_use]
146 pub fn with_max_level_override(mut self, level_filter: LevelFilter) -> Self {
147 self.max_level_override = Some(level_filter);
148 self
149 }
150
151 #[must_use]
152 pub fn with_local_sink_enabled(mut self, enabled: bool) -> Self {
153 self.local_sink_enabled = enabled;
154 self
155 }
156
157 #[must_use]
158 pub fn with_interop_max_level_override(mut self, level_filter: LevelFilter) -> Self {
159 self.interop_max_level_override = Some(level_filter);
160 self
161 }
162
163 #[must_use]
164 pub fn with_install_log_capture(mut self, enabled: bool) -> Self {
165 self.install_log_capture = enabled;
166 self
167 }
168
169 #[must_use]
170 pub fn with_install_tracing_capture(mut self, enabled: bool) -> Self {
171 self.install_tracing_capture = enabled;
172 self
173 }
174
175 #[must_use]
176 pub fn with_local_sink_max_level(mut self, level_filter: LevelFilter) -> Self {
177 self.local_sink_max_level = level_filter;
178 self
179 }
180
181 #[must_use]
182 pub fn with_ctrlc_handling(self) -> Self {
183 ctrlc::set_handler(move || {
184 info!("Ctrl+C was hit!");
185 micromegas_tracing::guards::shutdown_telemetry();
186 std::process::exit(1);
187 })
188 .expect("Error setting Ctrl+C handler");
189 self
190 }
191
192 #[must_use]
193 pub fn with_telemetry_metadata_retry(
194 mut self,
195 retry_strategy: core::iter::Take<tokio_retry::strategy::ExponentialBackoff>,
196 ) -> Self {
197 self.telemetry_metadata_retry = Some(retry_strategy);
198 self
199 }
200
201 #[must_use]
202 pub fn with_telemetry_blocks_retry(
203 mut self,
204 retry_strategy: core::iter::Take<tokio_retry::strategy::ExponentialBackoff>,
205 ) -> Self {
206 self.telemetry_blocks_retry = Some(retry_strategy);
207 self
208 }
209
210 #[must_use]
211 pub fn with_request_decorator(
212 mut self,
213 make_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
214 ) -> Self {
215 self.telemetry_make_request_decorator = make_decorator;
216 self
217 }
218
219 #[must_use]
244 pub fn with_auth_from_env(mut self) -> Self {
245 use crate::api_key_decorator::ApiKeyRequestDecorator;
246 use crate::oidc_client_credentials_decorator::OidcClientCredentialsDecorator;
247
248 if let Ok(decorator) = ApiKeyRequestDecorator::from_env() {
250 info!("Configured telemetry sink with API key authentication");
251 self.telemetry_make_request_decorator = Box::new(move || Arc::new(decorator));
252 return self;
253 }
254
255 if let Ok(decorator) = OidcClientCredentialsDecorator::from_env() {
257 info!("Configured telemetry sink with OIDC client credentials authentication");
258 self.telemetry_make_request_decorator = Box::new(move || Arc::new(decorator));
259 return self;
260 }
261
262 info!(
264 "Telemetry sink authentication not configured - sending unauthenticated requests"
265 );
266 self
267 }
268
269 #[must_use]
270 pub fn with_system_metrics_enabled(mut self, enabled: bool) -> Self {
271 self.system_metrics_enabled = enabled;
272 self
273 }
274
275 #[must_use]
276 pub fn with_default_system_properties_enabled(mut self, enabled: bool) -> Self {
277 self.default_system_properties_enabled = enabled;
278 self
279 }
280
281 #[must_use]
286 pub fn with_telemetry_sink_url(mut self, url: String) -> Self {
287 self.telemetry_sink_url = Some(url);
288 self
289 }
290
291 #[must_use]
297 pub fn with_process_property(mut self, key: String, value: String) -> Self {
298 self.process_properties.insert(key, value);
299 self
300 }
301
302 #[must_use]
308 pub fn with_process_properties(
309 mut self,
310 process_properties: HashMap<String, String>,
311 ) -> Self {
312 self.process_properties.extend(process_properties);
313 self
314 }
315
316 fn populate_default_system_properties(&mut self) {
317 let props = &mut self.process_properties;
318 type DefaultProperties = Vec<(&'static str, Box<dyn FnOnce() -> String>)>;
320 let defaults: DefaultProperties = vec![
321 (
322 "exe",
323 Box::new(|| {
324 std::env::current_exe()
325 .unwrap_or_default()
326 .to_string_lossy()
327 .into_owned()
328 }),
329 ),
330 ("username", Box::new(whoami::username)),
331 ("realname", Box::new(whoami::realname)),
332 ("computer", Box::new(whoami::devicename)),
333 ("distro", Box::new(whoami::distro)),
334 (
335 "cpu_brand",
336 Box::new(|| {
337 #[cfg(target_arch = "x86_64")]
338 {
339 raw_cpuid::CpuId::new()
340 .get_processor_brand_string()
341 .map_or_else(|| "unknown".to_owned(), |b| b.as_str().to_owned())
342 }
343 #[cfg(not(target_arch = "x86_64"))]
344 {
345 String::from(std::env::consts::ARCH)
346 }
347 }),
348 ),
349 (
350 "physical_core_count",
351 Box::new(|| {
352 sysinfo::System::physical_core_count()
353 .map(|c: usize| c.to_string())
354 .unwrap_or_default()
355 }),
356 ),
357 (
358 "logical_cpu_count",
359 Box::new(|| {
360 use sysinfo::{CpuRefreshKind, RefreshKind};
361 let system = sysinfo::System::new_with_specifics(
362 RefreshKind::nothing().with_cpu(CpuRefreshKind::nothing()),
363 );
364 system.cpus().len().to_string()
365 }),
366 ),
367 (
368 "total_memory",
369 Box::new(|| {
370 use sysinfo::{MemoryRefreshKind, RefreshKind};
371 let system = sysinfo::System::new_with_specifics(
372 RefreshKind::nothing()
373 .with_memory(MemoryRefreshKind::nothing().with_ram()),
374 );
375 system.total_memory().to_string()
376 }),
377 ),
378 ];
379 for (key, make_value) in defaults {
380 props.entry(key.to_string()).or_insert_with(make_value);
381 }
382 }
383
384 pub fn build(mut self) -> anyhow::Result<TelemetryGuard> {
385 if self.default_system_properties_enabled {
386 self.populate_default_system_properties();
387 }
388 let target_max_level: Vec<_> = self
389 .target_max_levels
390 .into_iter()
391 .filter(|(key, _val)| key != "MAX_LEVEL")
392 .map(|(key, val)| {
393 (
394 key,
395 LevelFilter::from_str(val.as_str()).unwrap_or(LevelFilter::Off),
396 )
397 })
398 .collect();
399
400 let guard = {
401 lazy_static::lazy_static! {
402 static ref GLOBAL_WEAK_GUARD: Mutex<Weak<TracingSystemGuard>> = Mutex::new(Weak::new());
403 }
404 let mut weak_guard = GLOBAL_WEAK_GUARD.lock().unwrap();
405 let weak = &mut *weak_guard;
406
407 if let Some(arc) = weak.upgrade() {
408 arc
409 } else {
410 let mut sinks: Vec<(LevelFilter, BoxedEventSink)> = vec![];
411 let telemetry_sink_url = self
412 .telemetry_sink_url
413 .or_else(|| std::env::var("MICROMEGAS_TELEMETRY_URL").ok())
414 .filter(|url| !url.trim().is_empty());
415
416 if let Some(url) = telemetry_sink_url {
417 let metadata_retry = self.telemetry_metadata_retry.unwrap_or_else(|| {
418 tokio_retry::strategy::ExponentialBackoff::from_millis(10).take(3)
419 });
420 let blocks_retry = self.telemetry_blocks_retry.unwrap_or_else(|| {
421 tokio_retry::strategy::ExponentialBackoff::from_millis(10).take(3)
422 });
423 sinks.push((
424 self.telemetry_sink_max_level,
425 Box::new(HttpEventSink::new(
426 &url,
427 self.max_queue_size,
428 metadata_retry,
429 blocks_retry,
430 self.telemetry_make_request_decorator,
431 )),
432 ));
433 }
434 if self.local_sink_enabled {
435 sinks.push((self.local_sink_max_level, Box::new(LocalEventSink::new())));
436 }
437 let mut extra_sinks = self.extra_sinks.into_values().collect();
438 sinks.append(&mut extra_sinks);
439
440 let sink: BoxedEventSink = Box::new(CompositeSink::new(
441 sinks,
442 target_max_level,
443 self.max_level_override,
444 ));
445
446 if self.install_log_capture {
448 install_log_interop(self.interop_max_level_override);
449 }
450 if self.install_tracing_capture {
451 install_tracing_interop(self.interop_max_level_override);
452 }
453
454 let arc = Arc::<TracingSystemGuard>::new(TracingSystemGuard::new(
455 self.logs_buffer_size,
456 self.metrics_buffer_size,
457 self.threads_buffer_size,
458 sink.into(),
459 self.process_properties,
460 std::env::var("MICROMEGAS_ENABLE_CPU_TRACING")
461 .map(|v| v == "true")
462 .unwrap_or(false), )?);
464
465 if self.system_metrics_enabled {
466 spawn_system_monitor();
467 }
468
469 *weak = Arc::<TracingSystemGuard>::downgrade(&arc);
470 arc
471 }
472 };
473 Ok(TelemetryGuard {
475 _guard: guard,
476 _thread_guard: TracingThreadGuard::new(),
477 })
478 }
479 }
480
481 pub struct TelemetryGuard {
482 _thread_guard: TracingThreadGuard,
484 _guard: Arc<TracingSystemGuard>,
485 }
486
487 impl TelemetryGuard {
488 pub fn new() -> anyhow::Result<Self> {
489 TelemetryGuardBuilder::default().build()
490 }
491 }
492}
493
494#[cfg(not(target_arch = "wasm32"))]
495pub use native::*;
496
497#[cfg(target_arch = "wasm32")]
499mod wasm {
500 use std::collections::HashMap;
501 use std::sync::Arc;
502
503 use micromegas_tracing::guards::TracingSystemGuard;
504
505 use crate::ConsoleEventSink;
506
507 pub struct TelemetryGuard {
508 _guard: Arc<TracingSystemGuard>,
509 }
510
511 pub fn init_telemetry() -> anyhow::Result<TelemetryGuard> {
512 let guard = Arc::new(TracingSystemGuard::new(
513 0,
514 0,
515 0,
516 Arc::new(ConsoleEventSink),
517 HashMap::new(),
518 false,
519 )?);
520 Ok(TelemetryGuard { _guard: guard })
521 }
522}
523
524#[cfg(target_arch = "wasm32")]
525pub use wasm::*;