// micromegas/client/perfetto_trace_client.rs
use super::flightsql_client::Client;
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use datafusion::arrow::array::BinaryArray;
5use futures::stream::StreamExt;
6use micromegas_analytics::dfext::typed_column::typed_column_by_name;
7use micromegas_analytics::time::TimeRange;
8use tokio::fs::File;
9use tokio::io::AsyncWriteExt;
10
/// Selects which categories of spans to include in a generated Perfetto trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanTypes {
    /// Thread spans only.
    Thread,
    /// Async spans only.
    Async,
    /// Both thread and async spans.
    Both,
}

impl SpanTypes {
    /// Returns the lowercase identifier passed to the
    /// `perfetto_trace_chunks` table function.
    fn as_str(&self) -> &'static str {
        match self {
            SpanTypes::Thread => "thread",
            SpanTypes::Async => "async",
            SpanTypes::Both => "both",
        }
    }
}
27
28pub async fn format_perfetto_trace(
36 client: &mut Client,
37 process_id: &str,
38 query_range: TimeRange,
39 span_types: SpanTypes,
40) -> Result<Vec<u8>> {
41 let sql = format!(
44 r#"
45 SELECT chunk_id, chunk_data
46 FROM perfetto_trace_chunks(
47 '{}',
48 '{}',
49 TIMESTAMP '{}',
50 TIMESTAMP '{}'
51 )
52 "#,
53 process_id,
54 span_types.as_str(),
55 query_range.begin.to_rfc3339(),
56 query_range.end.to_rfc3339()
57 );
58
59 let batches = client.query(sql, Some(query_range)).await?;
60
61 let mut trace_data = Vec::new();
63 for batch in batches {
64 let chunk_data: &BinaryArray = typed_column_by_name(&batch, "chunk_data")?;
65
66 for i in 0..batch.num_rows() {
68 let chunk = chunk_data.value(i);
69 trace_data.extend_from_slice(chunk);
70 }
71 }
72
73 if trace_data.is_empty() {
74 anyhow::bail!("No trace data generated for process {}", process_id);
75 }
76
77 Ok(trace_data)
78}
79
/// Streams Perfetto trace chunks for `process_id` within `query_range`,
/// yielding each chunk as an owned `Vec<u8>` as soon as it is read.
///
/// Unlike [`format_perfetto_trace`], this does not buffer the whole trace
/// in memory. The stream yields one `Ok(Vec<u8>)` per row of `chunk_data`;
/// on any failure it yields a single `Err` and then terminates. If the
/// query succeeds but produces no rows at all, a final `Err` reporting
/// "no trace data" is yielded.
pub fn format_perfetto_trace_stream<'a>(
    client: &'a mut Client,
    process_id: &'a str,
    query_range: TimeRange,
    span_types: SpanTypes,
) -> impl futures::Stream<Item = Result<Vec<u8>>> + 'a {
    // Build the SQL eagerly (before the stream is polled).
    // NOTE(review): process_id is interpolated into a SQL string literal
    // unescaped — an embedded single quote would break the query; consider
    // escaping as in callers that construct similar queries.
    let sql = format!(
        r#"
        SELECT chunk_id, chunk_data
        FROM perfetto_trace_chunks(
            '{}',
            '{}',
            TIMESTAMP '{}',
            TIMESTAMP '{}'
        )
        "#,
        process_id,
        span_types.as_str(),
        query_range.begin.to_rfc3339(),
        query_range.end.to_rfc3339()
    );

    async_stream::stream! {
        // Issuing the query is itself fallible; surface the error as the
        // stream's first (and only) item rather than panicking.
        let mut batch_stream = match client.query_stream(sql, Some(query_range)).await {
            Ok(stream) => stream,
            Err(e) => {
                yield Err(e);
                return;
            }
        };

        // Tracks whether at least one chunk was yielded, so an empty
        // result set can be reported as an error after the loop.
        let mut has_data = false;
        while let Some(batch_result) = batch_stream.next().await {
            match batch_result {
                Ok(batch) => {
                    // A missing/mistyped column ends the stream with an error.
                    let chunk_data: &BinaryArray = match typed_column_by_name(&batch, "chunk_data") {
                        Ok(col) => col,
                        Err(e) => {
                            yield Err(e);
                            return;
                        }
                    };

                    // Yield each row's bytes as an owned chunk.
                    for i in 0..batch.num_rows() {
                        has_data = true;
                        yield Ok(chunk_data.value(i).to_vec());
                    }
                }
                Err(e) => {
                    yield Err(anyhow::anyhow!("Error reading batch: {}", e));
                    return;
                }
            }
        }

        if !has_data {
            yield Err(anyhow::anyhow!("No trace data generated for process {}", process_id));
        }
    }
}
147
148pub async fn write_perfetto_trace(
155 client: &mut Client,
156 process_id: &str,
157 begin: DateTime<Utc>,
158 end: DateTime<Utc>,
159 out_filename: &str,
160 span_types: SpanTypes,
161) -> Result<()> {
162 let buf =
163 format_perfetto_trace(client, process_id, TimeRange::new(begin, end), span_types).await?;
164 let mut file = File::create(out_filename).await?;
165 file.write_all(&buf).await?;
166 Ok(())
167}