use super::{
    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
    process_streams::get_process_thread_list, session_configurator::NoOpSessionConfigurator,
    view_factory::ViewFactory,
};
use crate::dfext::{
    string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
};
use crate::time::TimeRange;
use async_stream::stream;
use datafusion::{
    arrow::{
        array::{RecordBatch, TimestampNanosecondArray, UInt32Array},
        datatypes::SchemaRef,
    },
    catalog::{Session, TableProvider},
    common::Result as DFResult,
    execution::{SendableRecordBatchStream, TaskContext},
    logical_expr::{Expr, TableType},
    physical_expr::EquivalenceProperties,
    physical_plan::{
        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
        execution_plan::{Boundedness, EmissionType},
        limit::GlobalLimitExec,
        stream::RecordBatchStreamAdapter,
    },
};
use futures::{StreamExt, TryStreamExt, stream};
use micromegas_perfetto::{chunk_sender::ChunkSender, streaming_writer::PerfettoWriter};
use micromegas_tracing::prelude::*;
use std::{
    any::Any,
    fmt::{self, Debug, Formatter},
    sync::Arc,
};

pub use super::process_spans_table_function::SpanTypes;

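/// DataFusion execution plan that generates a Perfetto trace for a single process
/// and streams it back as record batches of encoded trace chunks.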
pub struct PerfettoTraceExecutionPlan {
    schema: SchemaRef,
    process_id: String,
    span_types: SpanTypes,
    time_range: TimeRange,
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    properties: PlanProperties,
}

impl PerfettoTraceExecutionPlan {
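    /// Creates a new execution plan for the given process, span types, and time range.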
    pub fn new(
        schema: SchemaRef,
        process_id: String,
        span_types: SpanTypes,
        time_range: TimeRange,
        lakehouse: Arc<LakehouseContext>,
        view_factory: Arc<ViewFactory>,
        part_provider: Arc<dyn QueryPartitionProvider>,
    ) -> Self {
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Final,
            Boundedness::Bounded,
        );

        Self {
            schema,
            process_id,
            span_types,
            time_range,
            lakehouse,
            view_factory,
            part_provider,
            properties,
        }
    }
}

impl Debug for PerfettoTraceExecutionPlan {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("PerfettoTraceExecutionPlan")
            .field("process_id", &self.process_id)
            .field("span_types", &self.span_types)
            .field("time_range", &self.time_range)
            .finish()
    }
}

impl DisplayAs for PerfettoTraceExecutionPlan {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "PerfettoTraceExecutionPlan: process_id={}, span_types={:?}, time_range={}..{}",
            self.process_id, self.span_types, self.time_range.begin, self.time_range.end
        )
    }
}

impl ExecutionPlan for PerfettoTraceExecutionPlan {
    fn name(&self) -> &str {
        "PerfettoTraceExecutionPlan"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    #[span_fn]
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let schema = self.schema.clone();
        let process_id = self.process_id.clone();
        let span_types = self.span_types;
        let time_range = self.time_range;
        let lakehouse = self.lakehouse.clone();
        let view_factory = self.view_factory.clone();
        let part_provider = self.part_provider.clone();

        let stream = generate_perfetto_trace_stream(
            process_id,
            span_types,
            time_range,
            lakehouse,
            view_factory,
            part_provider,
        );

        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}

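/// Builds the output stream for a Perfetto trace: trace generation runs on a
/// separate task and the chunks it produces are forwarded as record batches.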
#[span_fn]
fn generate_perfetto_trace_stream(
    process_id: String,
    span_types: SpanTypes,
    time_range: TimeRange,
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
) -> impl futures::Stream<Item = DFResult<RecordBatch>> {
    stream! {
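        // Trace generation runs on its own task and pushes encoded chunks through a
        // bounded channel; this stream forwards each chunk as a record batch.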
        const CHUNK_SIZE: usize = 8 * 1024;
        let (chunk_sender, mut chunk_receiver) = tokio::sync::mpsc::channel(16);

        let chunk_sender_writer = ChunkSender::new(chunk_sender, CHUNK_SIZE);

        let generation_task = spawn_with_context(async move {
            generate_streaming_perfetto_trace(
                chunk_sender_writer,
                process_id,
                span_types,
                time_range,
                lakehouse,
                view_factory,
                part_provider,
            ).await
        });

        while let Some(chunk_result) = chunk_receiver.recv().await {
            match chunk_result {
                Ok(batch) => yield Ok(batch),
                Err(e) => {
                    error!("Error in chunk generation: {:?}", e);
                    yield Err(datafusion::error::DataFusionError::Execution(
                        format!("Chunk generation failed: {}", e)
                    ));
                    return;
                }
            }
        }

        match generation_task.await {
            Ok(Ok(())) => {}
            Ok(Err(e)) => {
                error!("Trace generation failed: {:?}", e);
                yield Err(datafusion::error::DataFusionError::Execution(
                    format!("Trace generation failed: {}", e)
                ));
            }
            Err(e) => {
                error!("Task panicked: {:?}", e);
                yield Err(datafusion::error::DataFusionError::Execution(
                    format!("Task panicked: {}", e)
                ));
            }
        }
    }
}

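/// Generates the Perfetto trace for a process: emits the process descriptor,
/// thread descriptors, and (when requested) the async track descriptor, then
/// writes thread and/or async spans, flushing chunks to the sender along the way.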
async fn generate_streaming_perfetto_trace(
    chunk_sender: ChunkSender,
    process_id: String,
    span_types: SpanTypes,
    time_range: TimeRange,
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
) -> anyhow::Result<()> {
    info!(
        "Generating streaming Perfetto trace for process {} with span types {:?} from {} to {}",
        process_id, span_types, time_range.begin, time_range.end
    );

    let ctx = super::query::make_session_context(
        lakehouse,
        part_provider,
        Some(TimeRange {
            begin: time_range.begin,
            end: time_range.end,
        }),
        view_factory,
        Arc::new(NoOpSessionConfigurator),
    )
    .await?;

    let mut writer = PerfettoWriter::new(Box::new(chunk_sender), &process_id);

    let process_exe = get_process_exe(&process_id, &ctx).await?;
    writer.emit_process_descriptor(&process_exe).await?;
    writer.flush().await?;

    let threads = get_process_thread_list(&process_id, &ctx).await?;
    for (stream_id, thread_id, thread_name) in &threads {
        writer
            .emit_thread_descriptor(stream_id, *thread_id, thread_name)
            .await?;
    }
    if !threads.is_empty() {
        writer.flush().await?;
    }

    if matches!(span_types, SpanTypes::Async | SpanTypes::Both) {
        writer.emit_async_track_descriptor().await?;
        writer.flush().await?;
    }

    if matches!(span_types, SpanTypes::Thread | SpanTypes::Both) {
        generate_thread_spans_with_writer(&mut writer, &ctx, &time_range, &threads).await?;
    }

    if matches!(span_types, SpanTypes::Async | SpanTypes::Both) {
        generate_async_spans_with_writer(&mut writer, &process_id, &ctx, &time_range).await?;
    }

    writer.flush().await?;
    Ok(())
}

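/// Looks up the executable name of a process in the `processes` table.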
async fn get_process_exe(
    process_id: &str,
    ctx: &datafusion::execution::context::SessionContext,
) -> anyhow::Result<String> {
    let sql = format!(
        r#"
        SELECT exe
        FROM processes
        WHERE process_id = '{}'
        LIMIT 1
        "#,
        process_id
    );

    let df = ctx.sql(&sql).await?;
    let batches = df.collect().await?;

    if batches.is_empty() || batches[0].num_rows() == 0 {
        anyhow::bail!("Process {} not found", process_id);
    }

    let exes = string_column_by_name(&batches[0], "exe")?;
    Ok(exes.value(0)?.to_owned())
}

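/// Builds the SQL query that selects all thread spans of a stream overlapping
/// the requested time range.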
fn format_thread_spans_query(stream_id: &str, time_range: &TimeRange) -> String {
    format!(
        r#"
        SELECT "begin", "end", name, filename, target, line
        FROM view_instance('thread_spans', '{}')
        WHERE begin <= TIMESTAMP '{}'
        AND end >= TIMESTAMP '{}'
        ORDER BY begin
        "#,
        stream_id,
        time_range.end.to_rfc3339(),
        time_range.begin.to_rfc3339()
    )
}

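/// Queries the thread spans of each thread and writes them to the Perfetto trace,
/// flushing periodically as spans are emitted.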
async fn generate_thread_spans_with_writer(
    writer: &mut PerfettoWriter,
    ctx: &datafusion::execution::context::SessionContext,
    time_range: &TimeRange,
    threads: &[(String, i32, String)],
) -> anyhow::Result<()> {
    let max_concurrent = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);

    let queries: Vec<(String, String)> = threads
        .iter()
        .map(|(stream_id, _, _)| {
            (
                stream_id.clone(),
                format_thread_spans_query(stream_id, time_range),
            )
        })
        .collect();

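    // Open one result stream per thread, running at most `max_concurrent` queries
    // concurrently; spans are then written to the trace one thread at a time.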
    let streams: Vec<(String, SendableRecordBatchStream)> = stream::iter(queries)
        .map(|(stream_id, sql)| {
            let ctx = ctx.clone();
            async move {
                spawn_with_context(async move {
                    let df = ctx.sql(&sql).await?;
                    let stream = df.execute_stream().await?;
                    Ok::<_, anyhow::Error>((stream_id, stream))
                })
                .await?
            }
        })
        .buffered(max_concurrent)
        .try_collect()
        .await?;

    for (stream_id, mut data_stream) in streams {
        writer.set_current_thread(&stream_id);

        let mut span_count = 0;
        while let Some(batch_result) = data_stream.next().await {
            let batch = batch_result?;
            let begin_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "begin")?;
            let end_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "end")?;
            let names = string_column_by_name(&batch, "name")?;
            let filenames = string_column_by_name(&batch, "filename")?;
            let targets = string_column_by_name(&batch, "target")?;
            let lines: &UInt32Array = typed_column_by_name(&batch, "line")?;

            for i in 0..batch.num_rows() {
                let begin_ns = begin_times.value(i) as u64;
                let end_ns = end_times.value(i) as u64;
                let name = names.value(i)?;
                let filename = filenames.value(i)?;
                let target = targets.value(i)?;
                let line = lines.value(i);

                writer
                    .emit_span(begin_ns, end_ns, name, target, filename, line)
                    .await?;

                span_count += 1;
                if span_count % 10 == 0 {
                    writer.flush().await?;
                }
            }
        }
    }
    Ok(())
}

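/// Queries the async span events of a process, pairs begin and end events,
/// and writes the resulting async spans to the Perfetto trace.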
async fn generate_async_spans_with_writer(
    writer: &mut PerfettoWriter,
    process_id: &str,
    ctx: &datafusion::execution::context::SessionContext,
    time_range: &TimeRange,
) -> anyhow::Result<()> {
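    // Pair 'begin' and 'end' async events on span_id so each completed async span
    // within the time range can be emitted as a begin/end pair.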
    let sql = format!(
        r#"
        WITH begin_events AS (
            SELECT span_id, time as begin_time, name, filename, target, line
            FROM view_instance('async_events', '{}')
            WHERE time >= TIMESTAMP '{}'
            AND time <= TIMESTAMP '{}'
            AND event_type = 'begin'
        ),
        end_events AS (
            SELECT span_id, time as end_time
            FROM view_instance('async_events', '{}')
            WHERE time >= TIMESTAMP '{}'
            AND time <= TIMESTAMP '{}'
            AND event_type = 'end'
        )
        SELECT
            b.span_id,
            b.begin_time,
            e.end_time,
            b.name,
            b.filename,
            b.target,
            b.line
        FROM begin_events b
        INNER JOIN end_events e ON b.span_id = e.span_id
        ORDER BY b.begin_time
        "#,
        process_id,
        time_range.begin.to_rfc3339(),
        time_range.end.to_rfc3339(),
        process_id,
        time_range.begin.to_rfc3339(),
        time_range.end.to_rfc3339(),
    );

    let df = ctx.sql(&sql).await?;
    let mut stream = df.execute_stream().await?;

    let mut span_count = 0;
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result?;
        let span_ids: &datafusion::arrow::array::Int64Array =
            typed_column_by_name(&batch, "span_id")?;
        let begin_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "begin_time")?;
        let end_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "end_time")?;
        let names = string_column_by_name(&batch, "name")?;
        let filenames = string_column_by_name(&batch, "filename")?;
        let targets = string_column_by_name(&batch, "target")?;
        let lines: &UInt32Array = typed_column_by_name(&batch, "line")?;
        for i in 0..batch.num_rows() {
            let _span_id = span_ids.value(i);
            let begin_ns = begin_times.value(i) as u64;
            let end_ns = end_times.value(i) as u64;
            let name = names.value(i)?;
            let filename = filenames.value(i)?;
            let target = targets.value(i)?;
            let line = lines.value(i);

            if begin_ns < end_ns {
                writer
                    .emit_async_span_begin(begin_ns, name, target, filename, line)
                    .await?;
                writer
                    .emit_async_span_end(end_ns, name, target, filename, line)
                    .await?;

                span_count += 1;
                if span_count % 10 == 0 {
                    writer.flush().await?;
                }
            } else {
                warn!("Skipping async span with invalid duration");
            }
        }
    }

    Ok(())
}

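/// DataFusion table provider that wraps a [`PerfettoTraceExecutionPlan`].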
#[derive(Debug)]
pub struct PerfettoTraceTableProvider {
    execution_plan: Arc<PerfettoTraceExecutionPlan>,
}

impl PerfettoTraceTableProvider {
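    /// Creates a table provider from an existing execution plan.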
    pub fn new(execution_plan: Arc<PerfettoTraceExecutionPlan>) -> Self {
        Self { execution_plan }
    }
}

#[async_trait::async_trait]
impl TableProvider for PerfettoTraceTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.execution_plan.schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let plan: Arc<dyn ExecutionPlan> = self.execution_plan.clone();
        if let Some(fetch) = limit {
            Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch))))
        } else {
            Ok(plan)
        }
    }
}