micromegas_tracing/
flush_monitor.rs

1//! FlushMonitor triggers the flush of the telemetry streams at regular interval.
2use chrono::prelude::*;
3use std::sync::atomic::{AtomicI64, Ordering};
4
5use crate::dispatch::{flush_log_buffer, flush_metrics_buffer, for_each_thread_stream};
6
7pub struct FlushMonitor {
8    last_flush: AtomicI64,
9    flush_period_seconds: i64,
10}
11
12impl FlushMonitor {
13    pub fn new(flush_period_seconds: i64) -> Self {
14        Self {
15            last_flush: AtomicI64::new(Local::now().timestamp()),
16            flush_period_seconds,
17        }
18    }
19
20    pub fn time_to_flush_seconds(&self) -> i64 {
21        let now = Local::now().timestamp();
22        let seconds_since_flush = now - self.last_flush.load(Ordering::Relaxed);
23        self.flush_period_seconds - seconds_since_flush
24    }
25
26    pub fn tick(&self) {
27        if self.time_to_flush_seconds() <= 0 {
28            self.last_flush
29                .store(Local::now().timestamp(), Ordering::Relaxed);
30            flush_log_buffer();
31            flush_metrics_buffer();
32            for_each_thread_stream(&mut |stream_ptr| unsafe {
33                //Thread streams can't be flushed without introducing a synchronization mechanism. They are marked as full so that the calling code will flush them in a safe manner.
34                (*stream_ptr).set_full();
35            });
36        }
37    }
38}
39
40impl Default for FlushMonitor {
41    fn default() -> Self {
42        // Default is to flush every minute unless specified by the env variable
43        const DEFAULT_PERIOD: i64 = 60;
44        let nb_seconds = std::env::var("MICROMEGAS_FLUSH_PERIOD")
45            .map(|v| v.parse::<i64>().unwrap_or(DEFAULT_PERIOD))
46            .unwrap_or(DEFAULT_PERIOD);
47        Self::new(nb_seconds)
48    }
49}