micromegas/client/
query_processes.rs1use super::flightsql_client::Client;
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use datafusion::arrow::array::RecordBatch;
5use micromegas_analytics::time::TimeRange;
6
7pub struct ProcessQueryBuilder {
13 filters: Vec<String>,
14 begin: Option<DateTime<Utc>>,
15 end: Option<DateTime<Utc>>,
16}
17
18impl ProcessQueryBuilder {
19 pub fn new() -> Self {
21 Self {
22 filters: vec![],
23 begin: None,
24 end: None,
25 }
26 }
27
28 pub fn with_process_id(mut self, process_id: &str) -> Self {
30 self.filters.push(format!("(process_id='{process_id}')"));
31 self
32 }
33
34 pub fn with_username(mut self, username: &str) -> Self {
36 self.filters
37 .push(format!(r#"("processes.username"='{username}')"#));
38 self
39 }
40
41 pub fn with_exe(mut self, exe: &str) -> Self {
43 self.filters.push(format!(r#"("processes.exe"='{exe}')"#));
44 self
45 }
46
47 pub fn since(mut self, begin: DateTime<Utc>) -> Self {
49 let iso = begin.to_rfc3339();
50 self.filters.push(format!(r#"("end_time" >= '{iso}')"#));
51 self.begin = Some(begin);
52 self
53 }
54
55 pub fn until(mut self, end: DateTime<Utc>) -> Self {
57 let iso = end.to_rfc3339();
58 self.filters.push(format!(r#"("begin_time" <= '{iso}')"#));
59 self.end = Some(end);
60 self
61 }
62
63 pub fn with_cpu_blocks(mut self) -> Self {
65 self.filters
66 .push(r#"array_has( "streams.tags", 'cpu' )"#.into());
67 self
68 }
69
70 pub fn with_thread_named(mut self, thread_name: &str) -> Self {
72 let filter =
73 format!(r#"property_get("streams.properties", 'thread-name') = '{thread_name}'"#);
74 self.filters.push(filter);
75 self
76 }
77
78 pub fn into_where(&self) -> String {
80 if self.filters.is_empty() {
81 String::from("")
82 } else {
83 format!("WHERE {}", self.filters.join(" AND "))
84 }
85 }
86
87 pub async fn query(self, client: &mut Client) -> Result<Vec<RecordBatch>> {
99 let sql_where = self.into_where();
100 let sql = format!(
101 r#"SELECT process_id,
102 min(begin_time) as begin,
103 max(end_time) as end,
104 "processes.exe" as exe,
105 "processes.properties" as properties,
106 "processes.computer" as computer,
107 "processes.username" as username,
108 "processes.cpu_brand" as cpu_brand,
109 "processes.distro" as distro
110 FROM blocks
111 {sql_where}
112 GROUP BY process_id, exe, properties, computer, username, cpu_brand, distro;"#
113 );
114 let query_time_range = if let (Some(begin), Some(end)) = (self.begin, self.end) {
115 Some(TimeRange::new(begin, end))
116 } else {
117 None
118 };
119 client.query(sql, query_time_range).await
120 }
121}
122
123impl Default for ProcessQueryBuilder {
124 fn default() -> Self {
125 Self::new()
126 }
127}