micromegas/servers/
cron_task.rs1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, DurationRound, TimeDelta, Utc};
4use futures::future::BoxFuture;
5use micromegas_tracing::prelude::*;
6use std::sync::Arc;
7use tokio::task::JoinError;
8
9#[async_trait]
11pub trait TaskCallback: Send + Sync {
12 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()>;
14}
15
16pub struct CronTask {
18 name: String,
19 period: TimeDelta,
20 callback: Arc<dyn TaskCallback>,
21 next_run: DateTime<Utc>,
22}
23
24impl CronTask {
25 pub fn new(
29 name: String,
30 period: TimeDelta,
31 offset: TimeDelta,
32 callback: Arc<dyn TaskCallback>,
33 ) -> Result<Self> {
34 let now = Utc::now();
35 let next_run = now.duration_trunc(period)? + period + offset;
36 Ok(Self {
37 name,
38 period,
39 callback,
40 next_run,
41 })
42 }
43
44 pub fn get_next_run(&self) -> DateTime<Utc> {
48 self.next_run
49 }
50
51 pub async fn spawn(&mut self) -> BoxFuture<'static, Result<Result<()>, JoinError>> {
56 let now = Utc::now();
57 info!("running scheduled task name={}", &self.name);
58 let task_time: DateTime<Utc> = self.next_run;
59 self.next_run += self.period;
60 imetric!(
61 "task_tick_delay",
62 "ns",
63 (now - task_time)
64 .num_nanoseconds()
65 .with_context(|| "get tick delay as ns")
66 .unwrap() as u64
67 );
68 let callback = self.callback.clone();
69 Box::pin(spawn_with_context(async move {
70 let res = callback
71 .run(task_time)
72 .await
73 .with_context(|| "TaskDef::tick");
74 imetric!(
75 "task_tick_latency",
76 "ns",
77 (Utc::now() - task_time)
78 .num_nanoseconds()
79 .with_context(|| "get tick delay as ns")
80 .unwrap() as u64
81 );
82 res
83 }))
84 }
85}