micromegas/client/query_processes.rs

1use super::flightsql_client::Client;
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use datafusion::arrow::array::RecordBatch;
5use micromegas_analytics::time::TimeRange;
6
7/// Builder for creating process queries with various filters.
8///
9/// This builder allows you to construct queries to find processes based on
10/// various criteria like process ID, username, executable name, time ranges,
11/// and whether they contain CPU blocks or specific thread names.
12pub struct ProcessQueryBuilder {
13    filters: Vec<String>,
14    begin: Option<DateTime<Utc>>,
15    end: Option<DateTime<Utc>>,
16}
17
18impl ProcessQueryBuilder {
19    /// Creates a new ProcessQueryBuilder with no filters.
20    pub fn new() -> Self {
21        Self {
22            filters: vec![],
23            begin: None,
24            end: None,
25        }
26    }
27
28    /// Filters processes by exact process ID.
29    pub fn with_process_id(mut self, process_id: &str) -> Self {
30        self.filters.push(format!("(process_id='{process_id}')"));
31        self
32    }
33
34    /// Filters processes by exact username.
35    pub fn with_username(mut self, username: &str) -> Self {
36        self.filters
37            .push(format!(r#"("processes.username"='{username}')"#));
38        self
39    }
40
41    /// Filters processes by exact executable name.
42    pub fn with_exe(mut self, exe: &str) -> Self {
43        self.filters.push(format!(r#"("processes.exe"='{exe}')"#));
44        self
45    }
46
47    /// Filters processes that ended at or after the given time.
48    pub fn since(mut self, begin: DateTime<Utc>) -> Self {
49        let iso = begin.to_rfc3339();
50        self.filters.push(format!(r#"("end_time" >= '{iso}')"#));
51        self.begin = Some(begin);
52        self
53    }
54
55    /// Filters processes that began at or before the given time.
56    pub fn until(mut self, end: DateTime<Utc>) -> Self {
57        let iso = end.to_rfc3339();
58        self.filters.push(format!(r#"("begin_time" <= '{iso}')"#));
59        self.end = Some(end);
60        self
61    }
62
63    /// Filters processes that have CPU blocks (contain telemetry streams tagged with 'cpu').
64    pub fn with_cpu_blocks(mut self) -> Self {
65        self.filters
66            .push(r#"array_has( "streams.tags", 'cpu' )"#.into());
67        self
68    }
69
70    /// Filters processes that have a thread with the specified name.
71    pub fn with_thread_named(mut self, thread_name: &str) -> Self {
72        let filter =
73            format!(r#"property_get("streams.properties", 'thread-name') = '{thread_name}'"#);
74        self.filters.push(filter);
75        self
76    }
77
78    /// Converts the filters into a SQL WHERE clause.
79    pub fn into_where(&self) -> String {
80        if self.filters.is_empty() {
81            String::from("")
82        } else {
83            format!("WHERE {}", self.filters.join(" AND "))
84        }
85    }
86
87    /// Executes the query and returns the matching processes.
88    ///
89    /// Returns a vector of RecordBatch containing process information including:
90    /// - process_id
91    /// - begin/end times
92    /// - exe (executable name)
93    /// - properties
94    /// - computer
95    /// - username
96    /// - cpu_brand
97    /// - distro
98    pub async fn query(self, client: &mut Client) -> Result<Vec<RecordBatch>> {
99        let sql_where = self.into_where();
100        let sql = format!(
101            r#"SELECT process_id,
102                      min(begin_time) as begin,
103                      max(end_time) as end,
104                      "processes.exe" as exe,
105                      "processes.properties" as properties,
106                      "processes.computer" as computer,
107                      "processes.username" as username,
108                      "processes.cpu_brand" as cpu_brand,
109                      "processes.distro" as distro
110            FROM blocks
111            {sql_where}
112            GROUP BY process_id, exe, properties, computer, username, cpu_brand, distro;"#
113        );
114        let query_time_range = if let (Some(begin), Some(end)) = (self.begin, self.end) {
115            Some(TimeRange::new(begin, end))
116        } else {
117            None
118        };
119        client.query(sql, query_time_range).await
120    }
121}
122
123impl Default for ProcessQueryBuilder {
124    fn default() -> Self {
125        Self::new()
126    }
127}