1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, DurationRound};
4use chrono::{TimeDelta, Utc};
5use micromegas_analytics::delete::delete_old_data;
6use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
7use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
8use micromegas_analytics::lakehouse::partition_cache::PartitionCache;
9use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files;
10use micromegas_analytics::lakehouse::view::View;
11use micromegas_analytics::lakehouse::view_factory::ViewFactory;
12use micromegas_analytics::response_writer::ResponseWriter;
13use micromegas_analytics::time::TimeRange;
14use micromegas_tracing::prelude::*;
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::task::{JoinError, JoinSet};
19
20use super::cron_task::{CronTask, TaskCallback};
21
/// Shared, reference-counted list of views; every task struct in this file holds one.
type Views = Arc<Vec<Arc<dyn View>>>;
23
24#[span_fn]
29pub async fn materialize_all_views(
30 lakehouse: Arc<LakehouseContext>,
31 views: Views,
32 insert_range: TimeRange,
33 partition_time_delta: TimeDelta,
34) -> Result<()> {
35 let mut last_group = views.first().unwrap().get_update_group();
36 let mut partitions_all_views = Arc::new(
37 PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
38 .await?,
39 );
40 let null_response_writer = Arc::new(ResponseWriter::new(None));
41 for view in &*views {
42 if view.get_update_group() != last_group {
43 last_group = view.get_update_group();
45 partitions_all_views = Arc::new(
46 PartitionCache::fetch_overlapping_insert_range(
47 &lakehouse.lake().db_pool,
49 insert_range,
50 )
51 .await?,
52 );
53 }
54 materialize_partition_range(
55 partitions_all_views.clone(),
56 lakehouse.clone(),
57 view.clone(),
58 insert_range,
59 partition_time_delta,
60 null_response_writer.clone(),
61 )
62 .await?;
63 }
64 Ok(())
65}
66
67pub struct EveryDayTask {
69 pub lakehouse: Arc<LakehouseContext>,
70 pub views: Views,
71}
72
73#[async_trait]
74impl TaskCallback for EveryDayTask {
75 #[span_fn]
76 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
77 let partition_time_delta = TimeDelta::days(1);
78 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
79 let begin_range = trunc_task_time - (partition_time_delta * 2);
80 let end_range = trunc_task_time;
81 materialize_all_views(
82 self.lakehouse.clone(),
83 self.views.clone(),
84 TimeRange::new(begin_range, end_range),
85 partition_time_delta,
86 )
87 .await
88 }
89}
90
91pub struct EveryHourTask {
93 pub lakehouse: Arc<LakehouseContext>,
94 pub views: Views,
95}
96
97#[async_trait]
98impl TaskCallback for EveryHourTask {
99 #[span_fn]
100 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
101 delete_old_data(self.lakehouse.lake(), 90).await?;
102 delete_expired_temporary_files(self.lakehouse.lake().clone()).await?;
103
104 let partition_time_delta = TimeDelta::hours(1);
105 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
106 let begin_range = trunc_task_time - (partition_time_delta * 2);
107 let end_range = trunc_task_time;
108 materialize_all_views(
109 self.lakehouse.clone(),
110 self.views.clone(),
111 TimeRange::new(begin_range, end_range),
112 partition_time_delta,
113 )
114 .await
115 }
116}
117
118pub struct EveryMinuteTask {
120 pub lakehouse: Arc<LakehouseContext>,
121 pub views: Views,
122}
123
124#[async_trait]
125impl TaskCallback for EveryMinuteTask {
126 #[span_fn]
127 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
128 let partition_time_delta = TimeDelta::minutes(1);
129 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
130 let begin_range = trunc_task_time - (partition_time_delta * 2);
131 let end_range = trunc_task_time - partition_time_delta;
133 materialize_all_views(
134 self.lakehouse.clone(),
135 self.views.clone(),
136 TimeRange::new(begin_range, end_range),
137 partition_time_delta,
138 )
139 .await
140 }
141}
142
143pub struct EverySecondTask {
145 pub lakehouse: Arc<LakehouseContext>,
146 pub views: Views,
147}
148
149#[async_trait]
150impl TaskCallback for EverySecondTask {
151 #[span_fn]
152 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
153 let delay = Utc::now() - task_scheduled_time;
154 if delay > TimeDelta::seconds(10) {
155 warn!("skipping `seconds` task, delay={delay}");
157 return Ok(());
158 }
159 let partition_time_delta = TimeDelta::seconds(1);
160 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
161 let begin_range = trunc_task_time - (partition_time_delta * 2);
162 let end_range = trunc_task_time - partition_time_delta;
164 materialize_all_views(
165 self.lakehouse.clone(),
166 self.views.clone(),
167 TimeRange::new(begin_range, end_range),
168 partition_time_delta,
169 )
170 .await
171 }
172}
173
174fn log_task_result(res: Result<Result<Result<()>, JoinError>, JoinError>) {
181 match res {
182 Ok(Ok(Ok(()))) => {}
183 Ok(Ok(Err(e))) => error!("{e:?}"),
184 Ok(Err(e)) => error!("{e:?}"),
185 Err(e) => error!("{e:?}"),
186 }
187}
188
189async fn drain_task_set(task_set: &mut JoinSet<Result<Result<()>, JoinError>>) {
194 while let Some(res) = task_set.join_next().await {
195 log_task_result(res);
196 }
197}
198
199pub async fn run_tasks_forever<F>(mut tasks: Vec<CronTask>, max_parallelism: usize, shutdown: F)
204where
205 F: Future<Output = ()>,
206{
207 tokio::pin!(shutdown);
208 let mut task_set = JoinSet::new();
209 loop {
210 let mut next_task_run = Utc::now() + TimeDelta::days(2);
211 for task in &mut tasks {
212 if task.get_next_run() < Utc::now() {
213 task_set.spawn(task.spawn().await);
214 if task_set.len() >= max_parallelism {
215 tokio::select! {
216 res = task_set.join_next() => {
217 if let Some(res) = res {
218 log_task_result(res);
219 }
220 }
221 _ = &mut shutdown => {
222 drain_task_set(&mut task_set).await;
223 return;
224 }
225 }
226 }
227 }
228 let task_next_run = task.get_next_run();
229 if task_next_run < next_task_run {
230 next_task_run = task_next_run;
231 }
232 }
233 let time_until_next_task = next_task_run - Utc::now();
234 if time_until_next_task > TimeDelta::zero() {
235 match time_until_next_task
236 .to_std()
237 .with_context(|| "delay.to_std")
238 {
239 Ok(wait) => {
240 tokio::select! {
241 _ = tokio::time::sleep(wait) => {}
242 _ = &mut shutdown => {
243 drain_task_set(&mut task_set).await;
244 return;
245 }
246 }
247 }
248 Err(e) => warn!("{e:?}"),
249 }
250 } else {
251 tokio::select! {
254 biased;
255 _ = &mut shutdown => {
256 drain_task_set(&mut task_set).await;
257 return;
258 }
259 _ = tokio::task::yield_now() => {}
260 }
261 }
262 }
263}
264
265pub fn get_global_views_with_update_group(view_factory: &ViewFactory) -> Vec<Arc<dyn View>> {
270 view_factory
271 .get_global_views()
272 .iter()
273 .filter(|v| v.get_update_group().is_some())
274 .cloned()
275 .collect()
276}
277
278pub async fn daemon<F>(
292 lakehouse: Arc<LakehouseContext>,
293 mut views_to_update: Vec<Arc<dyn View>>,
294 shutdown: F,
295 grace: Duration,
296) -> Result<()>
297where
298 F: Future<Output = ()> + Send + 'static,
299{
300 use super::shutdown::ShutdownFanout;
301
302 views_to_update.sort_by_key(|v| v.get_update_group().unwrap_or(i32::MAX));
303 let views = Arc::new(views_to_update);
304
305 let every_day = CronTask::new(
306 String::from("every_day"),
307 TimeDelta::days(1),
308 TimeDelta::hours(4),
309 Arc::new(EveryDayTask {
310 lakehouse: lakehouse.clone(),
311 views: views.clone(),
312 }),
313 )?;
314 let every_hour = CronTask::new(
315 String::from("every_hour"),
316 TimeDelta::hours(1),
317 TimeDelta::minutes(10),
318 Arc::new(EveryHourTask {
319 lakehouse: lakehouse.clone(),
320 views: views.clone(),
321 }),
322 )?;
323 let every_minute = CronTask::new(
324 String::from("every minute"),
325 TimeDelta::minutes(1),
326 TimeDelta::seconds(30),
327 Arc::new(EveryMinuteTask {
328 lakehouse: lakehouse.clone(),
329 views: views.clone(),
330 }),
331 )?;
332 let every_second = CronTask::new(
333 String::from("every second"),
334 TimeDelta::seconds(1),
335 TimeDelta::milliseconds(500),
336 Arc::new(EverySecondTask { lakehouse, views }),
337 )?;
338
339 let fanout = ShutdownFanout::new(shutdown);
340 let grace_secs = grace.as_secs();
341
342 let mut runners = tokio::task::JoinSet::new();
343 runners.spawn(run_tasks_forever(vec![every_day], 1, fanout.subscribe()));
344 runners.spawn(run_tasks_forever(vec![every_hour], 1, fanout.subscribe()));
345 runners.spawn(run_tasks_forever(vec![every_minute], 5, fanout.subscribe()));
346 runners.spawn(run_tasks_forever(vec![every_second], 5, fanout.subscribe()));
347
348 let deadline = {
349 let d = fanout.subscribe();
350 async move {
351 d.await;
352 tokio::time::sleep(grace).await;
353 }
354 };
355
356 tokio::select! {
357 _ = runners.join_all() => {
358 info!("daemon drain completed");
359 }
360 _ = deadline => {
361 warn!("daemon grace period of {grace_secs}s elapsed with work still in flight");
362 }
363 }
364 Ok(())
365}