micromegas_analytics/lakehouse/process_streams.rs

1use crate::dfext::string_column_accessor::string_column_by_name;
2
3/// Get thread information from the streams table for a given process.
4/// Returns (stream_id, thread_id_numeric, display_name) where display_name is "name-id" or just "id".
5pub async fn get_process_thread_list(
6    process_id: &str,
7    ctx: &datafusion::execution::context::SessionContext,
8) -> anyhow::Result<Vec<(String, i32, String)>> {
9    let sql = format!(
10        r#"
11        SELECT b.stream_id,
12               property_get("streams.properties", 'thread-name') as thread_name,
13               property_get("streams.properties", 'thread-id') as thread_id
14        FROM blocks b
15        WHERE b.process_id = '{process_id}'
16        AND array_has(b."streams.tags", 'cpu')
17        GROUP BY stream_id, thread_name, thread_id
18        ORDER BY stream_id
19        "#,
20    );
21
22    let df = ctx.sql(&sql).await?;
23    let batches = df.collect().await?;
24    let mut threads = Vec::new();
25
26    for batch in batches {
27        let stream_ids = string_column_by_name(&batch, "stream_id")?;
28        let thread_names = string_column_by_name(&batch, "thread_name")?;
29        let thread_ids = string_column_by_name(&batch, "thread_id")?;
30
31        for i in 0..batch.num_rows() {
32            let stream_id = stream_ids.value(i)?.to_owned();
33            let thread_name = thread_names.value(i)?;
34            let thread_id_str = thread_ids.value(i)?;
35
36            // thread_id falls back to stream_id if not available
37            let thread_id_for_display = if thread_id_str.is_empty() {
38                &stream_id
39            } else {
40                thread_id_str
41            };
42
43            // Parse numeric thread_id for Perfetto (use 0 if not parseable)
44            let thread_id_numeric = thread_id_str.parse::<i64>().unwrap_or(0) as i32;
45
46            // Build display name: "name-id" if name exists, otherwise just "id"
47            let display_name = if thread_name.is_empty() {
48                thread_id_for_display.to_owned()
49            } else {
50                format!("{thread_name}-{thread_id_for_display}")
51            };
52
53            threads.push((stream_id, thread_id_numeric, display_name));
54        }
55    }
56
57    Ok(threads)
58}