micromegas_analytics/lakehouse/
processes_view.rs1use super::{
2 session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3 view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11pub async fn make_processes_view(
13 runtime: Arc<RuntimeEnv>,
14 lake: Arc<DataLakeConnection>,
15 view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17 let count_src_query = Arc::new(String::from(
18 r#"
19 SELECT count(*) as count
20 FROM blocks
21 WHERE insert_time >= '{begin}'
22 AND insert_time < '{end}'
23 ;"#,
24 ));
25 let transform_query = Arc::new(String::from(
26 r#"
27SELECT process_id,
28 first_value("processes.exe") as exe,
29 first_value("processes.username") as username,
30 first_value("processes.realname") as realname,
31 first_value("processes.computer") as computer,
32 first_value("processes.distro") as distro,
33 first_value("processes.cpu_brand") as cpu_brand,
34 first_value("processes.tsc_frequency") as tsc_frequency,
35 first_value("processes.start_time") as start_time,
36 first_value("processes.start_ticks") as start_ticks,
37 first_value("processes.insert_time") as insert_time,
38 first_value("processes.parent_process_id") as parent_process_id,
39 first_value("processes.properties") as properties,
40 max(insert_time) as last_update_time,
41 max(end_ticks) as last_block_end_ticks,
42 max(end_time) as last_block_end_time
43FROM blocks
44GROUP BY process_id
45 ;"#,
46 ));
47 let merge_query = Arc::new(String::from(
48 r#"
49SELECT process_id,
50 first_value("exe") as exe,
51 first_value("username") as username,
52 first_value("realname") as realname,
53 first_value("computer") as computer,
54 first_value("distro") as distro,
55 first_value("cpu_brand") as cpu_brand,
56 first_value("tsc_frequency") as tsc_frequency,
57 first_value("start_time") as start_time,
58 first_value("start_ticks") as start_ticks,
59 first_value("insert_time") as insert_time,
60 first_value("parent_process_id") as parent_process_id,
61 first_value("properties") as properties,
62 max(last_update_time) as last_update_time,
63 max(last_block_end_ticks) as last_block_end_ticks,
64 max(last_block_end_time) as last_block_end_time
65FROM {source}
66GROUP BY process_id
67 ;"#,
68 ));
69 let min_time_column = Arc::new(String::from("insert_time"));
70 let max_time_column = Arc::new(String::from("last_update_time"));
71 SqlBatchView::new(
72 runtime,
73 Arc::new("processes".to_owned()),
74 min_time_column,
75 max_time_column,
76 count_src_query,
77 transform_query,
78 merge_query,
79 lake,
80 view_factory,
81 Arc::new(NoOpSessionConfigurator),
82 Some(2000),
83 TimeDelta::days(1), TimeDelta::days(1), None,
86 )
87 .await
88}