//! Maintenance daemon tasks (micromegas/servers/maintenance.rs).

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, DurationRound};
4use chrono::{TimeDelta, Utc};
5use micromegas_analytics::delete::delete_old_data;
6use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
7use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
8use micromegas_analytics::lakehouse::partition_cache::PartitionCache;
9use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files;
10use micromegas_analytics::lakehouse::view::View;
11use micromegas_analytics::lakehouse::view_factory::ViewFactory;
12use micromegas_analytics::response_writer::ResponseWriter;
13use micromegas_analytics::time::TimeRange;
14use micromegas_tracing::prelude::*;
15use std::sync::Arc;
16use tokio::task::JoinSet;
17
18use super::cron_task::{CronTask, TaskCallback};
19
/// Shared, immutable list of views maintained by the daemon's scheduled tasks.
type Views = Arc<Vec<Arc<dyn View>>>;
21
22/// Materializes all views within a given time range.
23///
24/// This function iterates through the provided views, materializing partitions
25/// for each view within the specified `insert_range` and `partition_time_delta`.
26#[span_fn]
27pub async fn materialize_all_views(
28    lakehouse: Arc<LakehouseContext>,
29    views: Views,
30    insert_range: TimeRange,
31    partition_time_delta: TimeDelta,
32) -> Result<()> {
33    let mut last_group = views.first().unwrap().get_update_group();
34    let mut partitions_all_views = Arc::new(
35        PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
36            .await?,
37    );
38    let null_response_writer = Arc::new(ResponseWriter::new(None));
39    for view in &*views {
40        if view.get_update_group() != last_group {
41            // views in the same group should have no inter-dependencies
42            last_group = view.get_update_group();
43            partitions_all_views = Arc::new(
44                PartitionCache::fetch_overlapping_insert_range(
45                    // we are fetching more partitions than we need, could be optimized
46                    &lakehouse.lake().db_pool,
47                    insert_range,
48                )
49                .await?,
50            );
51        }
52        materialize_partition_range(
53            partitions_all_views.clone(),
54            lakehouse.clone(),
55            view.clone(),
56            insert_range,
57            partition_time_delta,
58            null_response_writer.clone(),
59        )
60        .await?;
61    }
62    Ok(())
63}
64
/// task running once a day to materialize older partitions
pub struct EveryDayTask {
    // Shared lakehouse context (provides `lake()` / DB pool access).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; sorted by update group in `daemon`.
    pub views: Views,
}
70
71#[async_trait]
72impl TaskCallback for EveryDayTask {
73    #[span_fn]
74    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
75        let partition_time_delta = TimeDelta::days(1);
76        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
77        let begin_range = trunc_task_time - (partition_time_delta * 2);
78        let end_range = trunc_task_time;
79        materialize_all_views(
80            self.lakehouse.clone(),
81            self.views.clone(),
82            TimeRange::new(begin_range, end_range),
83            partition_time_delta,
84        )
85        .await
86    }
87}
88
/// task running once an hour to materialize recent partitions
pub struct EveryHourTask {
    // Shared lakehouse context (provides `lake()` / DB pool access).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; sorted by update group in `daemon`.
    pub views: Views,
}
94
95#[async_trait]
96impl TaskCallback for EveryHourTask {
97    #[span_fn]
98    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
99        delete_old_data(self.lakehouse.lake(), 90).await?;
100        delete_expired_temporary_files(self.lakehouse.lake().clone()).await?;
101
102        let partition_time_delta = TimeDelta::hours(1);
103        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
104        let begin_range = trunc_task_time - (partition_time_delta * 2);
105        let end_range = trunc_task_time;
106        materialize_all_views(
107            self.lakehouse.clone(),
108            self.views.clone(),
109            TimeRange::new(begin_range, end_range),
110            partition_time_delta,
111        )
112        .await
113    }
114}
115
/// task running once a minute to materialize recent partitions
pub struct EveryMinuteTask {
    // Shared lakehouse context (provides `lake()` / DB pool access).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; sorted by update group in `daemon`.
    pub views: Views,
}
121
122#[async_trait]
123impl TaskCallback for EveryMinuteTask {
124    #[span_fn]
125    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
126        let partition_time_delta = TimeDelta::minutes(1);
127        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
128        let begin_range = trunc_task_time - (partition_time_delta * 2);
129        // we only try to process a single partition per view
130        let end_range = trunc_task_time - partition_time_delta;
131        materialize_all_views(
132            self.lakehouse.clone(),
133            self.views.clone(),
134            TimeRange::new(begin_range, end_range),
135            partition_time_delta,
136        )
137        .await
138    }
139}
140
/// task running once a second to materialize newest partitions
pub struct EverySecondTask {
    // Shared lakehouse context (provides `lake()` / DB pool access).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; sorted by update group in `daemon`.
    pub views: Views,
}
146
147#[async_trait]
148impl TaskCallback for EverySecondTask {
149    #[span_fn]
150    async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
151        let delay = Utc::now() - task_scheduled_time;
152        if delay > TimeDelta::seconds(10) {
153            // we don't want to accumulate too much delay - the minutes task will fill the missing data
154            warn!("skipping `seconds` task, delay={delay}");
155            return Ok(());
156        }
157        let partition_time_delta = TimeDelta::seconds(1);
158        let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
159        let begin_range = trunc_task_time - (partition_time_delta * 2);
160        // we only try to process a single partition per view
161        let end_range = trunc_task_time - partition_time_delta;
162        materialize_all_views(
163            self.lakehouse.clone(),
164            self.views.clone(),
165            TimeRange::new(begin_range, end_range),
166            partition_time_delta,
167        )
168        .await
169    }
170}
171
172/// Runs a collection of `CronTask`s indefinitely.
173///
174/// This function continuously checks for tasks that are due to run, spawns them,
175/// and manages their execution, ensuring that `max_parallelism` is not exceeded.
176pub async fn run_tasks_forever(mut tasks: Vec<CronTask>, max_parallelism: usize) {
177    let mut task_set = JoinSet::new();
178    loop {
179        let mut next_task_run = Utc::now() + TimeDelta::days(2);
180        for task in &mut tasks {
181            if task.get_next_run() < Utc::now() {
182                task_set.spawn(task.spawn().await);
183                if task_set.len() >= max_parallelism
184                    && let Some(res) = task_set.join_next().await
185                {
186                    match res {
187                        Ok(res) => match res {
188                            Ok(res) => match res {
189                                Ok(()) => {}
190                                Err(e) => error!("{e:?}"),
191                            },
192                            Err(e) => error!("{e:?}"),
193                        },
194                        Err(e) => error!("{e:?}"),
195                    }
196                }
197            }
198            let task_next_run = task.get_next_run();
199            if task_next_run < next_task_run {
200                next_task_run = task_next_run;
201            }
202        }
203        let time_until_next_task = next_task_run - Utc::now();
204        if time_until_next_task > TimeDelta::zero() {
205            match time_until_next_task
206                .to_std()
207                .with_context(|| "delay.to_std")
208            {
209                Ok(wait) => tokio::time::sleep(wait).await,
210                Err(e) => warn!("{e:?}"),
211            }
212        }
213    }
214}
215
216/// Retrieves a list of global views that have an associated update group.
217///
218/// This function filters the global views provided by the `view_factory`,
219/// returning only those that are part of an update group.
220pub fn get_global_views_with_update_group(view_factory: &ViewFactory) -> Vec<Arc<dyn View>> {
221    view_factory
222        .get_global_views()
223        .iter()
224        .filter(|v| v.get_update_group().is_some())
225        .cloned()
226        .collect()
227}
228
229/// Starts the maintenance daemon, which runs various scheduled tasks.
230///
231/// This function initializes and spawns several `CronTask`s for daily, hourly, minute,
232/// and second-based maintenance operations, such as data materialization and cleanup.
233///
234/// # Arguments
235///
236/// * `lakehouse` - The lakehouse context with shared metadata cache.
237/// * `views_to_update` - A vector of views that need to be updated by the daemon.
238pub async fn daemon(
239    lakehouse: Arc<LakehouseContext>,
240    mut views_to_update: Vec<Arc<dyn View>>,
241) -> Result<()> {
242    views_to_update.sort_by_key(|v| v.get_update_group().unwrap_or(i32::MAX));
243    let views = Arc::new(views_to_update);
244
245    let every_day = CronTask::new(
246        String::from("every_day"),
247        TimeDelta::days(1),
248        TimeDelta::hours(4),
249        Arc::new(EveryDayTask {
250            lakehouse: lakehouse.clone(),
251            views: views.clone(),
252        }),
253    )?;
254    let every_hour = CronTask::new(
255        String::from("every_hour"),
256        TimeDelta::hours(1),
257        TimeDelta::minutes(10),
258        Arc::new(EveryHourTask {
259            lakehouse: lakehouse.clone(),
260            views: views.clone(),
261        }),
262    )?;
263    let every_minute = CronTask::new(
264        String::from("every minute"),
265        TimeDelta::minutes(1),
266        TimeDelta::seconds(30),
267        Arc::new(EveryMinuteTask {
268            lakehouse: lakehouse.clone(),
269            views: views.clone(),
270        }),
271    )?;
272    let every_second = CronTask::new(
273        String::from("every second"),
274        TimeDelta::seconds(1),
275        TimeDelta::milliseconds(500),
276        Arc::new(EverySecondTask { lakehouse, views }),
277    )?;
278
279    let mut runners = tokio::task::JoinSet::new();
280    runners.spawn(async move { run_tasks_forever(vec![every_day], 1).await });
281    runners.spawn(async move { run_tasks_forever(vec![every_hour], 1).await });
282    runners.spawn(async move { run_tasks_forever(vec![every_minute], 5).await });
283    runners.spawn(async move { run_tasks_forever(vec![every_second], 5).await });
284    runners.join_all().await;
285    Ok(())
286}