1pub use crate::errors::{Error, Result};
3use crate::intern_string::intern_string;
4use crate::logs::TaggedLogString;
5use crate::metrics::{TaggedFloatMetricEvent, TaggedIntegerMetricEvent};
6use crate::prelude::*;
7use crate::property_set::PropertySet;
8use crate::{
9 event::{EventSink, NullEventSink, TracingBlock},
10 info,
11 logs::{
12 LogBlock, LogMetadata, LogStaticStrEvent, LogStaticStrInteropEvent, LogStream,
13 LogStringEvent, LogStringInteropEvent,
14 },
15 metrics::{
16 FloatMetricEvent, IntegerMetricEvent, MetricsBlock, MetricsStream, StaticMetricMetadata,
17 },
18 spans::{
19 BeginAsyncNamedSpanEvent, BeginAsyncSpanEvent, BeginThreadNamedSpanEvent,
20 BeginThreadSpanEvent, EndAsyncNamedSpanEvent, EndAsyncSpanEvent, EndThreadNamedSpanEvent,
21 EndThreadSpanEvent, SpanLocation, SpanMetadata, ThreadBlock, ThreadEventQueueTypeIndex,
22 ThreadStream,
23 },
24 warn,
25};
26use chrono::Utc;
27use std::cell::OnceCell;
28use std::cell::UnsafeCell;
29use std::collections::HashMap;
30use std::fmt;
31use std::sync::RwLock;
32use std::{
33 cell::Cell,
34 sync::{Arc, Mutex},
35};
36
37pub fn init_event_dispatch(
38 logs_buffer_size: usize,
39 metrics_buffer_size: usize,
40 threads_buffer_size: usize,
41 sink: Arc<dyn EventSink>,
42 process_properties: HashMap<String, String>,
43 cpu_tracing_enabled: bool,
44) -> Result<()> {
45 lazy_static::lazy_static! {
46 static ref INIT_MUTEX: Mutex<()> = Mutex::new(());
47 }
48 let _guard = INIT_MUTEX.lock().unwrap();
49 let dispatch_ref = &G_DISPATCH.inner;
50 unsafe {
51 if (*dispatch_ref.get()).get().is_none() {
52 (*dispatch_ref.get())
53 .set(Dispatch::new(
54 logs_buffer_size,
55 metrics_buffer_size,
56 threads_buffer_size,
57 sink,
58 process_properties,
59 cpu_tracing_enabled,
60 ))
61 .map_err(|_| Error::AlreadyInitialized())
62 } else {
63 info!("event dispatch already initialized");
64 Err(Error::AlreadyInitialized())
65 }
66 }
67}
68
69#[inline]
70pub fn process_id() -> Option<uuid::Uuid> {
71 G_DISPATCH.get().map(Dispatch::get_process_id)
72}
73
74#[inline]
75pub fn cpu_tracing_enabled() -> Option<bool> {
76 G_DISPATCH.get().map(Dispatch::get_cpu_tracing_enabled)
77}
78
79pub fn get_sink() -> Option<Arc<dyn EventSink>> {
80 G_DISPATCH.get().map(Dispatch::get_sink)
81}
82
83pub fn shutdown_dispatch() {
84 G_DISPATCH.get().map(Dispatch::shutdown);
85}
86
87#[inline(always)]
88pub fn int_metric(metric_desc: &'static StaticMetricMetadata, value: u64) {
89 if let Some(d) = G_DISPATCH.get() {
90 d.int_metric(metric_desc, value);
91 }
92}
93
94#[inline(always)]
95pub fn float_metric(metric_desc: &'static StaticMetricMetadata, value: f64) {
96 if let Some(d) = G_DISPATCH.get() {
97 d.float_metric(metric_desc, value);
98 }
99}
100
101#[inline(always)]
102pub fn tagged_float_metric(
103 desc: &'static StaticMetricMetadata,
104 properties: &'static PropertySet,
105 value: f64,
106) {
107 if let Some(d) = G_DISPATCH.get() {
108 d.tagged_float_metric(desc, properties, value);
109 }
110}
111
112#[inline(always)]
113pub fn tagged_integer_metric(
114 desc: &'static StaticMetricMetadata,
115 properties: &'static PropertySet,
116 value: u64,
117) {
118 if let Some(d) = G_DISPATCH.get() {
119 d.tagged_integer_metric(desc, properties, value);
120 }
121}
122
123#[inline(always)]
124pub fn log(desc: &'static LogMetadata, args: fmt::Arguments<'_>) {
125 if let Some(d) = G_DISPATCH.get() {
126 d.log(desc, args);
127 }
128}
129
130#[inline(always)]
131pub fn log_tagged(
132 desc: &'static LogMetadata,
133 properties: &'static PropertySet,
134 args: fmt::Arguments<'_>,
135) {
136 if let Some(d) = G_DISPATCH.get() {
137 d.log_tagged(desc, properties, args);
138 }
139}
140
141#[inline(always)]
142pub fn log_interop(metadata: &LogMetadata, args: fmt::Arguments<'_>) {
143 if let Some(d) = G_DISPATCH.get() {
144 d.log_interop(metadata, args);
145 }
146}
147
148#[inline(always)]
149pub fn log_enabled(metadata: &LogMetadata) -> bool {
150 if let Some(d) = G_DISPATCH.get() {
151 d.log_enabled(metadata)
152 } else {
153 false
154 }
155}
156
157#[inline(always)]
158pub fn flush_log_buffer() {
159 if let Some(d) = G_DISPATCH.get() {
160 d.flush_log_buffer();
161 }
162}
163
164#[inline(always)]
165pub fn flush_metrics_buffer() {
166 if let Some(d) = G_DISPATCH.get() {
167 d.flush_metrics_buffer();
168 }
169}
170
171#[inline(always)]
174pub fn init_thread_stream() {
175 LOCAL_THREAD_STREAM.with(|cell| unsafe {
176 if (*cell.as_ptr()).is_some() {
177 return;
178 }
179 #[allow(static_mut_refs)]
180 if let Some(d) = G_DISPATCH.get() {
181 if !d.cpu_tracing_enabled {
183 return;
184 }
185 d.init_thread_stream(cell);
186 } else {
187 warn!("dispatch not initialized, cannot init thread stream, events will be lost for this thread");
188 }
189 });
190}
191
192pub fn for_each_thread_stream(fun: &mut dyn FnMut(*mut ThreadStream)) {
193 if let Some(d) = G_DISPATCH.get() {
194 d.for_each_thread_stream(fun);
195 }
196}
197
198#[inline(always)]
199pub fn flush_thread_buffer() {
200 LOCAL_THREAD_STREAM.with(|cell| unsafe {
201 let opt_stream = &mut *cell.as_ptr();
202 if let Some(stream) = opt_stream {
203 #[allow(static_mut_refs)]
204 match G_DISPATCH.get() {
205 Some(d) => {
206 d.flush_thread_buffer(stream);
207 }
208 None => {
209 panic!("threads are recording but there is no event dispatch");
210 }
211 }
212 }
213 });
214}
215
216#[inline(always)]
219pub fn unregister_thread_stream() {
220 LOCAL_THREAD_STREAM.with(|cell| unsafe {
221 let opt_stream = &mut *cell.as_ptr();
222 if let Some(stream) = opt_stream {
223 #[allow(static_mut_refs)]
224 match G_DISPATCH.get() {
225 Some(d) => {
226 d.flush_thread_buffer(stream);
228 d.unregister_thread_stream(stream);
230 *opt_stream = None;
232 }
233 None => {
234 *opt_stream = None;
236 }
237 }
238 }
239 });
240}
241
242#[inline(always)]
243pub fn on_begin_scope(scope: &'static SpanMetadata) {
244 on_thread_event(BeginThreadSpanEvent {
245 time: now(),
246 thread_span_desc: scope,
247 });
248}
249
250#[inline(always)]
251pub fn on_end_scope(scope: &'static SpanMetadata) {
252 on_thread_event(EndThreadSpanEvent {
253 time: now(),
254 thread_span_desc: scope,
255 });
256}
257
258#[inline(always)]
259pub fn on_begin_named_scope(thread_span_location: &'static SpanLocation, name: &'static str) {
260 on_thread_event(BeginThreadNamedSpanEvent {
261 thread_span_location,
262 name: name.into(),
263 time: now(),
264 });
265}
266
267#[inline(always)]
268pub fn on_end_named_scope(thread_span_location: &'static SpanLocation, name: &'static str) {
269 on_thread_event(EndThreadNamedSpanEvent {
270 thread_span_location,
271 name: name.into(),
272 time: now(),
273 });
274}
275
276#[inline(always)]
277pub fn on_begin_async_scope(scope: &'static SpanMetadata, parent_span_id: u64, depth: u32) -> u64 {
278 let id = G_ASYNC_SPAN_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
279 on_thread_event(BeginAsyncSpanEvent {
280 span_desc: scope,
281 span_id: id as u64,
282 parent_span_id,
283 depth,
284 time: now(),
285 });
286 id as u64
287}
288
289#[inline(always)]
290pub fn on_end_async_scope(
291 span_id: u64,
292 parent_span_id: u64,
293 scope: &'static SpanMetadata,
294 depth: u32,
295) {
296 on_thread_event(EndAsyncSpanEvent {
297 span_desc: scope,
298 span_id,
299 parent_span_id,
300 depth,
301 time: now(),
302 });
303}
304
305#[inline(always)]
306pub fn on_begin_async_named_scope(
307 span_location: &'static SpanLocation,
308 name: &'static str,
309 parent_span_id: u64,
310 depth: u32,
311) -> u64 {
312 let id = G_ASYNC_SPAN_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
313 on_thread_event(BeginAsyncNamedSpanEvent {
314 span_location,
315 name: name.into(),
316 span_id: id as u64,
317 parent_span_id,
318 depth,
319 time: now(),
320 });
321 id as u64
322}
323
324#[inline(always)]
325pub fn on_end_async_named_scope(
326 span_id: u64,
327 parent_span_id: u64,
328 span_location: &'static SpanLocation,
329 name: &'static str,
330 depth: u32,
331) {
332 on_thread_event(EndAsyncNamedSpanEvent {
333 span_location,
334 name: name.into(),
335 span_id,
336 parent_span_id,
337 depth,
338 time: now(),
339 });
340}
341
342pub struct DispatchCell {
343 inner: UnsafeCell<OnceCell<Dispatch>>,
345}
346
347impl DispatchCell {
348 const fn new() -> Self {
349 Self {
350 inner: UnsafeCell::new(OnceCell::new()),
351 }
352 }
353
354 fn get(&self) -> Option<&Dispatch> {
355 unsafe { (*self.inner.get()).get() }
356 }
357
358 unsafe fn take(&self) -> Option<Dispatch> {
359 unsafe { (*self.inner.get()).take() }
360 }
361}
362
363unsafe impl Sync for DispatchCell {}
365
366static G_DISPATCH: DispatchCell = DispatchCell::new();
367static G_ASYNC_SPAN_COUNTER: std::sync::atomic::AtomicUsize =
368 std::sync::atomic::AtomicUsize::new(1);
369
370pub unsafe fn force_uninit() {
374 unsafe {
375 G_DISPATCH.take();
376 }
377}
378
379thread_local! {
380 static LOCAL_THREAD_STREAM: Cell<Option<ThreadStream>> = const { Cell::new(None) };
381}
382
383#[inline(always)]
384fn on_thread_event<T>(event: T)
385where
386 T: micromegas_transit::InProcSerialize + ThreadEventQueueTypeIndex,
387{
388 LOCAL_THREAD_STREAM.with(|cell| unsafe {
389 let opt_stream = &mut *cell.as_ptr();
390 if let Some(stream) = opt_stream {
391 stream.get_events_mut().push(event);
392 if stream.is_full() {
393 flush_thread_buffer();
394 }
395 }
396 });
397}
398
399struct Dispatch {
400 process_id: uuid::Uuid,
401 logs_buffer_size: usize,
402 metrics_buffer_size: usize,
403 threads_buffer_size: usize,
404 cpu_tracing_enabled: bool,
405 log_stream: Mutex<LogStream>,
406 metrics_stream: Mutex<MetricsStream>,
407 thread_streams: Mutex<Vec<*mut ThreadStream>>, sink: RwLock<Arc<dyn EventSink>>,
409}
410
411impl Dispatch {
412 pub fn new(
413 logs_buffer_size: usize,
414 metrics_buffer_size: usize,
415 threads_buffer_size: usize,
416 sink: Arc<dyn EventSink>,
417 process_properties: HashMap<String, String>,
418 cpu_tracing_enabled: bool,
419 ) -> Self {
420 let process_id = uuid::Uuid::new_v4();
421 let obj = Self {
422 process_id,
423 logs_buffer_size,
424 metrics_buffer_size,
425 threads_buffer_size,
426 cpu_tracing_enabled,
427 log_stream: Mutex::new(LogStream::new(
428 logs_buffer_size,
429 process_id,
430 &[String::from("log")],
431 HashMap::new(),
432 )),
433 metrics_stream: Mutex::new(MetricsStream::new(
434 metrics_buffer_size,
435 process_id,
436 &[String::from("metrics")],
437 HashMap::new(),
438 )),
439 thread_streams: Mutex::new(vec![]),
440 sink: RwLock::new(sink),
441 };
442 obj.startup(process_properties);
443 obj.init_log_stream();
444 obj.init_metrics_stream();
445 obj
446 }
447
448 pub fn get_process_id(&self) -> uuid::Uuid {
449 self.process_id
450 }
451
452 pub fn get_cpu_tracing_enabled(&self) -> bool {
453 self.cpu_tracing_enabled
454 }
455
456 pub fn get_sink(&self) -> Arc<dyn EventSink> {
457 if let Ok(guard) = self.sink.try_read() {
458 (*guard).clone()
459 } else {
460 Arc::new(NullEventSink {})
461 }
462 }
463
464 fn shutdown(&self) {
465 let old_sink = self.get_sink();
466 let null_sink = Arc::new(NullEventSink {});
467 if let Ok(mut guard) = self.sink.write() {
468 *guard = null_sink;
469 drop(guard)
470 }
471 old_sink.on_shutdown();
472 }
473
474 fn startup(&self, process_properties: HashMap<String, String>) {
475 let mut parent_process = None;
476
477 if let Ok(parent_process_guid) = std::env::var("MICROMEGAS_TELEMETRY_PARENT_PROCESS")
478 && let Ok(parent_process_id) = uuid::Uuid::try_parse(&parent_process_guid)
479 {
480 parent_process = Some(parent_process_id);
481 }
482
483 unsafe {
484 std::env::set_var(
485 "MICROMEGAS_TELEMETRY_PARENT_PROCESS",
486 self.process_id.to_string(),
487 );
488 }
489
490 let process_info = Arc::new(make_process_info(
491 self.process_id,
492 parent_process,
493 process_properties,
494 ));
495
496 self.get_sink().on_startup(process_info);
497 }
498
499 fn init_log_stream(&self) {
500 let log_stream = self.log_stream.lock().unwrap();
501 self.get_sink().on_init_log_stream(&log_stream);
502 }
503
504 fn init_metrics_stream(&self) {
505 let metrics_stream = self.metrics_stream.lock().unwrap();
506 self.get_sink().on_init_metrics_stream(&metrics_stream);
507 }
508
509 fn init_thread_stream(&self, cell: &Cell<Option<ThreadStream>>) {
510 if !self.cpu_tracing_enabled {
512 return;
513 }
514
515 let mut properties = HashMap::new();
516 properties.insert(String::from("thread-id"), thread_id::get().to_string());
517 if let Some(name) = std::thread::current().name() {
518 properties.insert("thread-name".to_owned(), name.to_owned());
519 }
520 let thread_stream = ThreadStream::new(
521 self.threads_buffer_size,
522 self.process_id,
523 &["cpu".to_owned()],
524 properties,
525 );
526 unsafe {
527 let opt_ref = &mut *cell.as_ptr();
528 self.get_sink().on_init_thread_stream(&thread_stream);
529 *opt_ref = Some(thread_stream);
530 let mut vec_guard = self.thread_streams.lock().unwrap();
531 vec_guard.push(opt_ref.as_mut().unwrap());
532 }
533 }
534
535 fn for_each_thread_stream(&self, fun: &mut dyn FnMut(*mut ThreadStream)) {
536 let mut vec_guard = self.thread_streams.lock().unwrap();
537 for stream in &mut *vec_guard {
538 fun(*stream);
539 }
540 }
541
542 fn unregister_thread_stream(&self, stream_to_remove: &mut ThreadStream) {
543 let mut vec_guard = self.thread_streams.lock().unwrap();
544 let stream_ptr = stream_to_remove as *mut ThreadStream;
545
546 if let Some(pos) = vec_guard.iter().position(|&ptr| ptr == stream_ptr) {
548 vec_guard.remove(pos);
549 }
550 }
551
552 #[inline]
553 fn int_metric(&self, desc: &'static StaticMetricMetadata, value: u64) {
554 let time = now();
555 let mut metrics_stream = self.metrics_stream.lock().unwrap();
556 metrics_stream
557 .get_events_mut()
558 .push(IntegerMetricEvent { desc, value, time });
559 if metrics_stream.is_full() {
560 drop(metrics_stream);
562 self.flush_metrics_buffer();
563 }
564 }
565
566 #[inline]
567 fn float_metric(&self, desc: &'static StaticMetricMetadata, value: f64) {
568 let time = now();
569 let mut metrics_stream = self.metrics_stream.lock().unwrap();
570 metrics_stream
571 .get_events_mut()
572 .push(FloatMetricEvent { desc, value, time });
573 if metrics_stream.is_full() {
574 drop(metrics_stream);
575 self.flush_metrics_buffer();
577 }
578 }
579
580 #[inline]
581 fn tagged_float_metric(
582 &self,
583 desc: &'static StaticMetricMetadata,
584 properties: &'static PropertySet,
585 value: f64,
586 ) {
587 let time = now();
588 let mut metrics_stream = self.metrics_stream.lock().unwrap();
589 metrics_stream
590 .get_events_mut()
591 .push(TaggedFloatMetricEvent {
592 desc,
593 properties,
594 value,
595 time,
596 });
597 if metrics_stream.is_full() {
598 drop(metrics_stream);
599 self.flush_metrics_buffer();
601 }
602 }
603
604 #[inline]
605 fn tagged_integer_metric(
606 &self,
607 desc: &'static StaticMetricMetadata,
608 properties: &'static PropertySet,
609 value: u64,
610 ) {
611 let time = now();
612 let mut metrics_stream = self.metrics_stream.lock().unwrap();
613 metrics_stream
614 .get_events_mut()
615 .push(TaggedIntegerMetricEvent {
616 desc,
617 properties,
618 value,
619 time,
620 });
621 if metrics_stream.is_full() {
622 drop(metrics_stream);
623 self.flush_metrics_buffer();
625 }
626 }
627
628 #[inline]
629 fn flush_metrics_buffer(&self) {
630 let mut metrics_stream = self.metrics_stream.lock().unwrap();
631 if metrics_stream.is_empty() {
632 return;
633 }
634 let stream_id = metrics_stream.stream_id();
635 let next_offset = metrics_stream.get_block_ref().object_offset()
636 + metrics_stream.get_block_ref().nb_objects();
637 let mut old_event_block = metrics_stream.replace_block(Arc::new(MetricsBlock::new(
638 self.metrics_buffer_size,
639 self.process_id,
640 stream_id,
641 next_offset,
642 )));
643 assert!(!metrics_stream.is_full());
644 Arc::get_mut(&mut old_event_block).unwrap().close();
645 self.get_sink().on_process_metrics_block(old_event_block);
646 }
647
648 fn log_enabled(&self, metadata: &LogMetadata) -> bool {
649 self.get_sink().on_log_enabled(metadata)
650 }
651
652 #[inline]
653 fn log(&self, metadata: &'static LogMetadata, args: fmt::Arguments<'_>) {
654 if !self.log_enabled(metadata) {
655 return;
656 }
657 let time = now();
658 self.get_sink().on_log(metadata, &[], time, args);
659 let mut log_stream = self.log_stream.lock().unwrap();
660 if args.as_str().is_some() {
661 log_stream.get_events_mut().push(LogStaticStrEvent {
662 desc: metadata,
663 time,
664 });
665 } else {
666 log_stream.get_events_mut().push(LogStringEvent {
667 desc: metadata,
668 time,
669 msg: micromegas_transit::DynString(args.to_string()),
670 });
671 }
672 if log_stream.is_full() {
673 drop(log_stream);
675 self.flush_log_buffer();
676 }
677 }
678
679 #[inline]
680 fn log_tagged(
681 &self,
682 desc: &'static LogMetadata,
683 properties: &'static PropertySet,
684 args: fmt::Arguments<'_>,
685 ) {
686 if !self.log_enabled(desc) {
687 return;
688 }
689 let time = now();
690 self.get_sink()
691 .on_log(desc, properties.get_properties(), time, args);
692 let mut log_stream = self.log_stream.lock().unwrap();
693 log_stream.get_events_mut().push(TaggedLogString {
694 desc,
695 properties,
696 time,
697 msg: micromegas_transit::DynString(args.to_string()),
698 });
699 if log_stream.is_full() {
700 drop(log_stream);
702 self.flush_log_buffer();
703 }
704 }
705
706 #[inline]
707 fn log_interop(&self, desc: &LogMetadata, args: fmt::Arguments<'_>) {
708 let time = now();
709 self.get_sink().on_log(desc, &[], time, args);
710 let mut log_stream = self.log_stream.lock().unwrap();
711 if let Some(msg) = args.as_str() {
712 log_stream.get_events_mut().push(LogStaticStrInteropEvent {
713 time,
714 level: desc.level as u32,
715 target: intern_string(desc.target).into(),
716 msg: msg.into(),
717 });
718 } else {
719 log_stream.get_events_mut().push(LogStringInteropEvent {
720 time,
721 level: desc.level as u8,
722 target: intern_string(desc.target).into(),
723 msg: micromegas_transit::DynString(args.to_string()),
724 });
725 }
726 if log_stream.is_full() {
727 drop(log_stream);
729 self.flush_log_buffer();
730 }
731 }
732
733 #[inline]
734 fn flush_log_buffer(&self) {
735 let mut log_stream = self.log_stream.lock().unwrap();
736 if log_stream.is_empty() {
737 return;
738 }
739 let stream_id = log_stream.stream_id();
740 let next_offset =
741 log_stream.get_block_ref().object_offset() + log_stream.get_block_ref().nb_objects();
742 let mut old_event_block = log_stream.replace_block(Arc::new(LogBlock::new(
743 self.logs_buffer_size,
744 self.process_id,
745 stream_id,
746 next_offset,
747 )));
748 assert!(!log_stream.is_full());
749 Arc::get_mut(&mut old_event_block).unwrap().close();
750 self.get_sink().on_process_log_block(old_event_block);
751 }
752
753 #[inline]
754 fn flush_thread_buffer(&self, stream: &mut ThreadStream) {
755 if stream.is_empty() {
756 return;
757 }
758 let next_offset =
759 stream.get_block_ref().object_offset() + stream.get_block_ref().nb_objects();
760 let mut old_block = stream.replace_block(Arc::new(ThreadBlock::new(
761 self.threads_buffer_size,
762 self.process_id,
763 stream.stream_id(),
764 next_offset,
765 )));
766 assert!(!stream.is_full());
767 Arc::get_mut(&mut old_block).unwrap().close();
768 self.get_sink().on_process_thread_block(old_block);
769 }
770}
771
772fn get_cpu_brand() -> String {
773 #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
774 return raw_cpuid::CpuId::new()
775 .get_processor_brand_string()
776 .map_or_else(|| "unknown".to_owned(), |b| b.as_str().to_owned());
777 #[cfg(target_arch = "aarch64")]
778 return String::from("aarch64");
779}
780
781pub fn make_process_info(
782 process_id: uuid::Uuid,
783 parent_process_id: Option<uuid::Uuid>,
784 properties: HashMap<String, String>,
785) -> ProcessInfo {
786 let start_ticks = now();
787 let start_time = Utc::now();
788 let cpu_brand = get_cpu_brand();
789 ProcessInfo {
790 process_id,
791 username: whoami::username(),
792 realname: whoami::realname(),
793 exe: std::env::current_exe()
794 .unwrap_or_default()
795 .to_string_lossy()
796 .into_owned(),
797 computer: whoami::devicename(),
798 distro: whoami::distro(),
799 cpu_brand,
800 tsc_frequency: frequency(),
801 start_time,
802 start_ticks,
803 parent_process_id,
804 properties,
805 }
806}