micromegas_analytics/lakehouse/
streams_view.rs1use super::{
2 session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3 view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11pub async fn make_streams_view(
13 runtime: Arc<RuntimeEnv>,
14 lake: Arc<DataLakeConnection>,
15 view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17 let count_src_query = Arc::new(String::from(
18 r#"
19 SELECT count(*) as count
20 FROM blocks
21 WHERE insert_time >= '{begin}'
22 AND insert_time < '{end}'
23 ;"#,
24 ));
25 let transform_query = Arc::new(String::from(
26 r#"
27SELECT stream_id,
28 first_value("process_id") as process_id,
29 first_value("streams.dependencies_metadata") as dependencies_metadata,
30 first_value("streams.objects_metadata") as objects_metadata,
31 first_value("streams.tags") as tags,
32 first_value("streams.properties") as properties,
33 first_value("streams.insert_time") as insert_time,
34 max(insert_time) as last_update_time
35FROM blocks
36GROUP BY stream_id
37 ;"#,
38 ));
39 let merge_query = Arc::new(String::from(
40 r#"
41SELECT stream_id,
42 first_value(process_id) as process_id,
43 first_value(dependencies_metadata) as dependencies_metadata,
44 first_value(objects_metadata) as objects_metadata,
45 first_value(tags) as tags,
46 first_value(properties) as properties,
47 first_value(insert_time) as insert_time,
48 max(last_update_time) as last_update_time
49FROM {source}
50GROUP BY stream_id
51 ;"#,
52 ));
53 let min_time_column = Arc::new(String::from("insert_time"));
54 let max_time_column = Arc::new(String::from("last_update_time"));
55 SqlBatchView::new(
56 runtime,
57 Arc::new("streams".to_owned()),
58 min_time_column,
59 max_time_column,
60 count_src_query,
61 transform_query,
62 merge_query,
63 lake,
64 view_factory,
65 Arc::new(NoOpSessionConfigurator),
66 Some(2000),
67 TimeDelta::days(1), TimeDelta::days(1), None,
70 )
71 .await
72}