micromegas/client/
perfetto_trace_client.rs

1use super::flightsql_client::Client;
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use datafusion::arrow::array::BinaryArray;
5use futures::stream::StreamExt;
6use micromegas_analytics::dfext::typed_column::typed_column_by_name;
7use micromegas_analytics::time::TimeRange;
8use tokio::fs::File;
9use tokio::io::AsyncWriteExt;
10
11/// Span types to include in the trace
12pub enum SpanTypes {
13    Thread,
14    Async,
15    Both,
16}
17
18impl SpanTypes {
19    fn as_str(&self) -> &'static str {
20        match self {
21            SpanTypes::Thread => "thread",
22            SpanTypes::Async => "async",
23            SpanTypes::Both => "both",
24        }
25    }
26}
27
28/// Formats a Perfetto trace with configurable span types using server-side perfetto_trace_chunks function.
29///
30/// This function queries the FlightSQL server using the perfetto_trace_chunks table function
31/// which generates Perfetto trace data server-side and streams it back as binary chunks.
32///
33/// # Arguments
34/// * `span_types` - Types of spans to include: Thread, Async, or Both
35pub async fn format_perfetto_trace(
36    client: &mut Client,
37    process_id: &str,
38    query_range: TimeRange,
39    span_types: SpanTypes,
40) -> Result<Vec<u8>> {
41    // Use the perfetto_trace_chunks table function to get binary chunks
42    // Note: ORDER BY not needed since chunks are naturally produced in order (0, 1, 2, ...)
43    let sql = format!(
44        r#"
45        SELECT chunk_id, chunk_data
46        FROM perfetto_trace_chunks(
47            '{}',
48            '{}',
49            TIMESTAMP '{}',
50            TIMESTAMP '{}'
51        )
52        "#,
53        process_id,
54        span_types.as_str(),
55        query_range.begin.to_rfc3339(),
56        query_range.end.to_rfc3339()
57    );
58
59    let batches = client.query(sql, Some(query_range)).await?;
60
61    // Collect all chunks and reassemble them in order
62    let mut trace_data = Vec::new();
63    for batch in batches {
64        let chunk_data: &BinaryArray = typed_column_by_name(&batch, "chunk_data")?;
65
66        // Chunks are already in order from server-side generation
67        for i in 0..batch.num_rows() {
68            let chunk = chunk_data.value(i);
69            trace_data.extend_from_slice(chunk);
70        }
71    }
72
73    if trace_data.is_empty() {
74        anyhow::bail!("No trace data generated for process {}", process_id);
75    }
76
77    Ok(trace_data)
78}
79
80/// Streams Perfetto trace chunks as they arrive from the server.
81///
82/// This function is useful for showing download progress to users since chunks
83/// are yielded as soon as they're received rather than buffered.
84///
85/// # Arguments
86/// * `span_types` - Types of spans to include: Thread, Async, or Both
87pub fn format_perfetto_trace_stream<'a>(
88    client: &'a mut Client,
89    process_id: &'a str,
90    query_range: TimeRange,
91    span_types: SpanTypes,
92) -> impl futures::Stream<Item = Result<Vec<u8>>> + 'a {
93    let sql = format!(
94        r#"
95        SELECT chunk_id, chunk_data
96        FROM perfetto_trace_chunks(
97            '{}',
98            '{}',
99            TIMESTAMP '{}',
100            TIMESTAMP '{}'
101        )
102        "#,
103        process_id,
104        span_types.as_str(),
105        query_range.begin.to_rfc3339(),
106        query_range.end.to_rfc3339()
107    );
108
109    async_stream::stream! {
110        let mut batch_stream = match client.query_stream(sql, Some(query_range)).await {
111            Ok(stream) => stream,
112            Err(e) => {
113                yield Err(e);
114                return;
115            }
116        };
117
118        let mut has_data = false;
119        while let Some(batch_result) = batch_stream.next().await {
120            match batch_result {
121                Ok(batch) => {
122                    let chunk_data: &BinaryArray = match typed_column_by_name(&batch, "chunk_data") {
123                        Ok(col) => col,
124                        Err(e) => {
125                            yield Err(e);
126                            return;
127                        }
128                    };
129
130                    for i in 0..batch.num_rows() {
131                        has_data = true;
132                        yield Ok(chunk_data.value(i).to_vec());
133                    }
134                }
135                Err(e) => {
136                    yield Err(anyhow::anyhow!("Error reading batch: {}", e));
137                    return;
138                }
139            }
140        }
141
142        if !has_data {
143            yield Err(anyhow::anyhow!("No trace data generated for process {}", process_id));
144        }
145    }
146}
147
148/// Writes a Perfetto trace to a file with configurable span types.
149///
150/// This function generates traces with thread spans, async spans, or both.
151///
152/// # Arguments
153/// * `span_types` - Types of spans to include: Thread, Async, or Both
154pub async fn write_perfetto_trace(
155    client: &mut Client,
156    process_id: &str,
157    begin: DateTime<Utc>,
158    end: DateTime<Utc>,
159    out_filename: &str,
160    span_types: SpanTypes,
161) -> Result<()> {
162    let buf =
163        format_perfetto_trace(client, process_id, TimeRange::new(begin, end), span_types).await?;
164    let mut file = File::create(out_filename).await?;
165    file.write_all(&buf).await?;
166    Ok(())
167}