1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use chrono::{DateTime, DurationRound};
4use chrono::{TimeDelta, Utc};
5use micromegas_analytics::delete::delete_old_data;
6use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
7use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
8use micromegas_analytics::lakehouse::partition_cache::PartitionCache;
9use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files;
10use micromegas_analytics::lakehouse::view::View;
11use micromegas_analytics::lakehouse::view_factory::ViewFactory;
12use micromegas_analytics::response_writer::ResponseWriter;
13use micromegas_analytics::time::TimeRange;
14use micromegas_tracing::prelude::*;
15use std::sync::Arc;
16use tokio::task::JoinSet;
17
18use super::cron_task::{CronTask, TaskCallback};
19
/// Shared, immutable list of views to materialize, handed to every cron task.
type Views = Arc<Vec<Arc<dyn View>>>;
21
22#[span_fn]
27pub async fn materialize_all_views(
28 lakehouse: Arc<LakehouseContext>,
29 views: Views,
30 insert_range: TimeRange,
31 partition_time_delta: TimeDelta,
32) -> Result<()> {
33 let mut last_group = views.first().unwrap().get_update_group();
34 let mut partitions_all_views = Arc::new(
35 PartitionCache::fetch_overlapping_insert_range(&lakehouse.lake().db_pool, insert_range)
36 .await?,
37 );
38 let null_response_writer = Arc::new(ResponseWriter::new(None));
39 for view in &*views {
40 if view.get_update_group() != last_group {
41 last_group = view.get_update_group();
43 partitions_all_views = Arc::new(
44 PartitionCache::fetch_overlapping_insert_range(
45 &lakehouse.lake().db_pool,
47 insert_range,
48 )
49 .await?,
50 );
51 }
52 materialize_partition_range(
53 partitions_all_views.clone(),
54 lakehouse.clone(),
55 view.clone(),
56 insert_range,
57 partition_time_delta,
58 null_response_writer.clone(),
59 )
60 .await?;
61 }
62 Ok(())
63}
64
/// Daily cron task: materializes day-sized partitions for all views.
pub struct EveryDayTask {
    // Shared lakehouse handle (data lake + query context).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; `daemon` sorts these by update group.
    pub views: Views,
}
70
71#[async_trait]
72impl TaskCallback for EveryDayTask {
73 #[span_fn]
74 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
75 let partition_time_delta = TimeDelta::days(1);
76 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
77 let begin_range = trunc_task_time - (partition_time_delta * 2);
78 let end_range = trunc_task_time;
79 materialize_all_views(
80 self.lakehouse.clone(),
81 self.views.clone(),
82 TimeRange::new(begin_range, end_range),
83 partition_time_delta,
84 )
85 .await
86 }
87}
88
/// Hourly cron task: runs retention housekeeping, then materializes
/// hour-sized partitions for all views.
pub struct EveryHourTask {
    // Shared lakehouse handle (data lake + query context).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; `daemon` sorts these by update group.
    pub views: Views,
}
94
95#[async_trait]
96impl TaskCallback for EveryHourTask {
97 #[span_fn]
98 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
99 delete_old_data(self.lakehouse.lake(), 90).await?;
100 delete_expired_temporary_files(self.lakehouse.lake().clone()).await?;
101
102 let partition_time_delta = TimeDelta::hours(1);
103 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
104 let begin_range = trunc_task_time - (partition_time_delta * 2);
105 let end_range = trunc_task_time;
106 materialize_all_views(
107 self.lakehouse.clone(),
108 self.views.clone(),
109 TimeRange::new(begin_range, end_range),
110 partition_time_delta,
111 )
112 .await
113 }
114}
115
/// Per-minute cron task: materializes minute-sized partitions for all views.
pub struct EveryMinuteTask {
    // Shared lakehouse handle (data lake + query context).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; `daemon` sorts these by update group.
    pub views: Views,
}
121
122#[async_trait]
123impl TaskCallback for EveryMinuteTask {
124 #[span_fn]
125 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
126 let partition_time_delta = TimeDelta::minutes(1);
127 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
128 let begin_range = trunc_task_time - (partition_time_delta * 2);
129 let end_range = trunc_task_time - partition_time_delta;
131 materialize_all_views(
132 self.lakehouse.clone(),
133 self.views.clone(),
134 TimeRange::new(begin_range, end_range),
135 partition_time_delta,
136 )
137 .await
138 }
139}
140
/// Per-second cron task: materializes second-sized partitions for all views,
/// skipping a run entirely when it is lagging behind schedule.
pub struct EverySecondTask {
    // Shared lakehouse handle (data lake + query context).
    pub lakehouse: Arc<LakehouseContext>,
    // Views to materialize; `daemon` sorts these by update group.
    pub views: Views,
}
146
147#[async_trait]
148impl TaskCallback for EverySecondTask {
149 #[span_fn]
150 async fn run(&self, task_scheduled_time: DateTime<Utc>) -> Result<()> {
151 let delay = Utc::now() - task_scheduled_time;
152 if delay > TimeDelta::seconds(10) {
153 warn!("skipping `seconds` task, delay={delay}");
155 return Ok(());
156 }
157 let partition_time_delta = TimeDelta::seconds(1);
158 let trunc_task_time = task_scheduled_time.duration_trunc(partition_time_delta)?;
159 let begin_range = trunc_task_time - (partition_time_delta * 2);
160 let end_range = trunc_task_time - partition_time_delta;
162 materialize_all_views(
163 self.lakehouse.clone(),
164 self.views.clone(),
165 TimeRange::new(begin_range, end_range),
166 partition_time_delta,
167 )
168 .await
169 }
170}
171
172pub async fn run_tasks_forever(mut tasks: Vec<CronTask>, max_parallelism: usize) {
177 let mut task_set = JoinSet::new();
178 loop {
179 let mut next_task_run = Utc::now() + TimeDelta::days(2);
180 for task in &mut tasks {
181 if task.get_next_run() < Utc::now() {
182 task_set.spawn(task.spawn().await);
183 if task_set.len() >= max_parallelism
184 && let Some(res) = task_set.join_next().await
185 {
186 match res {
187 Ok(res) => match res {
188 Ok(res) => match res {
189 Ok(()) => {}
190 Err(e) => error!("{e:?}"),
191 },
192 Err(e) => error!("{e:?}"),
193 },
194 Err(e) => error!("{e:?}"),
195 }
196 }
197 }
198 let task_next_run = task.get_next_run();
199 if task_next_run < next_task_run {
200 next_task_run = task_next_run;
201 }
202 }
203 let time_until_next_task = next_task_run - Utc::now();
204 if time_until_next_task > TimeDelta::zero() {
205 match time_until_next_task
206 .to_std()
207 .with_context(|| "delay.to_std")
208 {
209 Ok(wait) => tokio::time::sleep(wait).await,
210 Err(e) => warn!("{e:?}"),
211 }
212 }
213 }
214}
215
216pub fn get_global_views_with_update_group(view_factory: &ViewFactory) -> Vec<Arc<dyn View>> {
221 view_factory
222 .get_global_views()
223 .iter()
224 .filter(|v| v.get_update_group().is_some())
225 .cloned()
226 .collect()
227}
228
229pub async fn daemon(
239 lakehouse: Arc<LakehouseContext>,
240 mut views_to_update: Vec<Arc<dyn View>>,
241) -> Result<()> {
242 views_to_update.sort_by_key(|v| v.get_update_group().unwrap_or(i32::MAX));
243 let views = Arc::new(views_to_update);
244
245 let every_day = CronTask::new(
246 String::from("every_day"),
247 TimeDelta::days(1),
248 TimeDelta::hours(4),
249 Arc::new(EveryDayTask {
250 lakehouse: lakehouse.clone(),
251 views: views.clone(),
252 }),
253 )?;
254 let every_hour = CronTask::new(
255 String::from("every_hour"),
256 TimeDelta::hours(1),
257 TimeDelta::minutes(10),
258 Arc::new(EveryHourTask {
259 lakehouse: lakehouse.clone(),
260 views: views.clone(),
261 }),
262 )?;
263 let every_minute = CronTask::new(
264 String::from("every minute"),
265 TimeDelta::minutes(1),
266 TimeDelta::seconds(30),
267 Arc::new(EveryMinuteTask {
268 lakehouse: lakehouse.clone(),
269 views: views.clone(),
270 }),
271 )?;
272 let every_second = CronTask::new(
273 String::from("every second"),
274 TimeDelta::seconds(1),
275 TimeDelta::milliseconds(500),
276 Arc::new(EverySecondTask { lakehouse, views }),
277 )?;
278
279 let mut runners = tokio::task::JoinSet::new();
280 runners.spawn(async move { run_tasks_forever(vec![every_day], 1).await });
281 runners.spawn(async move { run_tasks_forever(vec![every_hour], 1).await });
282 runners.spawn(async move { run_tasks_forever(vec![every_minute], 5).await });
283 runners.spawn(async move { run_tasks_forever(vec![every_second], 5).await });
284 runners.join_all().await;
285 Ok(())
286}