micromegas/servers/maintenance.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, DurationRound};
4use chrono::{TimeDelta, Utc};
5use micromegas_analytics::delete::delete_old_data;
6use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
7use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
8use micromegas_analytics::lakehouse::partition_cache::PartitionCache;
9use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files;
10use micromegas_analytics::lakehouse::view::View;
11use micromegas_analytics::lakehouse::view_factory::ViewFactory;
12use micromegas_analytics::response_writer::ResponseWriter;
13use micromegas_analytics::time::TimeRange;
14use micromegas_tracing::prelude::*;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::task::{JoinError, JoinSet};
19
20use super::cron_task::{CronTask, TaskCallback};
21
22type Views = Arc<Vec<Arc<dyn View>>>;
23
24/// Materializes all views within a given time range.
25///
26/// This function iterates through the provided views, materializing partitions
27/// for each view within the specified `insert_range` and `partition_time_delta`.
28#[span_fn]
29pub async fn materialize_all_views(
30    lakehouse: Arc<LakehouseContext>,
31    views: Views,
32    insert_range: TimeRange,
33    partition_time_delta: TimeDelta,
34) -> Result<()> {
35    let mut last_group = views.first().unwrap().get_update_group();
36    let mut partitions_all_views = Arc::new(
37        PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
38            .await?,
39    );
40    let null_response_writer = Arc::new(ResponseWriter::new(None));
41    for view in &*views {
42        if view.get_update_group() != last_group {
43            // views in the same group should have no inter-dependencies
44            last_group = view.get_update_group();
45            partitions_all_views = Arc::new(
46                PartitionCache::fetch_overlapping_insert_range(
47                    // we are fetching more partitions than we need, could be optimized
48                    &lakehouse.lake().db_pool,
49                    insert_range,
50                )
51                .await?,
52            );
53        }
54        materialize_partition_range(
55            partitions_all_views.clone(),
56            lakehouse.clone(),
57            view.clone(),
58            insert_range,
59            partition_time_delta,
60            null_response_writer.clone(),
61        )
62        .await?;
63    }
64    Ok(())
65}
66
67/// task running once a day to materialize older partitions
68pub struct EveryDayTask {
69    pub lakehouse: Arc<LakehouseContext>,
70    pub views: Views,
71}
72
73#[async_trait]
74impl TaskCallback for EveryDayTask {
75    #[span_fn]
76    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
77        let partition_time_delta = TimeDelta::days(1);
78        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
79        let begin_range = trunc_task_time - (partition_time_delta * 2);
80        let end_range = trunc_task_time;
81        materialize_all_views(
82            self.lakehouse.clone(),
83            self.views.clone(),
84            TimeRange::new(begin_range, end_range),
85            partition_time_delta,
86        )
87        .await
88    }
89}
90
91/// task running once an hour to materialize recent partitions
92pub struct EveryHourTask {
93    pub lakehouse: Arc<LakehouseContext>,
94    pub views: Views,
95}
96
97#[async_trait]
98impl TaskCallback for EveryHourTask {
99    #[span_fn]
100    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
101        delete_old_data(self.lakehouse.lake(), 90).await?;
102        delete_expired_temporary_files(self.lakehouse.lake().clone()).await?;
103
104        let partition_time_delta = TimeDelta::hours(1);
105        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
106        let begin_range = trunc_task_time - (partition_time_delta * 2);
107        let end_range = trunc_task_time;
108        materialize_all_views(
109            self.lakehouse.clone(),
110            self.views.clone(),
111            TimeRange::new(begin_range, end_range),
112            partition_time_delta,
113        )
114        .await
115    }
116}
117
118/// task running once a minute to materialize recent partitions
119pub struct EveryMinuteTask {
120    pub lakehouse: Arc<LakehouseContext>,
121    pub views: Views,
122}
123
124#[async_trait]
125impl TaskCallback for EveryMinuteTask {
126    #[span_fn]
127    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
128        let partition_time_delta = TimeDelta::minutes(1);
129        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
130        let begin_range = trunc_task_time - (partition_time_delta * 2);
131        // we only try to process a single partition per view
132        let end_range = trunc_task_time - partition_time_delta;
133        materialize_all_views(
134            self.lakehouse.clone(),
135            self.views.clone(),
136            TimeRange::new(begin_range, end_range),
137            partition_time_delta,
138        )
139        .await
140    }
141}
142
143/// task running once a second to materialize newest partitions
144pub struct EverySecondTask {
145    pub lakehouse: Arc<LakehouseContext>,
146    pub views: Views,
147}
148
149#[async_trait]
150impl TaskCallback for EverySecondTask {
151    #[span_fn]
152    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
153        let delay = Utc::now() - task_scheduled_time;
154        if delay > TimeDelta::seconds(10) {
155            // we don't want to accumulate too much delay - the minutes task will fill the missing data
156            warn!("skipping `seconds` task, delay={delay}");
157            return Ok(());
158        }
159        let partition_time_delta = TimeDelta::seconds(1);
160        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
161        let begin_range = trunc_task_time - (partition_time_delta * 2);
162        // we only try to process a single partition per view
163        let end_range = trunc_task_time - partition_time_delta;
164        materialize_all_views(
165            self.lakehouse.clone(),
166            self.views.clone(),
167            TimeRange::new(begin_range, end_range),
168            partition_time_delta,
169        )
170        .await
171    }
172}
173
174/// Logs the outcome of a completed cron task.
175///
176/// The result is triply nested: the outer `JoinError` reports a panicked or
177/// cancelled task, the inner `JoinError` comes from the spawned future, and the
178/// innermost `Result` is the task callback's own outcome. Any error at any layer
179/// is logged; a fully successful run is a no-op.
180fn log_task_result(res: Result<Result<Result<()>, JoinError>, JoinError>) {
181    match res {
182        Ok(Ok(Ok(()))) => {}
183        Ok(Ok(Err(e))) => error!("{e:?}"),
184        Ok(Err(e)) => error!("{e:?}"),
185        Err(e) => error!("{e:?}"),
186    }
187}
188
189/// Awaits and logs every in-flight task, returning once the set is empty.
190///
191/// Used to drain currently running tasks before the loop exits on shutdown, so
192/// their work completes rather than being dropped.
193async fn drain_task_set(task_set: &mut JoinSet<Result<Result<()>, JoinError>>) {
194    while let Some(res) = task_set.join_next().await {
195        log_task_result(res);
196    }
197}
198
199/// Runs a collection of `CronTask`s until `shutdown` fires.
200///
201/// When `shutdown` completes, the loop stops scheduling new tasks and drains
202/// any currently running tasks before returning.
203pub async fn run_tasks_forever<F>(mut tasks: Vec<CronTask>, max_parallelism: usize, shutdown: F)
204where
205    F: Future<Output = ()>,
206{
207    tokio::pin!(shutdown);
208    let mut task_set = JoinSet::new();
209    loop {
210        let mut next_task_run = Utc::now() + TimeDelta::days(2);
211        for task in &mut tasks {
212            if task.get_next_run() < Utc::now() {
213                task_set.spawn(task.spawn().await);
214                if task_set.len() >= max_parallelism {
215                    tokio::select! {
216                        res = task_set.join_next() => {
217                            if let Some(res) = res {
218                                log_task_result(res);
219                            }
220                        }
221                        _ = &mut shutdown => {
222                            drain_task_set(&mut task_set).await;
223                            return;
224                        }
225                    }
226                }
227            }
228            let task_next_run = task.get_next_run();
229            if task_next_run < next_task_run {
230                next_task_run = task_next_run;
231            }
232        }
233        let time_until_next_task = next_task_run - Utc::now();
234        if time_until_next_task > TimeDelta::zero() {
235            match time_until_next_task
236                .to_std()
237                .with_context(|| "delay.to_std")
238            {
239                Ok(wait) => {
240                    tokio::select! {
241                        _ = tokio::time::sleep(wait) => {}
242                        _ = &mut shutdown => {
243                            drain_task_set(&mut task_set).await;
244                            return;
245                        }
246                    }
247                }
248                Err(e) => warn!("{e:?}"),
249            }
250        } else {
251            // No sleep needed, but still poll the shutdown future so the loop
252            // can exit even when tasks run longer than their period.
253            tokio::select! {
254                biased;
255                _ = &mut shutdown => {
256                    drain_task_set(&mut task_set).await;
257                    return;
258                }
259                _ = tokio::task::yield_now() => {}
260            }
261        }
262    }
263}
264
265/// Retrieves a list of global views that have an associated update group.
266///
267/// This function filters the global views provided by the `view_factory`,
268/// returning only those that are part of an update group.
269pub fn get_global_views_with_update_group(view_factory: &ViewFactory) -> Vec<Arc<dyn View>> {
270    view_factory
271        .get_global_views()
272        .iter()
273        .filter(|v| v.get_update_group().is_some())
274        .cloned()
275        .collect()
276}
277
278/// Starts the maintenance daemon, which runs various scheduled tasks.
279///
280/// This function initializes and spawns several `CronTask`s for daily, hourly, minute,
281/// and second-based maintenance operations, such as data materialization and cleanup.
282/// All four runner loops react to `shutdown`: they stop scheduling and drain in-flight
283/// tasks. A deadline arm forces return after `grace` even if tasks haven't drained.
284///
285/// # Arguments
286///
287/// * `lakehouse` - The lakehouse context with shared metadata cache.
288/// * `views_to_update` - A vector of views that need to be updated by the daemon.
289/// * `shutdown` - Future that completes when the process should begin shutting down.
290/// * `grace` - Maximum time to wait for in-flight tasks after the shutdown signal.
291pub async fn daemon<F>(
292    lakehouse: Arc<LakehouseContext>,
293    mut views_to_update: Vec<Arc<dyn View>>,
294    shutdown: F,
295    grace: Duration,
296) -> Result<()>
297where
298    F: Future<Output = ()> + Send + 'static,
299{
300    use super::shutdown::ShutdownFanout;
301
302    views_to_update.sort_by_key(|v| v.get_update_group().unwrap_or(i32::MAX));
303    let views = Arc::new(views_to_update);
304
305    let every_day = CronTask::new(
306        String::from("every_day"),
307        TimeDelta::days(1),
308        TimeDelta::hours(4),
309        Arc::new(EveryDayTask {
310            lakehouse: lakehouse.clone(),
311            views: views.clone(),
312        }),
313    )?;
314    let every_hour = CronTask::new(
315        String::from("every_hour"),
316        TimeDelta::hours(1),
317        TimeDelta::minutes(10),
318        Arc::new(EveryHourTask {
319            lakehouse: lakehouse.clone(),
320            views: views.clone(),
321        }),
322    )?;
323    let every_minute = CronTask::new(
324        String::from("every minute"),
325        TimeDelta::minutes(1),
326        TimeDelta::seconds(30),
327        Arc::new(EveryMinuteTask {
328            lakehouse: lakehouse.clone(),
329            views: views.clone(),
330        }),
331    )?;
332    let every_second = CronTask::new(
333        String::from("every second"),
334        TimeDelta::seconds(1),
335        TimeDelta::milliseconds(500),
336        Arc::new(EverySecondTask { lakehouse, views }),
337    )?;
338
339    let fanout = ShutdownFanout::new(shutdown);
340    let grace_secs = grace.as_secs();
341
342    let mut runners = tokio::task::JoinSet::new();
343    runners.spawn(run_tasks_forever(vec![every_day], 1, fanout.subscribe()));
344    runners.spawn(run_tasks_forever(vec![every_hour], 1, fanout.subscribe()));
345    runners.spawn(run_tasks_forever(vec![every_minute], 5, fanout.subscribe()));
346    runners.spawn(run_tasks_forever(vec![every_second], 5, fanout.subscribe()));
347
348    let deadline = {
349        let d = fanout.subscribe();
350        async move {
351            d.await;
352            tokio::time::sleep(grace).await;
353        }
354    };
355
356    tokio::select! {
357        _ = runners.join_all() => {
358            info!("daemon drain completed");
359        }
360        _ = deadline => {
361            warn!("daemon grace period of {grace_secs}s elapsed with work still in flight");
362        }
363    }
364    Ok(())
365}