1use std::collections::HashMap;
2
3use crate::async_writer::AsyncWriter;
4use crate::protos::{
5 EventCategory, EventName, ProcessDescriptor, SourceLocation, ThreadDescriptor, TracePacket,
6 TrackEvent,
7 trace_packet::Data,
8 track_event::{self, NameField, SourceLocationField},
9};
10
11use crate::utils::{new_interned_data, new_trace_packet, new_track_descriptor, new_track_event};
12use prost::{
13 Message,
14 encoding::{WireType, encode_key, encode_varint},
15};
16use xxhash_rust::xxh64::xxh64;
17
/// Protobuf field number used in the key that precedes every framed `TracePacket`
/// (see `PerfettoWriter::write_packet`).
// NOTE(review): presumably mirrors `repeated TracePacket packet = 1;` in Perfetto's
// `Trace` message — confirm against the schema.
const TRACE_PACKET_FIELD_NUMBER: u32 = 1;
25
26pub struct PerfettoWriter {
29 writer: Box<dyn AsyncWriter + Send>,
30 pid: i32, process_uuid: u64, current_thread_uuid: Option<u64>,
33 async_track_uuid: Option<u64>, names: HashMap<String, u64>,
35 categories: HashMap<String, u64>,
36 source_locations: HashMap<(String, u32), u64>,
37}
38
39impl PerfettoWriter {
40 pub fn new(writer: Box<dyn AsyncWriter + Send>, micromegas_process_id: &str) -> Self {
42 let process_uuid = xxh64(micromegas_process_id.as_bytes(), 0);
43 let pid = process_uuid as i32;
44 Self {
45 writer,
46 pid,
47 process_uuid,
48 current_thread_uuid: None,
49 async_track_uuid: None,
50 names: HashMap::new(),
51 categories: HashMap::new(),
52 source_locations: HashMap::new(),
53 }
54 }
55
56 pub async fn write_packet(&mut self, packet: TracePacket) -> anyhow::Result<()> {
58 let mut packet_buf = Vec::new();
59 packet.encode(&mut packet_buf)?;
61
62 let mut framing_buf = Vec::new();
63 encode_key(
65 TRACE_PACKET_FIELD_NUMBER,
66 WireType::LengthDelimited,
67 &mut framing_buf,
68 );
69 encode_varint(packet_buf.len() as u64, &mut framing_buf);
70
71 self.writer.write(&framing_buf).await?;
73 self.writer.write(&packet_buf).await?;
74
75 Ok(())
76 }
77
78 fn set_name(&mut self, name: &str, packet: &mut TracePacket, event: &mut TrackEvent) {
79 if let Some(id) = self.names.get(name) {
80 event.name_field = Some(NameField::NameIid(*id));
81 } else {
82 let id = self.names.len() as u64 + 1;
83 self.names.insert(name.to_owned(), id);
84 event.name_field = Some(NameField::NameIid(id));
85 if packet.interned_data.is_none() {
86 packet.interned_data = Some(new_interned_data());
87 }
88 packet
89 .interned_data
90 .as_mut()
91 .unwrap()
92 .event_names
93 .push(EventName {
94 iid: Some(id),
95 name: Some(name.to_owned()),
96 });
97 }
98 }
99
100 fn set_category(&mut self, category: &str, packet: &mut TracePacket, event: &mut TrackEvent) {
101 if let Some(id) = self.categories.get(category) {
102 event.category_iids.push(*id);
103 } else {
104 let id = self.categories.len() as u64 + 1;
105 self.categories.insert(category.to_owned(), id);
106 event.category_iids.push(id);
107 if packet.interned_data.is_none() {
108 packet.interned_data = Some(new_interned_data());
109 }
110 packet
111 .interned_data
112 .as_mut()
113 .unwrap()
114 .event_categories
115 .push(EventCategory {
116 iid: Some(id),
117 name: Some(category.to_owned()),
118 });
119 }
120 }
121
122 fn set_source_location(
123 &mut self,
124 file: &str,
125 line: u32,
126 packet: &mut TracePacket,
127 event: &mut TrackEvent,
128 ) {
129 let key = (file.to_string(), line);
130 if let Some(id) = self.source_locations.get(&key) {
131 event.source_location_field = Some(SourceLocationField::SourceLocationIid(*id));
132 } else {
133 let id = self.source_locations.len() as u64 + 1;
134 self.source_locations.insert(key, id);
135 event.source_location_field = Some(SourceLocationField::SourceLocationIid(id));
136 if packet.interned_data.is_none() {
137 packet.interned_data = Some(new_interned_data());
138 }
139 packet
140 .interned_data
141 .as_mut()
142 .unwrap()
143 .source_locations
144 .push(SourceLocation {
145 iid: Some(id),
146 file_name: Some(file.to_owned()),
147 line_number: Some(line),
148 function_name: None,
149 });
150 }
151 }
152
153 pub async fn emit_process_descriptor(&mut self, exe: &str) -> anyhow::Result<()> {
155 let mut process_track = new_track_descriptor(self.process_uuid);
156 process_track.process = Some(ProcessDescriptor {
157 pid: Some(self.pid),
158 cmdline: vec![],
159 process_name: Some(exe.into()),
160 process_priority: None,
161 start_timestamp_ns: None,
162 chrome_process_type: None,
163 legacy_sort_index: None,
164 process_labels: vec![],
165 });
166 let mut packet = new_trace_packet();
167 packet.data = Some(Data::TrackDescriptor(process_track));
168 packet.first_packet_on_sequence = Some(true);
169 packet.sequence_flags = Some(3);
170 self.write_packet(packet).await
171 }
172
173 pub async fn emit_thread_descriptor(
175 &mut self,
176 stream_id: &str,
177 thread_id: i32,
178 thread_name: &str,
179 ) -> anyhow::Result<()> {
180 let thread_uuid = xxh64(stream_id.as_bytes(), 0);
181 self.current_thread_uuid = Some(thread_uuid);
182 let mut thread_track = new_track_descriptor(thread_uuid);
183 thread_track.parent_uuid = Some(self.process_uuid);
184 thread_track.thread = Some(ThreadDescriptor {
185 pid: Some(self.pid),
186 tid: Some(thread_id),
187 thread_name: Some(thread_name.into()),
188 chrome_thread_type: None,
189 reference_timestamp_us: None,
190 reference_thread_time_us: None,
191 reference_thread_instruction_count: None,
192 legacy_sort_index: None,
193 });
194 let mut packet = new_trace_packet();
195 packet.data = Some(Data::TrackDescriptor(thread_track));
196 self.write_packet(packet).await
197 }
198
199 pub fn set_current_thread(&mut self, stream_id: &str) {
202 let thread_uuid = xxh64(stream_id.as_bytes(), 0);
203 self.current_thread_uuid = Some(thread_uuid);
204 }
205
206 pub async fn emit_async_track_descriptor(&mut self) -> anyhow::Result<()> {
208 if self.async_track_uuid.is_some() {
209 return Ok(()); }
211
212 let async_track_uuid = xxh64("async_track".as_bytes(), self.process_uuid);
213 self.async_track_uuid = Some(async_track_uuid);
214
215 let mut async_track = new_track_descriptor(async_track_uuid);
216 async_track.parent_uuid = Some(self.process_uuid);
217 async_track.static_or_dynamic_name =
218 Some(crate::protos::track_descriptor::StaticOrDynamicName::Name(
219 "Async Operations".to_owned(),
220 ));
221
222 let mut packet = new_trace_packet();
223 packet.data = Some(Data::TrackDescriptor(async_track));
224 self.write_packet(packet).await
225 }
226
227 fn init_span_event(
229 &mut self,
230 name: &str,
231 target: &str,
232 filename: &str,
233 line: u32,
234 packet: &mut TracePacket,
235 mut track_event: TrackEvent,
236 ) {
237 track_event.track_uuid = self.current_thread_uuid;
238 self.set_name(name, packet, &mut track_event);
239 self.set_category(target, packet, &mut track_event);
240 self.set_source_location(filename, line, packet, &mut track_event);
241 packet.data = Some(Data::TrackEvent(track_event));
242 }
243
244 fn init_async_span_event(
246 &mut self,
247 name: &str,
248 target: &str,
249 filename: &str,
250 line: u32,
251 packet: &mut TracePacket,
252 mut track_event: TrackEvent,
253 ) {
254 assert!(
255 self.async_track_uuid.is_some(),
256 "Must call emit_async_track_descriptor() before emitting async span events"
257 );
258
259 track_event.track_uuid = self.async_track_uuid;
260 self.set_name(name, packet, &mut track_event);
261 self.set_category(target, packet, &mut track_event);
262 self.set_source_location(filename, line, packet, &mut track_event);
263 packet.data = Some(Data::TrackEvent(track_event));
264 }
265
266 pub async fn emit_span(
268 &mut self,
269 begin_ns: u64,
270 end_ns: u64,
271 name: &str,
272 target: &str,
273 filename: &str,
274 line: u32,
275 ) -> anyhow::Result<()> {
276 let mut packet = new_trace_packet();
278 packet.timestamp = Some(begin_ns);
279 let mut track_event = new_track_event();
280 track_event.r#type = Some(track_event::Type::SliceBegin.into());
281 self.init_span_event(name, target, filename, line, &mut packet, track_event);
282 self.write_packet(packet).await?;
283
284 let mut packet = new_trace_packet();
286 packet.timestamp = Some(end_ns);
287 let mut track_event = new_track_event();
288 track_event.r#type = Some(track_event::Type::SliceEnd.into());
289 self.init_span_event(name, target, filename, line, &mut packet, track_event);
290 self.write_packet(packet).await?;
291
292 Ok(())
293 }
294
295 pub async fn emit_async_span_begin(
297 &mut self,
298 timestamp_ns: u64,
299 name: &str,
300 target: &str,
301 filename: &str,
302 line: u32,
303 ) -> anyhow::Result<()> {
304 let mut packet = new_trace_packet();
305 packet.timestamp = Some(timestamp_ns);
306 let mut track_event = new_track_event();
307 track_event.r#type = Some(track_event::Type::SliceBegin.into());
308 self.init_async_span_event(name, target, filename, line, &mut packet, track_event);
309 self.write_packet(packet).await
310 }
311
312 pub async fn emit_async_span_end(
314 &mut self,
315 timestamp_ns: u64,
316 name: &str,
317 target: &str,
318 filename: &str,
319 line: u32,
320 ) -> anyhow::Result<()> {
321 let mut packet = new_trace_packet();
322 packet.timestamp = Some(timestamp_ns);
323 let mut track_event = new_track_event();
324 track_event.r#type = Some(track_event::Type::SliceEnd.into());
325 self.init_async_span_event(name, target, filename, line, &mut packet, track_event);
326 self.write_packet(packet).await
327 }
328
    /// Flushes the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns whatever error the writer's `flush` reports.
    pub async fn flush(&mut self) -> anyhow::Result<()> {
        self.writer.flush().await
    }
333
    /// Consumes the `PerfettoWriter` and returns the underlying writer.
    ///
    /// No flush is performed here; call `flush` first if one is needed.
    pub fn into_inner(self) -> Box<dyn AsyncWriter + Send> {
        self.writer
    }
339}