micromegas_perfetto/
streaming_writer.rs

1use std::collections::HashMap;
2
3use crate::async_writer::AsyncWriter;
4use crate::protos::{
5    EventCategory, EventName, ProcessDescriptor, SourceLocation, ThreadDescriptor, TracePacket,
6    TrackEvent,
7    trace_packet::Data,
8    track_event::{self, NameField, SourceLocationField},
9};
10
11use crate::utils::{new_interned_data, new_trace_packet, new_track_descriptor, new_track_event};
12use prost::{
13    Message,
14    encoding::{WireType, encode_key, encode_varint},
15};
16use xxhash_rust::xxh64::xxh64;
17
// Protobuf field number of the repeated `packet` field in the `Trace` message:
//   #[prost(message, repeated, tag = "1")]
//   pub packet: ::prost::alloc::vec::Vec<TracePacket>,
// Prost doesn't generate field number constants by default, so it is defined manually here.
// This is unlikely to change as it would break protobuf compatibility.
// `write_packet` uses it to build the key of each length-delimited packet it emits.
const TRACE_PACKET_FIELD_NUMBER: u32 = 1;
25
/// A writer for Perfetto traces that writes packets through an AsyncWriter.
/// Uses the AsyncWriter trait to abstract the underlying data sink.
///
/// Besides the sink, the writer keeps the track identifiers it has selected and the
/// tables that map event names, categories and source locations to the numeric ids
/// (`iid`s) handed out by this writer.
pub struct PerfettoWriter {
    /// Sink that receives the framed, encoded packets.
    writer: Box<dyn AsyncWriter + Send>,
    /// Process id reported in process/thread descriptors;
    /// derived from micromegas's process_id using a hash function.
    pid: i32,
    /// UUID of the process track; derived from micromegas's process_id using a hash function.
    process_uuid: u64,
    /// UUID of the thread track that thread spans are attached to, if one was selected.
    current_thread_uuid: Option<u64>,
    /// Single async track UUID for all async spans; `None` until the descriptor is emitted.
    async_track_uuid: Option<u64>,
    /// Event name -> id assigned the first time the name was seen.
    names: HashMap<String, u64>,
    /// Category -> id assigned the first time the category was seen.
    categories: HashMap<String, u64>,
    /// (file name, line) -> id assigned the first time the location was seen.
    source_locations: HashMap<(String, u32), u64>,
}
38
39impl PerfettoWriter {
40    /// Creates a new `PerfettoWriter` instance.
41    pub fn new(writer: Box<dyn AsyncWriter + Send>, micromegas_process_id: &str) -> Self {
42        let process_uuid = xxh64(micromegas_process_id.as_bytes(), 0);
43        let pid = process_uuid as i32;
44        Self {
45            writer,
46            pid,
47            process_uuid,
48            current_thread_uuid: None,
49            async_track_uuid: None,
50            names: HashMap::new(),
51            categories: HashMap::new(),
52            source_locations: HashMap::new(),
53        }
54    }
55
56    /// Writes a single TracePacket to the chunk sender with proper protobuf framing.
57    pub async fn write_packet(&mut self, packet: TracePacket) -> anyhow::Result<()> {
58        let mut packet_buf = Vec::new();
59        // Encode the packet to get its bytes
60        packet.encode(&mut packet_buf)?;
61
62        let mut framing_buf = Vec::new();
63        // Use prost's encoding functions to write the field tag and length
64        encode_key(
65            TRACE_PACKET_FIELD_NUMBER,
66            WireType::LengthDelimited,
67            &mut framing_buf,
68        );
69        encode_varint(packet_buf.len() as u64, &mut framing_buf);
70
71        // Write the framing and packet data to the writer
72        self.writer.write(&framing_buf).await?;
73        self.writer.write(&packet_buf).await?;
74
75        Ok(())
76    }
77
78    fn set_name(&mut self, name: &str, packet: &mut TracePacket, event: &mut TrackEvent) {
79        if let Some(id) = self.names.get(name) {
80            event.name_field = Some(NameField::NameIid(*id));
81        } else {
82            let id = self.names.len() as u64 + 1;
83            self.names.insert(name.to_owned(), id);
84            event.name_field = Some(NameField::NameIid(id));
85            if packet.interned_data.is_none() {
86                packet.interned_data = Some(new_interned_data());
87            }
88            packet
89                .interned_data
90                .as_mut()
91                .unwrap()
92                .event_names
93                .push(EventName {
94                    iid: Some(id),
95                    name: Some(name.to_owned()),
96                });
97        }
98    }
99
100    fn set_category(&mut self, category: &str, packet: &mut TracePacket, event: &mut TrackEvent) {
101        if let Some(id) = self.categories.get(category) {
102            event.category_iids.push(*id);
103        } else {
104            let id = self.categories.len() as u64 + 1;
105            self.categories.insert(category.to_owned(), id);
106            event.category_iids.push(id);
107            if packet.interned_data.is_none() {
108                packet.interned_data = Some(new_interned_data());
109            }
110            packet
111                .interned_data
112                .as_mut()
113                .unwrap()
114                .event_categories
115                .push(EventCategory {
116                    iid: Some(id),
117                    name: Some(category.to_owned()),
118                });
119        }
120    }
121
122    fn set_source_location(
123        &mut self,
124        file: &str,
125        line: u32,
126        packet: &mut TracePacket,
127        event: &mut TrackEvent,
128    ) {
129        let key = (file.to_string(), line);
130        if let Some(id) = self.source_locations.get(&key) {
131            event.source_location_field = Some(SourceLocationField::SourceLocationIid(*id));
132        } else {
133            let id = self.source_locations.len() as u64 + 1;
134            self.source_locations.insert(key, id);
135            event.source_location_field = Some(SourceLocationField::SourceLocationIid(id));
136            if packet.interned_data.is_none() {
137                packet.interned_data = Some(new_interned_data());
138            }
139            packet
140                .interned_data
141                .as_mut()
142                .unwrap()
143                .source_locations
144                .push(SourceLocation {
145                    iid: Some(id),
146                    file_name: Some(file.to_owned()),
147                    line_number: Some(line),
148                    function_name: None,
149                });
150        }
151    }
152
153    /// Emits a process descriptor packet to the stream.
154    pub async fn emit_process_descriptor(&mut self, exe: &str) -> anyhow::Result<()> {
155        let mut process_track = new_track_descriptor(self.process_uuid);
156        process_track.process = Some(ProcessDescriptor {
157            pid: Some(self.pid),
158            cmdline: vec![],
159            process_name: Some(exe.into()),
160            process_priority: None,
161            start_timestamp_ns: None,
162            chrome_process_type: None,
163            legacy_sort_index: None,
164            process_labels: vec![],
165        });
166        let mut packet = new_trace_packet();
167        packet.data = Some(Data::TrackDescriptor(process_track));
168        packet.first_packet_on_sequence = Some(true);
169        packet.sequence_flags = Some(3);
170        self.write_packet(packet).await
171    }
172
173    /// Emits a thread descriptor packet to the stream.
174    pub async fn emit_thread_descriptor(
175        &mut self,
176        stream_id: &str,
177        thread_id: i32,
178        thread_name: &str,
179    ) -> anyhow::Result<()> {
180        let thread_uuid = xxh64(stream_id.as_bytes(), 0);
181        self.current_thread_uuid = Some(thread_uuid);
182        let mut thread_track = new_track_descriptor(thread_uuid);
183        thread_track.parent_uuid = Some(self.process_uuid);
184        thread_track.thread = Some(ThreadDescriptor {
185            pid: Some(self.pid),
186            tid: Some(thread_id),
187            thread_name: Some(thread_name.into()),
188            chrome_thread_type: None,
189            reference_timestamp_us: None,
190            reference_thread_time_us: None,
191            reference_thread_instruction_count: None,
192            legacy_sort_index: None,
193        });
194        let mut packet = new_trace_packet();
195        packet.data = Some(Data::TrackDescriptor(thread_track));
196        self.write_packet(packet).await
197    }
198
199    /// Sets the current thread for subsequent span emissions.
200    /// Must be called before emitting spans for a specific thread.
201    pub fn set_current_thread(&mut self, stream_id: &str) {
202        let thread_uuid = xxh64(stream_id.as_bytes(), 0);
203        self.current_thread_uuid = Some(thread_uuid);
204    }
205
206    /// Emits an async track descriptor packet to the stream (single track for all async spans).
207    pub async fn emit_async_track_descriptor(&mut self) -> anyhow::Result<()> {
208        if self.async_track_uuid.is_some() {
209            return Ok(()); // Already created
210        }
211
212        let async_track_uuid = xxh64("async_track".as_bytes(), self.process_uuid);
213        self.async_track_uuid = Some(async_track_uuid);
214
215        let mut async_track = new_track_descriptor(async_track_uuid);
216        async_track.parent_uuid = Some(self.process_uuid);
217        async_track.static_or_dynamic_name =
218            Some(crate::protos::track_descriptor::StaticOrDynamicName::Name(
219                "Async Operations".to_owned(),
220            ));
221
222        let mut packet = new_trace_packet();
223        packet.data = Some(Data::TrackDescriptor(async_track));
224        self.write_packet(packet).await
225    }
226
227    /// Initialize span event fields for thread spans
228    fn init_span_event(
229        &mut self,
230        name: &str,
231        target: &str,
232        filename: &str,
233        line: u32,
234        packet: &mut TracePacket,
235        mut track_event: TrackEvent,
236    ) {
237        track_event.track_uuid = self.current_thread_uuid;
238        self.set_name(name, packet, &mut track_event);
239        self.set_category(target, packet, &mut track_event);
240        self.set_source_location(filename, line, packet, &mut track_event);
241        packet.data = Some(Data::TrackEvent(track_event));
242    }
243
244    /// Initialize async span event fields
245    fn init_async_span_event(
246        &mut self,
247        name: &str,
248        target: &str,
249        filename: &str,
250        line: u32,
251        packet: &mut TracePacket,
252        mut track_event: TrackEvent,
253    ) {
254        assert!(
255            self.async_track_uuid.is_some(),
256            "Must call emit_async_track_descriptor() before emitting async span events"
257        );
258
259        track_event.track_uuid = self.async_track_uuid;
260        self.set_name(name, packet, &mut track_event);
261        self.set_category(target, packet, &mut track_event);
262        self.set_source_location(filename, line, packet, &mut track_event);
263        packet.data = Some(Data::TrackEvent(track_event));
264    }
265
266    /// Emits a span event to the stream.
267    pub async fn emit_span(
268        &mut self,
269        begin_ns: u64,
270        end_ns: u64,
271        name: &str,
272        target: &str,
273        filename: &str,
274        line: u32,
275    ) -> anyhow::Result<()> {
276        // Emit begin event
277        let mut packet = new_trace_packet();
278        packet.timestamp = Some(begin_ns);
279        let mut track_event = new_track_event();
280        track_event.r#type = Some(track_event::Type::SliceBegin.into());
281        self.init_span_event(name, target, filename, line, &mut packet, track_event);
282        self.write_packet(packet).await?;
283
284        // Emit end event
285        let mut packet = new_trace_packet();
286        packet.timestamp = Some(end_ns);
287        let mut track_event = new_track_event();
288        track_event.r#type = Some(track_event::Type::SliceEnd.into());
289        self.init_span_event(name, target, filename, line, &mut packet, track_event);
290        self.write_packet(packet).await?;
291
292        Ok(())
293    }
294
295    /// Emits an async span begin event to the stream.
296    pub async fn emit_async_span_begin(
297        &mut self,
298        timestamp_ns: u64,
299        name: &str,
300        target: &str,
301        filename: &str,
302        line: u32,
303    ) -> anyhow::Result<()> {
304        let mut packet = new_trace_packet();
305        packet.timestamp = Some(timestamp_ns);
306        let mut track_event = new_track_event();
307        track_event.r#type = Some(track_event::Type::SliceBegin.into());
308        self.init_async_span_event(name, target, filename, line, &mut packet, track_event);
309        self.write_packet(packet).await
310    }
311
312    /// Emits an async span end event to the stream.
313    pub async fn emit_async_span_end(
314        &mut self,
315        timestamp_ns: u64,
316        name: &str,
317        target: &str,
318        filename: &str,
319        line: u32,
320    ) -> anyhow::Result<()> {
321        let mut packet = new_trace_packet();
322        packet.timestamp = Some(timestamp_ns);
323        let mut track_event = new_track_event();
324        track_event.r#type = Some(track_event::Type::SliceEnd.into());
325        self.init_async_span_event(name, target, filename, line, &mut packet, track_event);
326        self.write_packet(packet).await
327    }
328
329    /// Flushes any buffered data in the writer.
330    pub async fn flush(&mut self) -> anyhow::Result<()> {
331        self.writer.flush().await
332    }
333
334    /// Consumes the writer and returns the underlying AsyncWriter.
335    /// This is useful for testing to extract the written data.
336    pub fn into_inner(self) -> Box<dyn AsyncWriter + Send> {
337        self.writer
338    }
339}