micromegas/servers/
cron_task.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, DurationRound, TimeDelta, Utc};
4use futures::future::BoxFuture;
5use micromegas_tracing::prelude::*;
6use std::sync::Arc;
7use tokio::task::JoinError;
8
/// Trait for a task that can be run periodically by the cron service.
///
/// Implementations must be `Send + Sync`; `CronTask` holds its callback as an
/// `Arc<dyn TaskCallback>` and moves a clone of it into the task it spawns.
#[async_trait]
pub trait TaskCallback: Send + Sync {
    /// Runs the task at the scheduled time.
    ///
    /// `task_scheduled_time` is the time the run was scheduled for, not the time
    /// at which the callback actually starts executing.
    ///
    /// Returns `Ok(())` on success, or an error describing why the run failed.
    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()>;
}
15
/// Represents a task scheduled to run periodically.
pub struct CronTask {
    /// Task name, written to the log each time the task is spawned.
    name: String,
    /// Interval between two consecutive scheduled runs.
    period: TimeDelta,
    /// The work executed on every run; shared with the spawned task.
    callback: Arc<dyn TaskCallback>,
    /// Time at which the next run is scheduled.
    next_run: DateTime<Utc>,
}
23
24impl CronTask {
25    /// Creates a new `CronTask`.
26    ///
27    /// The `next_run` time is calculated based on the current time, period, and offset.
28    pub fn new(
29        name: String,
30        period: TimeDelta,
31        offset: TimeDelta,
32        callback: Arc<dyn TaskCallback>,
33    ) -> Result<Self> {
34        let now = Utc::now();
35        let next_run = now.duration_trunc(period)? + period + offset;
36        Ok(Self {
37            name,
38            period,
39            callback,
40            next_run,
41        })
42    }
43
44    /// Returns the next scheduled run time for the task.
45    ///
46    /// This value is updated after each successful `spawn` operation.
47    pub fn get_next_run(&self) -> DateTime<Utc> {
48        self.next_run
49    }
50
51    /// Spawns the task to run in the background.
52    ///
53    /// This function calculates the next scheduled run time, records metrics about task delay,
54    /// and then spawns an asynchronous task to execute the `TaskCallback`.
55    pub async fn spawn(&mut self) -> BoxFuture<'static, Result<Result<()>, JoinError>> {
56        let now = Utc::now();
57        info!("running scheduled task name={}", &self.name);
58        let task_time: DateTime<Utc> = self.next_run;
59        self.next_run += self.period;
60        imetric!(
61            "task_tick_delay",
62            "ns",
63            (now - task_time)
64                .num_nanoseconds()
65                .with_context(|| "get tick delay as ns")
66                .unwrap() as u64
67        );
68        let callback = self.callback.clone();
69        Box::pin(spawn_with_context(async move {
70            let res = callback
71                .run(task_time)
72                .await
73                .with_context(|| "TaskDef::tick");
74            imetric!(
75                "task_tick_latency",
76                "ns",
77                (Utc::now() - task_time)
78                    .num_nanoseconds()
79                    .with_context(|| "get tick delay as ns")
80                    .unwrap() as u64
81            );
82            res
83        }))
84    }
85}