micromegas_tracing/
flush_monitor.rs1use chrono::prelude::*;
3use std::sync::atomic::{AtomicI64, Ordering};
4
5use crate::dispatch::{flush_log_buffer, flush_metrics_buffer, for_each_thread_stream};
6
/// Tracks when telemetry buffers were last flushed and decides when the next
/// periodic flush is due.
///
/// All methods take `&self`; the last-flush time is kept in an atomic so it can
/// be updated through a shared reference.
#[derive(Debug)]
pub struct FlushMonitor {
    /// Unix timestamp, in whole seconds, recorded at construction and updated
    /// each time a flush is triggered.
    last_flush: AtomicI64,
    /// Number of seconds that must elapse after `last_flush` before the next
    /// flush is due.
    flush_period_seconds: i64,
}
11
12impl FlushMonitor {
13 pub fn new(flush_period_seconds: i64) -> Self {
14 Self {
15 last_flush: AtomicI64::new(Local::now().timestamp()),
16 flush_period_seconds,
17 }
18 }
19
20 pub fn time_to_flush_seconds(&self) -> i64 {
21 let now = Local::now().timestamp();
22 let seconds_since_flush = now - self.last_flush.load(Ordering::Relaxed);
23 self.flush_period_seconds - seconds_since_flush
24 }
25
26 pub fn tick(&self) {
27 if self.time_to_flush_seconds() <= 0 {
28 self.last_flush
29 .store(Local::now().timestamp(), Ordering::Relaxed);
30 flush_log_buffer();
31 flush_metrics_buffer();
32 for_each_thread_stream(&mut |stream_ptr| unsafe {
33 (*stream_ptr).set_full();
35 });
36 }
37 }
38}
39
40impl Default for FlushMonitor {
41 fn default() -> Self {
42 const DEFAULT_PERIOD: i64 = 60;
44 let nb_seconds = std::env::var("MICROMEGAS_FLUSH_PERIOD")
45 .map(|v| v.parse::<i64>().unwrap_or(DEFAULT_PERIOD))
46 .unwrap_or(DEFAULT_PERIOD);
47 Self::new(nb_seconds)
48 }
49}