micromegas_tracing/
dispatch.rs

1//! Where events are recorded and eventually sent to a sink
2pub use crate::errors::{Error, Result};
3use crate::intern_string::intern_string;
4use crate::logs::TaggedLogString;
5use crate::metrics::{TaggedFloatMetricEvent, TaggedIntegerMetricEvent};
6use crate::prelude::*;
7use crate::property_set::PropertySet;
8use crate::{
9    event::{EventSink, NullEventSink, TracingBlock},
10    info,
11    logs::{
12        LogBlock, LogMetadata, LogStaticStrEvent, LogStaticStrInteropEvent, LogStream,
13        LogStringEvent, LogStringInteropEvent,
14    },
15    metrics::{
16        FloatMetricEvent, IntegerMetricEvent, MetricsBlock, MetricsStream, StaticMetricMetadata,
17    },
18    spans::{
19        BeginAsyncNamedSpanEvent, BeginAsyncSpanEvent, BeginThreadNamedSpanEvent,
20        BeginThreadSpanEvent, EndAsyncNamedSpanEvent, EndAsyncSpanEvent, EndThreadNamedSpanEvent,
21        EndThreadSpanEvent, SpanLocation, SpanMetadata, ThreadBlock, ThreadEventQueueTypeIndex,
22        ThreadStream,
23    },
24    warn,
25};
26use chrono::Utc;
27use std::cell::OnceCell;
28use std::cell::UnsafeCell;
29use std::collections::HashMap;
30use std::fmt;
31use std::sync::RwLock;
32use std::{
33    cell::Cell,
34    sync::{Arc, Mutex},
35};
36
37pub fn init_event_dispatch(
38    logs_buffer_size: usize,
39    metrics_buffer_size: usize,
40    threads_buffer_size: usize,
41    sink: Arc<dyn EventSink>,
42    process_properties: HashMap<String, String>,
43    cpu_tracing_enabled: bool,
44) -> Result<()> {
45    lazy_static::lazy_static! {
46        static ref INIT_MUTEX: Mutex<()> = Mutex::new(());
47    }
48    let _guard = INIT_MUTEX.lock().unwrap();
49    let dispatch_ref = &G_DISPATCH.inner;
50    unsafe {
51        if (*dispatch_ref.get()).get().is_none() {
52            (*dispatch_ref.get())
53                .set(Dispatch::new(
54                    logs_buffer_size,
55                    metrics_buffer_size,
56                    threads_buffer_size,
57                    sink,
58                    process_properties,
59                    cpu_tracing_enabled,
60                ))
61                .map_err(|_| Error::AlreadyInitialized())
62        } else {
63            info!("event dispatch already initialized");
64            Err(Error::AlreadyInitialized())
65        }
66    }
67}
68
69#[inline]
70pub fn process_id() -> Option<uuid::Uuid> {
71    G_DISPATCH.get().map(Dispatch::get_process_id)
72}
73
74#[inline]
75pub fn cpu_tracing_enabled() -> Option<bool> {
76    G_DISPATCH.get().map(Dispatch::get_cpu_tracing_enabled)
77}
78
79pub fn get_sink() -> Option<Arc<dyn EventSink>> {
80    G_DISPATCH.get().map(Dispatch::get_sink)
81}
82
83pub fn shutdown_dispatch() {
84    G_DISPATCH.get().map(Dispatch::shutdown);
85}
86
87#[inline(always)]
88pub fn int_metric(metric_desc: &'static StaticMetricMetadata, value: u64) {
89    if let Some(d) = G_DISPATCH.get() {
90        d.int_metric(metric_desc, value);
91    }
92}
93
94#[inline(always)]
95pub fn float_metric(metric_desc: &'static StaticMetricMetadata, value: f64) {
96    if let Some(d) = G_DISPATCH.get() {
97        d.float_metric(metric_desc, value);
98    }
99}
100
101#[inline(always)]
102pub fn tagged_float_metric(
103    desc: &'static StaticMetricMetadata,
104    properties: &'static PropertySet,
105    value: f64,
106) {
107    if let Some(d) = G_DISPATCH.get() {
108        d.tagged_float_metric(desc, properties, value);
109    }
110}
111
112#[inline(always)]
113pub fn tagged_integer_metric(
114    desc: &'static StaticMetricMetadata,
115    properties: &'static PropertySet,
116    value: u64,
117) {
118    if let Some(d) = G_DISPATCH.get() {
119        d.tagged_integer_metric(desc, properties, value);
120    }
121}
122
123#[inline(always)]
124pub fn log(desc: &'static LogMetadata, args: fmt::Arguments<'_>) {
125    if let Some(d) = G_DISPATCH.get() {
126        d.log(desc, args);
127    }
128}
129
130#[inline(always)]
131pub fn log_tagged(
132    desc: &'static LogMetadata,
133    properties: &'static PropertySet,
134    args: fmt::Arguments<'_>,
135) {
136    if let Some(d) = G_DISPATCH.get() {
137        d.log_tagged(desc, properties, args);
138    }
139}
140
141#[inline(always)]
142pub fn log_interop(metadata: &LogMetadata, args: fmt::Arguments<'_>) {
143    if let Some(d) = G_DISPATCH.get() {
144        d.log_interop(metadata, args);
145    }
146}
147
148#[inline(always)]
149pub fn log_enabled(metadata: &LogMetadata) -> bool {
150    if let Some(d) = G_DISPATCH.get() {
151        d.log_enabled(metadata)
152    } else {
153        false
154    }
155}
156
157#[inline(always)]
158pub fn flush_log_buffer() {
159    if let Some(d) = G_DISPATCH.get() {
160        d.flush_log_buffer();
161    }
162}
163
164#[inline(always)]
165pub fn flush_metrics_buffer() {
166    if let Some(d) = G_DISPATCH.get() {
167        d.flush_metrics_buffer();
168    }
169}
170
171//todo: should be implicit by default but limit the maximum number of tracked
172// threads
173#[inline(always)]
174pub fn init_thread_stream() {
175    LOCAL_THREAD_STREAM.with(|cell| unsafe {
176        if (*cell.as_ptr()).is_some() {
177            return;
178        }
179        #[allow(static_mut_refs)]
180        if let Some(d) = G_DISPATCH.get() {
181            // Check if CPU tracing is enabled before creating thread stream
182            if !d.cpu_tracing_enabled {
183                return;
184            }
185            d.init_thread_stream(cell);
186        } else {
187            warn!("dispatch not initialized, cannot init thread stream, events will be lost for this thread");
188        }
189    });
190}
191
192pub fn for_each_thread_stream(fun: &mut dyn FnMut(*mut ThreadStream)) {
193    if let Some(d) = G_DISPATCH.get() {
194        d.for_each_thread_stream(fun);
195    }
196}
197
198#[inline(always)]
199pub fn flush_thread_buffer() {
200    LOCAL_THREAD_STREAM.with(|cell| unsafe {
201        let opt_stream = &mut *cell.as_ptr();
202        if let Some(stream) = opt_stream {
203            #[allow(static_mut_refs)]
204            match G_DISPATCH.get() {
205                Some(d) => {
206                    d.flush_thread_buffer(stream);
207                }
208                None => {
209                    panic!("threads are recording but there is no event dispatch");
210                }
211            }
212        }
213    });
214}
215
216/// Unregisters the current thread's stream from the global dispatch to prevent
217/// dangling pointers when the thread is destroyed.
218#[inline(always)]
219pub fn unregister_thread_stream() {
220    LOCAL_THREAD_STREAM.with(|cell| unsafe {
221        let opt_stream = &mut *cell.as_ptr();
222        if let Some(stream) = opt_stream {
223            #[allow(static_mut_refs)]
224            match G_DISPATCH.get() {
225                Some(d) => {
226                    // Flush any remaining events before unregistering
227                    d.flush_thread_buffer(stream);
228                    // Unregister the thread stream
229                    d.unregister_thread_stream(stream);
230                    // Clear the local thread stream
231                    *opt_stream = None;
232                }
233                None => {
234                    // If dispatch is already shut down, just clear the local stream
235                    *opt_stream = None;
236                }
237            }
238        }
239    });
240}
241
242#[inline(always)]
243pub fn on_begin_scope(scope: &'static SpanMetadata) {
244    on_thread_event(BeginThreadSpanEvent {
245        time: now(),
246        thread_span_desc: scope,
247    });
248}
249
250#[inline(always)]
251pub fn on_end_scope(scope: &'static SpanMetadata) {
252    on_thread_event(EndThreadSpanEvent {
253        time: now(),
254        thread_span_desc: scope,
255    });
256}
257
258#[inline(always)]
259pub fn on_begin_named_scope(thread_span_location: &'static SpanLocation, name: &'static str) {
260    on_thread_event(BeginThreadNamedSpanEvent {
261        thread_span_location,
262        name: name.into(),
263        time: now(),
264    });
265}
266
267#[inline(always)]
268pub fn on_end_named_scope(thread_span_location: &'static SpanLocation, name: &'static str) {
269    on_thread_event(EndThreadNamedSpanEvent {
270        thread_span_location,
271        name: name.into(),
272        time: now(),
273    });
274}
275
276#[inline(always)]
277pub fn on_begin_async_scope(scope: &'static SpanMetadata, parent_span_id: u64, depth: u32) -> u64 {
278    let id = G_ASYNC_SPAN_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
279    on_thread_event(BeginAsyncSpanEvent {
280        span_desc: scope,
281        span_id: id as u64,
282        parent_span_id,
283        depth,
284        time: now(),
285    });
286    id as u64
287}
288
289#[inline(always)]
290pub fn on_end_async_scope(
291    span_id: u64,
292    parent_span_id: u64,
293    scope: &'static SpanMetadata,
294    depth: u32,
295) {
296    on_thread_event(EndAsyncSpanEvent {
297        span_desc: scope,
298        span_id,
299        parent_span_id,
300        depth,
301        time: now(),
302    });
303}
304
305#[inline(always)]
306pub fn on_begin_async_named_scope(
307    span_location: &'static SpanLocation,
308    name: &'static str,
309    parent_span_id: u64,
310    depth: u32,
311) -> u64 {
312    let id = G_ASYNC_SPAN_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
313    on_thread_event(BeginAsyncNamedSpanEvent {
314        span_location,
315        name: name.into(),
316        span_id: id as u64,
317        parent_span_id,
318        depth,
319        time: now(),
320    });
321    id as u64
322}
323
324#[inline(always)]
325pub fn on_end_async_named_scope(
326    span_id: u64,
327    parent_span_id: u64,
328    span_location: &'static SpanLocation,
329    name: &'static str,
330    depth: u32,
331) {
332    on_thread_event(EndAsyncNamedSpanEvent {
333        span_location,
334        name: name.into(),
335        span_id,
336        parent_span_id,
337        depth,
338        time: now(),
339    });
340}
341
342pub struct DispatchCell {
343    // unsafecell is only necessary for force_uninit()
344    inner: UnsafeCell<OnceCell<Dispatch>>,
345}
346
347impl DispatchCell {
348    const fn new() -> Self {
349        Self {
350            inner: UnsafeCell::new(OnceCell::new()),
351        }
352    }
353
354    fn get(&self) -> Option<&Dispatch> {
355        unsafe { (*self.inner.get()).get() }
356    }
357
358    unsafe fn take(&self) -> Option<Dispatch> {
359        unsafe { (*self.inner.get()).take() }
360    }
361}
362
363// very unsafe indeed - we don't want to pay for locking every time we need to record an event
364unsafe impl Sync for DispatchCell {}
365
366static G_DISPATCH: DispatchCell = DispatchCell::new();
367static G_ASYNC_SPAN_COUNTER: std::sync::atomic::AtomicUsize =
368    std::sync::atomic::AtomicUsize::new(1);
369
370/// # Safety
371/// very unsafe! make sure there is no thread running that could be sending events
372/// normally that global state should not be destroyed
373pub unsafe fn force_uninit() {
374    unsafe {
375        G_DISPATCH.take();
376    }
377}
378
379thread_local! {
380    static LOCAL_THREAD_STREAM: Cell<Option<ThreadStream>> = const { Cell::new(None) };
381}
382
383#[inline(always)]
384fn on_thread_event<T>(event: T)
385where
386    T: micromegas_transit::InProcSerialize + ThreadEventQueueTypeIndex,
387{
388    LOCAL_THREAD_STREAM.with(|cell| unsafe {
389        let opt_stream = &mut *cell.as_ptr();
390        if let Some(stream) = opt_stream {
391            stream.get_events_mut().push(event);
392            if stream.is_full() {
393                flush_thread_buffer();
394            }
395        }
396    });
397}
398
399struct Dispatch {
400    process_id: uuid::Uuid,
401    logs_buffer_size: usize,
402    metrics_buffer_size: usize,
403    threads_buffer_size: usize,
404    cpu_tracing_enabled: bool,
405    log_stream: Mutex<LogStream>,
406    metrics_stream: Mutex<MetricsStream>,
407    thread_streams: Mutex<Vec<*mut ThreadStream>>, // very very unsafe - threads would need to be unregistered before they are destroyed
408    sink: RwLock<Arc<dyn EventSink>>,
409}
410
411impl Dispatch {
412    pub fn new(
413        logs_buffer_size: usize,
414        metrics_buffer_size: usize,
415        threads_buffer_size: usize,
416        sink: Arc<dyn EventSink>,
417        process_properties: HashMap<String, String>,
418        cpu_tracing_enabled: bool,
419    ) -> Self {
420        let process_id = uuid::Uuid::new_v4();
421        let obj = Self {
422            process_id,
423            logs_buffer_size,
424            metrics_buffer_size,
425            threads_buffer_size,
426            cpu_tracing_enabled,
427            log_stream: Mutex::new(LogStream::new(
428                logs_buffer_size,
429                process_id,
430                &[String::from("log")],
431                HashMap::new(),
432            )),
433            metrics_stream: Mutex::new(MetricsStream::new(
434                metrics_buffer_size,
435                process_id,
436                &[String::from("metrics")],
437                HashMap::new(),
438            )),
439            thread_streams: Mutex::new(vec![]),
440            sink: RwLock::new(sink),
441        };
442        obj.startup(process_properties);
443        obj.init_log_stream();
444        obj.init_metrics_stream();
445        obj
446    }
447
448    pub fn get_process_id(&self) -> uuid::Uuid {
449        self.process_id
450    }
451
452    pub fn get_cpu_tracing_enabled(&self) -> bool {
453        self.cpu_tracing_enabled
454    }
455
456    pub fn get_sink(&self) -> Arc<dyn EventSink> {
457        if let Ok(guard) = self.sink.try_read() {
458            (*guard).clone()
459        } else {
460            Arc::new(NullEventSink {})
461        }
462    }
463
464    fn shutdown(&self) {
465        let old_sink = self.get_sink();
466        let null_sink = Arc::new(NullEventSink {});
467        if let Ok(mut guard) = self.sink.write() {
468            *guard = null_sink;
469            drop(guard)
470        }
471        old_sink.on_shutdown();
472    }
473
474    fn startup(&self, process_properties: HashMap<String, String>) {
475        let mut parent_process = None;
476
477        if let Ok(parent_process_guid) = std::env::var("MICROMEGAS_TELEMETRY_PARENT_PROCESS")
478            && let Ok(parent_process_id) = uuid::Uuid::try_parse(&parent_process_guid)
479        {
480            parent_process = Some(parent_process_id);
481        }
482
483        unsafe {
484            std::env::set_var(
485                "MICROMEGAS_TELEMETRY_PARENT_PROCESS",
486                self.process_id.to_string(),
487            );
488        }
489
490        let process_info = Arc::new(make_process_info(
491            self.process_id,
492            parent_process,
493            process_properties,
494        ));
495
496        self.get_sink().on_startup(process_info);
497    }
498
499    fn init_log_stream(&self) {
500        let log_stream = self.log_stream.lock().unwrap();
501        self.get_sink().on_init_log_stream(&log_stream);
502    }
503
504    fn init_metrics_stream(&self) {
505        let metrics_stream = self.metrics_stream.lock().unwrap();
506        self.get_sink().on_init_metrics_stream(&metrics_stream);
507    }
508
509    fn init_thread_stream(&self, cell: &Cell<Option<ThreadStream>>) {
510        // Early return if CPU tracing is disabled
511        if !self.cpu_tracing_enabled {
512            return;
513        }
514
515        let mut properties = HashMap::new();
516        properties.insert(String::from("thread-id"), thread_id::get().to_string());
517        if let Some(name) = std::thread::current().name() {
518            properties.insert("thread-name".to_owned(), name.to_owned());
519        }
520        let thread_stream = ThreadStream::new(
521            self.threads_buffer_size,
522            self.process_id,
523            &["cpu".to_owned()],
524            properties,
525        );
526        unsafe {
527            let opt_ref = &mut *cell.as_ptr();
528            self.get_sink().on_init_thread_stream(&thread_stream);
529            *opt_ref = Some(thread_stream);
530            let mut vec_guard = self.thread_streams.lock().unwrap();
531            vec_guard.push(opt_ref.as_mut().unwrap());
532        }
533    }
534
535    fn for_each_thread_stream(&self, fun: &mut dyn FnMut(*mut ThreadStream)) {
536        let mut vec_guard = self.thread_streams.lock().unwrap();
537        for stream in &mut *vec_guard {
538            fun(*stream);
539        }
540    }
541
542    fn unregister_thread_stream(&self, stream_to_remove: &mut ThreadStream) {
543        let mut vec_guard = self.thread_streams.lock().unwrap();
544        let stream_ptr = stream_to_remove as *mut ThreadStream;
545
546        // Find and remove the thread stream pointer from the vector
547        if let Some(pos) = vec_guard.iter().position(|&ptr| ptr == stream_ptr) {
548            vec_guard.remove(pos);
549        }
550    }
551
552    #[inline]
553    fn int_metric(&self, desc: &'static StaticMetricMetadata, value: u64) {
554        let time = now();
555        let mut metrics_stream = self.metrics_stream.lock().unwrap();
556        metrics_stream
557            .get_events_mut()
558            .push(IntegerMetricEvent { desc, value, time });
559        if metrics_stream.is_full() {
560            // Release the lock before calling flush_metrics_buffer
561            drop(metrics_stream);
562            self.flush_metrics_buffer();
563        }
564    }
565
566    #[inline]
567    fn float_metric(&self, desc: &'static StaticMetricMetadata, value: f64) {
568        let time = now();
569        let mut metrics_stream = self.metrics_stream.lock().unwrap();
570        metrics_stream
571            .get_events_mut()
572            .push(FloatMetricEvent { desc, value, time });
573        if metrics_stream.is_full() {
574            drop(metrics_stream);
575            // Release the lock before calling flush_metrics_buffer
576            self.flush_metrics_buffer();
577        }
578    }
579
580    #[inline]
581    fn tagged_float_metric(
582        &self,
583        desc: &'static StaticMetricMetadata,
584        properties: &'static PropertySet,
585        value: f64,
586    ) {
587        let time = now();
588        let mut metrics_stream = self.metrics_stream.lock().unwrap();
589        metrics_stream
590            .get_events_mut()
591            .push(TaggedFloatMetricEvent {
592                desc,
593                properties,
594                value,
595                time,
596            });
597        if metrics_stream.is_full() {
598            drop(metrics_stream);
599            // Release the lock before calling flush_metrics_buffer
600            self.flush_metrics_buffer();
601        }
602    }
603
604    #[inline]
605    fn tagged_integer_metric(
606        &self,
607        desc: &'static StaticMetricMetadata,
608        properties: &'static PropertySet,
609        value: u64,
610    ) {
611        let time = now();
612        let mut metrics_stream = self.metrics_stream.lock().unwrap();
613        metrics_stream
614            .get_events_mut()
615            .push(TaggedIntegerMetricEvent {
616                desc,
617                properties,
618                value,
619                time,
620            });
621        if metrics_stream.is_full() {
622            drop(metrics_stream);
623            // Release the lock before calling flush_metrics_buffer
624            self.flush_metrics_buffer();
625        }
626    }
627
628    #[inline]
629    fn flush_metrics_buffer(&self) {
630        let mut metrics_stream = self.metrics_stream.lock().unwrap();
631        if metrics_stream.is_empty() {
632            return;
633        }
634        let stream_id = metrics_stream.stream_id();
635        let next_offset = metrics_stream.get_block_ref().object_offset()
636            + metrics_stream.get_block_ref().nb_objects();
637        let mut old_event_block = metrics_stream.replace_block(Arc::new(MetricsBlock::new(
638            self.metrics_buffer_size,
639            self.process_id,
640            stream_id,
641            next_offset,
642        )));
643        assert!(!metrics_stream.is_full());
644        Arc::get_mut(&mut old_event_block).unwrap().close();
645        self.get_sink().on_process_metrics_block(old_event_block);
646    }
647
648    fn log_enabled(&self, metadata: &LogMetadata) -> bool {
649        self.get_sink().on_log_enabled(metadata)
650    }
651
652    #[inline]
653    fn log(&self, metadata: &'static LogMetadata, args: fmt::Arguments<'_>) {
654        if !self.log_enabled(metadata) {
655            return;
656        }
657        let time = now();
658        self.get_sink().on_log(metadata, &[], time, args);
659        let mut log_stream = self.log_stream.lock().unwrap();
660        if args.as_str().is_some() {
661            log_stream.get_events_mut().push(LogStaticStrEvent {
662                desc: metadata,
663                time,
664            });
665        } else {
666            log_stream.get_events_mut().push(LogStringEvent {
667                desc: metadata,
668                time,
669                msg: micromegas_transit::DynString(args.to_string()),
670            });
671        }
672        if log_stream.is_full() {
673            // Release the lock before calling flush_log_buffer
674            drop(log_stream);
675            self.flush_log_buffer();
676        }
677    }
678
679    #[inline]
680    fn log_tagged(
681        &self,
682        desc: &'static LogMetadata,
683        properties: &'static PropertySet,
684        args: fmt::Arguments<'_>,
685    ) {
686        if !self.log_enabled(desc) {
687            return;
688        }
689        let time = now();
690        self.get_sink()
691            .on_log(desc, properties.get_properties(), time, args);
692        let mut log_stream = self.log_stream.lock().unwrap();
693        log_stream.get_events_mut().push(TaggedLogString {
694            desc,
695            properties,
696            time,
697            msg: micromegas_transit::DynString(args.to_string()),
698        });
699        if log_stream.is_full() {
700            // Release the lock before calling flush_log_buffer
701            drop(log_stream);
702            self.flush_log_buffer();
703        }
704    }
705
706    #[inline]
707    fn log_interop(&self, desc: &LogMetadata, args: fmt::Arguments<'_>) {
708        let time = now();
709        self.get_sink().on_log(desc, &[], time, args);
710        let mut log_stream = self.log_stream.lock().unwrap();
711        if let Some(msg) = args.as_str() {
712            log_stream.get_events_mut().push(LogStaticStrInteropEvent {
713                time,
714                level: desc.level as u32,
715                target: intern_string(desc.target).into(),
716                msg: msg.into(),
717            });
718        } else {
719            log_stream.get_events_mut().push(LogStringInteropEvent {
720                time,
721                level: desc.level as u8,
722                target: intern_string(desc.target).into(),
723                msg: micromegas_transit::DynString(args.to_string()),
724            });
725        }
726        if log_stream.is_full() {
727            // Release the lock before calling flush_log_buffer
728            drop(log_stream);
729            self.flush_log_buffer();
730        }
731    }
732
733    #[inline]
734    fn flush_log_buffer(&self) {
735        let mut log_stream = self.log_stream.lock().unwrap();
736        if log_stream.is_empty() {
737            return;
738        }
739        let stream_id = log_stream.stream_id();
740        let next_offset =
741            log_stream.get_block_ref().object_offset() + log_stream.get_block_ref().nb_objects();
742        let mut old_event_block = log_stream.replace_block(Arc::new(LogBlock::new(
743            self.logs_buffer_size,
744            self.process_id,
745            stream_id,
746            next_offset,
747        )));
748        assert!(!log_stream.is_full());
749        Arc::get_mut(&mut old_event_block).unwrap().close();
750        self.get_sink().on_process_log_block(old_event_block);
751    }
752
753    #[inline]
754    fn flush_thread_buffer(&self, stream: &mut ThreadStream) {
755        if stream.is_empty() {
756            return;
757        }
758        let next_offset =
759            stream.get_block_ref().object_offset() + stream.get_block_ref().nb_objects();
760        let mut old_block = stream.replace_block(Arc::new(ThreadBlock::new(
761            self.threads_buffer_size,
762            self.process_id,
763            stream.stream_id(),
764            next_offset,
765        )));
766        assert!(!stream.is_full());
767        Arc::get_mut(&mut old_block).unwrap().close();
768        self.get_sink().on_process_thread_block(old_block);
769    }
770}
771
772fn get_cpu_brand() -> String {
773    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
774    return raw_cpuid::CpuId::new()
775        .get_processor_brand_string()
776        .map_or_else(|| "unknown".to_owned(), |b| b.as_str().to_owned());
777    #[cfg(target_arch = "aarch64")]
778    return String::from("aarch64");
779}
780
781pub fn make_process_info(
782    process_id: uuid::Uuid,
783    parent_process_id: Option<uuid::Uuid>,
784    properties: HashMap<String, String>,
785) -> ProcessInfo {
786    let start_ticks = now();
787    let start_time = Utc::now();
788    let cpu_brand = get_cpu_brand();
789    ProcessInfo {
790        process_id,
791        username: whoami::username(),
792        realname: whoami::realname(),
793        exe: std::env::current_exe()
794            .unwrap_or_default()
795            .to_string_lossy()
796            .into_owned(),
797        computer: whoami::devicename(),
798        distro: whoami::distro(),
799        cpu_brand,
800        tsc_frequency: frequency(),
801        start_time,
802        start_ticks,
803        parent_process_id,
804        properties,
805    }
806}