// micromegas_analytics/lakehouse/processes_view.rs

1use super::{
2    session_configurator::NoOpSessionConfigurator, sql_batch_view::SqlBatchView,
3    view_factory::ViewFactory,
4};
5use anyhow::Result;
6use chrono::TimeDelta;
7use datafusion::execution::runtime_env::RuntimeEnv;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use std::sync::Arc;
10
11/// Creates a new `SqlBatchView` for processes.
12pub async fn make_processes_view(
13    runtime: Arc<RuntimeEnv>,
14    lake: Arc<DataLakeConnection>,
15    view_factory: Arc<ViewFactory>,
16) -> Result<SqlBatchView> {
17    let count_src_query = Arc::new(String::from(
18        r#"
19        SELECT count(*) as count
20        FROM  blocks
21        WHERE insert_time >= '{begin}'
22        AND   insert_time < '{end}'
23        ;"#,
24    ));
25    let transform_query = Arc::new(String::from(
26        r#"
27SELECT process_id,
28       first_value("processes.exe") as exe,
29       first_value("processes.username") as username,
30       first_value("processes.realname") as realname,
31       first_value("processes.computer") as computer,
32       first_value("processes.distro") as distro,
33       first_value("processes.cpu_brand") as cpu_brand,
34       first_value("processes.tsc_frequency") as tsc_frequency,
35       first_value("processes.start_time") as start_time,
36       first_value("processes.start_ticks") as start_ticks,
37       first_value("processes.insert_time") as insert_time,
38       first_value("processes.parent_process_id") as parent_process_id,
39       first_value("processes.properties") as properties,
40       max(insert_time) as last_update_time,
41       max(end_ticks) as last_block_end_ticks,
42       max(end_time) as last_block_end_time
43FROM blocks
44GROUP BY process_id
45        ;"#,
46    ));
47    let merge_query = Arc::new(String::from(
48        r#"
49SELECT process_id,
50       first_value("exe") as exe,
51       first_value("username") as username,
52       first_value("realname") as realname,
53       first_value("computer") as computer,
54       first_value("distro") as distro,
55       first_value("cpu_brand") as cpu_brand,
56       first_value("tsc_frequency") as tsc_frequency,
57       first_value("start_time") as start_time,
58       first_value("start_ticks") as start_ticks,
59       first_value("insert_time") as insert_time,
60       first_value("parent_process_id") as parent_process_id,
61       first_value("properties") as properties,
62       max(last_update_time) as last_update_time,
63       max(last_block_end_ticks) as last_block_end_ticks,
64       max(last_block_end_time) as last_block_end_time
65FROM {source}
66GROUP BY process_id
67        ;"#,
68    ));
69    let min_time_column = Arc::new(String::from("insert_time"));
70    let max_time_column = Arc::new(String::from("last_update_time"));
71    SqlBatchView::new(
72        runtime,
73        Arc::new("processes".to_owned()),
74        min_time_column,
75        max_time_column,
76        count_src_query,
77        transform_query,
78        merge_query,
79        lake,
80        view_factory,
81        Arc::new(NoOpSessionConfigurator),
82        Some(2000),
83        TimeDelta::days(1), // from source
84        TimeDelta::days(1), // when merging
85        None,
86    )
87    .await
88}