// micromegas_analytics/lakehouse/process_streams.rs
use crate::dfext::string_column_accessor::string_column_by_name;
2
3pub async fn get_process_thread_list(
6 process_id: &str,
7 ctx: &datafusion::execution::context::SessionContext,
8) -> anyhow::Result<Vec<(String, i32, String)>> {
9 let sql = format!(
10 r#"
11 SELECT b.stream_id,
12 property_get("streams.properties", 'thread-name') as thread_name,
13 property_get("streams.properties", 'thread-id') as thread_id
14 FROM blocks b
15 WHERE b.process_id = '{process_id}'
16 AND array_has(b."streams.tags", 'cpu')
17 GROUP BY stream_id, thread_name, thread_id
18 ORDER BY stream_id
19 "#,
20 );
21
22 let df = ctx.sql(&sql).await?;
23 let batches = df.collect().await?;
24 let mut threads = Vec::new();
25
26 for batch in batches {
27 let stream_ids = string_column_by_name(&batch, "stream_id")?;
28 let thread_names = string_column_by_name(&batch, "thread_name")?;
29 let thread_ids = string_column_by_name(&batch, "thread_id")?;
30
31 for i in 0..batch.num_rows() {
32 let stream_id = stream_ids.value(i)?.to_owned();
33 let thread_name = thread_names.value(i)?;
34 let thread_id_str = thread_ids.value(i)?;
35
36 let thread_id_for_display = if thread_id_str.is_empty() {
38 &stream_id
39 } else {
40 thread_id_str
41 };
42
43 let thread_id_numeric = thread_id_str.parse::<i64>().unwrap_or(0) as i32;
45
46 let display_name = if thread_name.is_empty() {
48 thread_id_for_display.to_owned()
49 } else {
50 format!("{thread_name}-{thread_id_for_display}")
51 };
52
53 threads.push((stream_id, thread_id_numeric, display_name));
54 }
55 }
56
57 Ok(threads)
58}