micromegas_analytics/lakehouse/
streams_view.rs1use super::{
2 session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3 view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11pub async fn make_streams_view(
13 runtime: Arc<RuntimeEnv>,
14 lake: Arc<DataLakeConnection>,
15 view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17 let count_src_query = Arc::new(String::from(
18 r#"
19 SELECT count(*) as count
20 FROM blocks
21 WHERE insert_time >= '{begin}'
22 AND insert_time < '{end}'
23 ;"#,
24 ));
25 let transform_query = Arc::new(String::from(
26 r#"
27SELECT stream_id,
28 first_value("process_id") as process_id,
29 first_value("streams.dependencies_metadata") as dependencies_metadata,
30 first_value("streams.objects_metadata") as objects_metadata,
31 first_value("streams.tags") as tags,
32 first_value("streams.properties") as properties,
33 first_value("streams.insert_time") as insert_time,
34 first_value("streams.format") as format,
35 max(insert_time) as last_update_time
36FROM blocks
37GROUP BY stream_id
38 ;"#,
39 ));
40 let merge_query = Arc::new(String::from(
41 r#"
42SELECT stream_id,
43 first_value(process_id) as process_id,
44 first_value(dependencies_metadata) as dependencies_metadata,
45 first_value(objects_metadata) as objects_metadata,
46 first_value(tags) as tags,
47 first_value(properties) as properties,
48 first_value(insert_time) as insert_time,
49 first_value(format) as format,
50 max(last_update_time) as last_update_time
51FROM {source}
52GROUP BY stream_id
53 ;"#,
54 ));
55 let min_time_column = Arc::new(String::from("insert_time"));
56 let max_time_column = Arc::new(String::from("last_update_time"));
57 SqlBatchView::new(
58 runtime,
59 Arc::new("streams".to_owned()),
60 min_time_column,
61 max_time_column,
62 count_src_query,
63 transform_query,
64 merge_query,
65 lake,
66 view_factory,
67 Arc::new(NoOpSessionConfigurator),
68 Some(2000),
69 TimeDelta::days(1), TimeDelta::days(1), None,
72 )
73 .await
74}