// micromegas_analytics/lakehouse/perfetto_trace_execution_plan.rs

1use super::{
2    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3    process_streams::get_process_thread_list, session_configurator::NoOpSessionConfigurator,
4    view_factory::ViewFactory,
5};
6use crate::dfext::{
7    string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
8};
9use crate::time::TimeRange;
10use async_stream::stream;
11use datafusion::{
12    arrow::{
13        array::{RecordBatch, TimestampNanosecondArray, UInt32Array},
14        datatypes::SchemaRef,
15    },
16    catalog::{Session, TableProvider},
17    common::Result as DFResult,
18    execution::{SendableRecordBatchStream, TaskContext},
19    logical_expr::{Expr, TableType},
20    physical_expr::EquivalenceProperties,
21    physical_plan::{
22        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
23        execution_plan::{Boundedness, EmissionType},
24        limit::GlobalLimitExec,
25        stream::RecordBatchStreamAdapter,
26    },
27};
28use futures::{StreamExt, TryStreamExt, stream};
29use micromegas_perfetto::{chunk_sender::ChunkSender, streaming_writer::PerfettoWriter};
30use micromegas_tracing::prelude::*;
31use std::{
32    any::Any,
33    fmt::{self, Debug, Formatter},
34    sync::Arc,
35};
36
37pub use super::process_spans_table_function::SpanTypes;
38
/// Execution plan that generates Perfetto trace chunks
///
/// Leaf plan node: instead of reading from child plans, it queries the
/// lakehouse for the spans of one process and emits the serialized trace
/// as a bounded, single-partition stream of record batches.
pub struct PerfettoTraceExecutionPlan {
    // Output schema of the emitted record batches.
    schema: SchemaRef,
    // Process whose spans are exported.
    process_id: String,
    // Which span categories to include (thread, async, or both).
    span_types: SpanTypes,
    // Time window the trace covers.
    time_range: TimeRange,
    // Lakehouse context used to build the query session.
    lakehouse: Arc<LakehouseContext>,
    // Factory for the views queried during generation.
    view_factory: Arc<ViewFactory>,
    // Partition provider for the query session.
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Cached plan properties: one unknown partition, bounded, final emission.
    properties: PlanProperties,
}
50
51impl PerfettoTraceExecutionPlan {
52    pub fn new(
53        schema: SchemaRef,
54        process_id: String,
55        span_types: SpanTypes,
56        time_range: TimeRange,
57        lakehouse: Arc<LakehouseContext>,
58        view_factory: Arc<ViewFactory>,
59        part_provider: Arc<dyn QueryPartitionProvider>,
60    ) -> Self {
61        let properties = PlanProperties::new(
62            EquivalenceProperties::new(schema.clone()),
63            Partitioning::UnknownPartitioning(1),
64            EmissionType::Final,
65            Boundedness::Bounded,
66        );
67
68        Self {
69            schema,
70            process_id,
71            span_types,
72            time_range,
73            lakehouse,
74            view_factory,
75            part_provider,
76            properties,
77        }
78    }
79}
80
81impl Debug for PerfettoTraceExecutionPlan {
82    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
83        f.debug_struct("PerfettoTraceExecutionPlan")
84            .field("process_id", &self.process_id)
85            .field("span_types", &self.span_types)
86            .field("time_range", &self.time_range)
87            .finish()
88    }
89}
90
91impl DisplayAs for PerfettoTraceExecutionPlan {
92    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
93        write!(
94            f,
95            "PerfettoTraceExecutionPlan: process_id={}, span_types={:?}, time_range={}..{}",
96            self.process_id, self.span_types, self.time_range.begin, self.time_range.end
97        )
98    }
99}
100
101impl ExecutionPlan for PerfettoTraceExecutionPlan {
102    fn name(&self) -> &str {
103        "PerfettoTraceExecutionPlan"
104    }
105
106    fn as_any(&self) -> &dyn Any {
107        self
108    }
109
110    fn schema(&self) -> SchemaRef {
111        self.schema.clone()
112    }
113
114    fn properties(&self) -> &PlanProperties {
115        &self.properties
116    }
117
118    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
119        vec![]
120    }
121
122    fn with_new_children(
123        self: Arc<Self>,
124        _children: Vec<Arc<dyn ExecutionPlan>>,
125    ) -> DFResult<Arc<dyn ExecutionPlan>> {
126        Ok(self)
127    }
128
129    #[span_fn]
130    fn execute(
131        &self,
132        _partition: usize,
133        _context: Arc<TaskContext>,
134    ) -> DFResult<SendableRecordBatchStream> {
135        let schema = self.schema.clone();
136        let process_id = self.process_id.clone();
137        let span_types = self.span_types;
138        let time_range = self.time_range;
139        let lakehouse = self.lakehouse.clone();
140        let view_factory = self.view_factory.clone();
141        let part_provider = self.part_provider.clone();
142
143        // Create the stream directly without channels
144        let stream = generate_perfetto_trace_stream(
145            process_id,
146            span_types,
147            time_range,
148            lakehouse,
149            view_factory,
150            part_provider,
151        );
152
153        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
154    }
155}
156
/// Creates a stream of Perfetto trace chunks using streaming architecture
///
/// Spawns trace generation on a background task and forwards its output
/// through a bounded mpsc channel, yielding each chunk as a `RecordBatch`
/// as soon as it arrives. Errors from chunk generation, from the generation
/// task itself, or from a task panic are surfaced on the stream as
/// `DataFusionError::Execution`.
#[span_fn]
fn generate_perfetto_trace_stream(
    process_id: String,
    span_types: SpanTypes,
    time_range: TimeRange,
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
) -> impl futures::Stream<Item = DFResult<RecordBatch>> {
    stream! {
        // Create channel for streaming chunks; the bound of 16 applies
        // backpressure to the generation task if the consumer is slow.
        const CHUNK_SIZE: usize = 8 * 1024; // 8KB chunks
        let (chunk_sender, mut chunk_receiver) = tokio::sync::mpsc::channel(16);

        // Create ChunkSender that will stream data through the channel
        let chunk_sender_writer = ChunkSender::new(chunk_sender, CHUNK_SIZE);

        // Spawn background task to generate trace
        let generation_task = spawn_with_context(async move {
            generate_streaming_perfetto_trace(
                chunk_sender_writer,
                process_id,
                span_types,
                time_range,
                lakehouse,
                view_factory,
                part_provider,
            ).await
        });

        // Stream chunks as they become available; the loop ends when every
        // sender (held by the generation task) is dropped.
        while let Some(chunk_result) = chunk_receiver.recv().await {
            match chunk_result {
                Ok(batch) => yield Ok(batch),
                Err(e) => {
                    error!("Error in chunk generation: {:?}", e);
                    yield Err(datafusion::error::DataFusionError::Execution(
                        format!("Chunk generation failed: {}", e)
                    ));
                    // NOTE(review): this early return leaves the generation task
                    // detached; presumably it stops once its sends fail — confirm.
                    return;
                }
            }
        }

        // Wait for generation task to complete and check for errors
        match generation_task.await {
            Ok(Ok(())) => {}, // Success
            Ok(Err(e)) => {
                error!("Trace generation failed: {:?}", e);
                yield Err(datafusion::error::DataFusionError::Execution(
                    format!("Trace generation failed: {}", e)
                ));
            }
            Err(e) => {
                // The join handle itself failed, i.e. the task panicked.
                error!("Task panicked: {:?}", e);
                yield Err(datafusion::error::DataFusionError::Execution(
                    format!("Task panicked: {}", e)
                ));
            }
        }
    }
}
220
221/// Generate Perfetto trace using streaming architecture
222async fn generate_streaming_perfetto_trace(
223    chunk_sender: ChunkSender,
224    process_id: String,
225    span_types: SpanTypes,
226    time_range: TimeRange,
227    lakehouse: Arc<LakehouseContext>,
228    view_factory: Arc<ViewFactory>,
229    part_provider: Arc<dyn QueryPartitionProvider>,
230) -> anyhow::Result<()> {
231    info!(
232        "Generating streaming Perfetto trace for process {} with span types {:?} from {} to {}",
233        process_id, span_types, time_range.begin, time_range.end
234    );
235
236    // Create a context for making queries
237    let ctx = super::query::make_session_context(
238        lakehouse,
239        part_provider,
240        Some(TimeRange {
241            begin: time_range.begin,
242            end: time_range.end,
243        }),
244        view_factory,
245        Arc::new(NoOpSessionConfigurator),
246    )
247    .await?;
248
249    // Use ChunkSender directly as the writer destination
250    let mut writer = PerfettoWriter::new(Box::new(chunk_sender), &process_id);
251
252    let process_exe = get_process_exe(&process_id, &ctx).await?;
253    writer.emit_process_descriptor(&process_exe).await?;
254    writer.flush().await?; // Forces chunk emission
255
256    let threads = get_process_thread_list(&process_id, &ctx).await?;
257    for (stream_id, thread_id, thread_name) in &threads {
258        writer
259            .emit_thread_descriptor(stream_id, *thread_id, thread_name)
260            .await?;
261    }
262    if !threads.is_empty() {
263        writer.flush().await?; // Forces chunk emission
264    }
265
266    if matches!(span_types, SpanTypes::Async | SpanTypes::Both) {
267        writer.emit_async_track_descriptor().await?;
268        writer.flush().await?; // Forces chunk emission
269    }
270
271    if matches!(span_types, SpanTypes::Thread | SpanTypes::Both) {
272        generate_thread_spans_with_writer(&mut writer, &ctx, &time_range, &threads).await?;
273    }
274
275    if matches!(span_types, SpanTypes::Async | SpanTypes::Both) {
276        generate_async_spans_with_writer(&mut writer, &process_id, &ctx, &time_range).await?;
277    }
278
279    writer.flush().await?; // Final chunk - this handles the chunk_sender.flush() internally
280    Ok(())
281}
282
283/// Get process executable name from the processes table
284async fn get_process_exe(
285    process_id: &str,
286    ctx: &datafusion::execution::context::SessionContext,
287) -> anyhow::Result<String> {
288    let sql = format!(
289        r#"
290        SELECT exe
291        FROM processes
292        WHERE process_id = '{}'
293        LIMIT 1
294        "#,
295        process_id
296    );
297
298    let df = ctx.sql(&sql).await?;
299    let batches = df.collect().await?;
300
301    if batches.is_empty() || batches[0].num_rows() == 0 {
302        anyhow::bail!("Process {} not found", process_id);
303    }
304
305    let exes = string_column_by_name(&batches[0], "exe")?;
306    Ok(exes.value(0)?.to_owned())
307}
308
309/// Format the SQL query for thread spans
310fn format_thread_spans_query(stream_id: &str, time_range: &TimeRange) -> String {
311    format!(
312        r#"
313        SELECT "begin", "end", name, filename, target, line
314        FROM view_instance('thread_spans', '{}')
315        WHERE begin <= TIMESTAMP '{}'
316          AND end >= TIMESTAMP '{}'
317        ORDER BY begin
318        "#,
319        stream_id,
320        time_range.end.to_rfc3339(),
321        time_range.begin.to_rfc3339()
322    )
323}
324
/// Generate thread spans with parallel JIT and sequential writing.
///
/// JIT partition locking is per-(view_set_name, view_instance_id), and each thread
/// has a unique stream_id used as view_instance_id. This means different threads
/// get different lock keys, making parallel JIT safe.
///
/// Strategy:
/// - Spawn tasks with spawn_with_context() for true multi-threaded parallelism
/// - Use buffered() to limit concurrent spawned tasks
/// - Collect all streams preserving order
/// - Consume streams sequentially to write each thread's spans together
///
/// `threads` holds (stream_id, thread_id, thread_name) tuples as returned
/// by `get_process_thread_list`.
async fn generate_thread_spans_with_writer(
    writer: &mut PerfettoWriter,
    ctx: &datafusion::execution::context::SessionContext,
    time_range: &TimeRange,
    threads: &[(String, i32, String)],
) -> anyhow::Result<()> {
    // Cap in-flight query tasks at the machine's parallelism (fallback: 4).
    let max_concurrent = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);

    // Prepare query inputs upfront
    let queries: Vec<(String, String)> = threads
        .iter()
        .map(|(stream_id, _, _)| {
            (
                stream_id.clone(),
                format_thread_spans_query(stream_id, time_range),
            )
        })
        .collect();

    // Build streams in parallel using spawn for true multi-threading.
    // buffered() preserves input order, so streams come out thread-by-thread.
    let streams: Vec<(String, SendableRecordBatchStream)> = stream::iter(queries)
        .map(|(stream_id, sql)| {
            let ctx = ctx.clone();
            async move {
                spawn_with_context(async move {
                    let df = ctx.sql(&sql).await?;
                    let stream = df.execute_stream().await?;
                    Ok::<_, anyhow::Error>((stream_id, stream))
                })
                .await?
            }
        })
        .buffered(max_concurrent)
        .try_collect()
        .await?;

    // Consume streams sequentially - each thread's spans written together
    for (stream_id, mut data_stream) in streams {
        // Subsequent emit_span calls are attributed to this thread's track.
        writer.set_current_thread(&stream_id);

        let mut span_count = 0;
        while let Some(batch_result) = data_stream.next().await {
            let batch = batch_result?;
            // Column layout matches format_thread_spans_query's SELECT list.
            let begin_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "begin")?;
            let end_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "end")?;
            let names = string_column_by_name(&batch, "name")?;
            let filenames = string_column_by_name(&batch, "filename")?;
            let targets = string_column_by_name(&batch, "target")?;
            let lines: &UInt32Array = typed_column_by_name(&batch, "line")?;

            for i in 0..batch.num_rows() {
                let begin_ns = begin_times.value(i) as u64;
                let end_ns = end_times.value(i) as u64;
                let name = names.value(i)?;
                let filename = filenames.value(i)?;
                let target = targets.value(i)?;
                let line = lines.value(i);

                writer
                    .emit_span(begin_ns, end_ns, name, target, filename, line)
                    .await?;

                span_count += 1;
                // Flush every 10 spans so chunks are emitted incrementally.
                if span_count % 10 == 0 {
                    writer.flush().await?;
                }
            }
        }
    }
    Ok(())
}
409
/// Generate async spans using the provided PerfettoWriter
///
/// Queries the process's async begin/end events within `time_range`, joins
/// them by span id, and emits one begin/end event pair per matched span.
/// Spans whose begin timestamp is not strictly before their end timestamp
/// are skipped with a warning. Unmatched begin or end events (span crosses
/// the window boundary) are dropped by the inner join.
async fn generate_async_spans_with_writer(
    writer: &mut PerfettoWriter,
    process_id: &str,
    ctx: &datafusion::execution::context::SessionContext,
    time_range: &TimeRange,
) -> anyhow::Result<()> {
    let sql = format!(
        r#"
        WITH begin_events AS (
            SELECT span_id, time as begin_time, name, filename, target, line
            FROM view_instance('async_events', '{}')
            WHERE time >= TIMESTAMP '{}'
              AND time <= TIMESTAMP '{}'
              AND event_type = 'begin'
        ),
        end_events AS (
            SELECT span_id, time as end_time
            FROM view_instance('async_events', '{}')
            WHERE time >= TIMESTAMP '{}'
              AND time <= TIMESTAMP '{}'
              AND event_type = 'end'
        )
        SELECT 
            b.span_id,
            b.begin_time,
            e.end_time,
            b.name,
            b.filename,
            b.target,
            b.line
        FROM begin_events b
        INNER JOIN end_events e ON b.span_id = e.span_id
        ORDER BY b.begin_time
        "#,
        process_id,
        time_range.begin.to_rfc3339(),
        time_range.end.to_rfc3339(),
        process_id,
        time_range.begin.to_rfc3339(),
        time_range.end.to_rfc3339(),
    );

    let df = ctx.sql(&sql).await?;
    let mut stream = df.execute_stream().await?;

    let mut span_count = 0;
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result?;
        // Column layout matches the SELECT list of the query above.
        let span_ids: &datafusion::arrow::array::Int64Array =
            typed_column_by_name(&batch, "span_id")?;
        let begin_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "begin_time")?;
        let end_times: &TimestampNanosecondArray = typed_column_by_name(&batch, "end_time")?;
        let names = string_column_by_name(&batch, "name")?;
        let filenames = string_column_by_name(&batch, "filename")?;
        let targets = string_column_by_name(&batch, "target")?;
        let lines: &UInt32Array = typed_column_by_name(&batch, "line")?;
        for i in 0..batch.num_rows() {
            // span_id is only needed for the join; it is not emitted.
            let _span_id = span_ids.value(i);
            let begin_ns = begin_times.value(i) as u64;
            let end_ns = end_times.value(i) as u64;
            let name = names.value(i)?;
            let filename = filenames.value(i)?;
            let target = targets.value(i)?;
            let line = lines.value(i);

            if begin_ns < end_ns {
                // Emit async span begin and end events with single writer
                writer
                    .emit_async_span_begin(begin_ns, name, target, filename, line)
                    .await?;
                writer
                    .emit_async_span_end(end_ns, name, target, filename, line)
                    .await?;

                span_count += 1;
                // Flush every 10 async spans to create multiple chunks
                if span_count % 10 == 0 {
                    writer.flush().await?;
                }
            } else {
                warn!("Skipping async span with invalid duration");
            }
        }
    }

    Ok(())
}
498
/// TableProvider wrapper for PerfettoTraceExecutionPlan
///
/// Lets the pre-built trace plan be registered as a table so it can be
/// targeted by SQL queries.
#[derive(Debug)]
pub struct PerfettoTraceTableProvider {
    // The plan returned (possibly limit-wrapped) by scan().
    execution_plan: Arc<PerfettoTraceExecutionPlan>,
}
504
505impl PerfettoTraceTableProvider {
506    pub fn new(execution_plan: Arc<PerfettoTraceExecutionPlan>) -> Self {
507        Self { execution_plan }
508    }
509}
510
511#[async_trait::async_trait]
512impl TableProvider for PerfettoTraceTableProvider {
513    fn as_any(&self) -> &dyn Any {
514        self
515    }
516
517    fn schema(&self) -> SchemaRef {
518        self.execution_plan.schema()
519    }
520
521    fn table_type(&self) -> TableType {
522        TableType::Base
523    }
524
525    async fn scan(
526        &self,
527        _state: &dyn Session,
528        _projection: Option<&Vec<usize>>,
529        _filters: &[Expr],
530        limit: Option<usize>,
531    ) -> DFResult<Arc<dyn ExecutionPlan>> {
532        // Wrap the execution plan in a GlobalLimitExec if a limit is provided.
533        // DataFusion trusts us to apply the limit - if we ignore it, too many rows
534        // will be returned to the client.
535        let plan: Arc<dyn ExecutionPlan> = self.execution_plan.clone();
536        if let Some(fetch) = limit {
537            Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch))))
538        } else {
539            Ok(plan)
540        }
541    }
542}