micromegas_analytics/lakehouse/perfetto_trace_table_function.rs

1use super::{
2    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3    view_factory::ViewFactory,
4};
5use crate::{
6    dfext::expressions::{exp_to_string, exp_to_timestamp},
7    time::TimeRange,
8};
9use datafusion::{
10    arrow::datatypes::{DataType, Field, Schema},
11    catalog::{TableFunctionImpl, TableProvider},
12    common::plan_err,
13    logical_expr::Expr,
14};
15use micromegas_tracing::prelude::*;
16use std::sync::Arc;
17
/// `PerfettoTraceTableFunction` generates Perfetto trace chunks from process telemetry data.
///
/// SQL Interface:
/// ```sql
/// SELECT chunk_id, chunk_data
/// FROM perfetto_trace_chunks(
///     'process_id',                              -- Process UUID (required)
///     'span_types',                              -- 'thread', 'async', or 'both' (required)
///     TIMESTAMP '2024-01-01T00:00:00Z',          -- Start time as UTC timestamp (required)
///     TIMESTAMP '2024-01-01T01:00:00Z'           -- End time as UTC timestamp (required)
/// ) ORDER BY chunk_id
/// ```
///
/// Returns a table with schema:
/// - chunk_id: Int32 - Sequential chunk identifier
/// - chunk_data: Binary - Binary protobuf TracePacket data
///
#[derive(Debug)]
pub struct PerfettoTraceTableFunction {
    /// Shared lakehouse context, cloned into each `PerfettoTraceExecutionPlan`.
    lakehouse: Arc<LakehouseContext>,
    /// View registry, forwarded to the execution plan.
    view_factory: Arc<ViewFactory>,
    /// Partition lookup used by the execution plan to locate relevant data.
    part_provider: Arc<dyn QueryPartitionProvider>,
}
41
42impl PerfettoTraceTableFunction {
43    pub fn new(
44        lakehouse: Arc<LakehouseContext>,
45        view_factory: Arc<ViewFactory>,
46        part_provider: Arc<dyn QueryPartitionProvider>,
47    ) -> Self {
48        Self {
49            lakehouse,
50            view_factory,
51            part_provider,
52        }
53    }
54
55    /// Create the output schema for the table function
56    pub fn output_schema() -> Arc<Schema> {
57        Arc::new(Schema::new(vec![
58            Field::new("chunk_id", DataType::Int32, false),
59            Field::new("chunk_data", DataType::Binary, false),
60        ]))
61    }
62}
63
64impl TableFunctionImpl for PerfettoTraceTableFunction {
65    #[span_fn]
66    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
67        // Parse process_id (arg 1)
68        let arg1 = exprs.first().map(exp_to_string);
69        let Some(Ok(process_id)) = arg1 else {
70            return plan_err!(
71                "First argument to perfetto_trace_chunks must be a string (the process ID), given {:?}",
72                arg1
73            );
74        };
75
76        // Parse span_types (arg 2)
77        let arg2 = exprs.get(1).map(exp_to_string);
78        let Some(Ok(span_types_str)) = arg2 else {
79            return plan_err!(
80                "Second argument to perfetto_trace_chunks must be a string ('thread', 'async', or 'both'), given {:?}",
81                arg2
82            );
83        };
84
85        let span_types = match span_types_str.as_str() {
86            "thread" => SpanTypes::Thread,
87            "async" => SpanTypes::Async,
88            "both" => SpanTypes::Both,
89            _ => {
90                return plan_err!(
91                    "span_types must be 'thread', 'async', or 'both', given: {}",
92                    span_types_str
93                );
94            }
95        };
96
97        // Parse start_time (arg 3) - expecting a timestamp expression
98        let arg3 = exprs.get(2).map(exp_to_timestamp);
99        let Some(Ok(start_time)) = arg3 else {
100            return plan_err!(
101                "Third argument to perfetto_trace_chunks must be a timestamp (start time), given {:?}",
102                arg3
103            );
104        };
105
106        // Parse end_time (arg 4) - expecting a timestamp expression
107        let arg4 = exprs.get(3).map(exp_to_timestamp);
108        let Some(Ok(end_time)) = arg4 else {
109            return plan_err!(
110                "Fourth argument to perfetto_trace_chunks must be a timestamp (end time), given {:?}",
111                arg4
112            );
113        };
114
115        // Create time range from parsed timestamps
116        let time_range = TimeRange {
117            begin: start_time,
118            end: end_time,
119        };
120
121        // Create the execution plan that will generate the trace chunks
122        let execution_plan = Arc::new(PerfettoTraceExecutionPlan::new(
123            Self::output_schema(),
124            process_id,
125            span_types,
126            time_range,
127            self.lakehouse.clone(),
128            self.view_factory.clone(),
129            self.part_provider.clone(),
130        ));
131
132        // Wrap it in a TableProvider
133        Ok(Arc::new(PerfettoTraceTableProvider::new(execution_plan)))
134    }
135}
136
137// Import the execution plan
138use super::perfetto_trace_execution_plan::{
139    PerfettoTraceExecutionPlan, PerfettoTraceTableProvider, SpanTypes,
140};