micromegas_analytics/lakehouse/
perfetto_trace_table_function.rs1use super::{
2 lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3 view_factory::ViewFactory,
4};
5use crate::{
6 dfext::expressions::{exp_to_string, exp_to_timestamp},
7 time::TimeRange,
8};
9use datafusion::{
10 arrow::datatypes::{DataType, Field, Schema},
11 catalog::{TableFunctionImpl, TableProvider},
12 common::plan_err,
13 logical_expr::Expr,
14};
15use micromegas_tracing::prelude::*;
16use std::sync::Arc;
17
18#[derive(Debug)]
36pub struct PerfettoTraceTableFunction {
37 lakehouse: Arc<LakehouseContext>,
38 view_factory: Arc<ViewFactory>,
39 part_provider: Arc<dyn QueryPartitionProvider>,
40}
41
42impl PerfettoTraceTableFunction {
43 pub fn new(
44 lakehouse: Arc<LakehouseContext>,
45 view_factory: Arc<ViewFactory>,
46 part_provider: Arc<dyn QueryPartitionProvider>,
47 ) -> Self {
48 Self {
49 lakehouse,
50 view_factory,
51 part_provider,
52 }
53 }
54
55 pub fn output_schema() -> Arc<Schema> {
57 Arc::new(Schema::new(vec![
58 Field::new("chunk_id", DataType::Int32, false),
59 Field::new("chunk_data", DataType::Binary, false),
60 ]))
61 }
62}
63
64impl TableFunctionImpl for PerfettoTraceTableFunction {
65 #[span_fn]
66 fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
67 let arg1 = exprs.first().map(exp_to_string);
69 let Some(Ok(process_id)) = arg1 else {
70 return plan_err!(
71 "First argument to perfetto_trace_chunks must be a string (the process ID), given {:?}",
72 arg1
73 );
74 };
75
76 let arg2 = exprs.get(1).map(exp_to_string);
78 let Some(Ok(span_types_str)) = arg2 else {
79 return plan_err!(
80 "Second argument to perfetto_trace_chunks must be a string ('thread', 'async', or 'both'), given {:?}",
81 arg2
82 );
83 };
84
85 let span_types = match span_types_str.as_str() {
86 "thread" => SpanTypes::Thread,
87 "async" => SpanTypes::Async,
88 "both" => SpanTypes::Both,
89 _ => {
90 return plan_err!(
91 "span_types must be 'thread', 'async', or 'both', given: {}",
92 span_types_str
93 );
94 }
95 };
96
97 let arg3 = exprs.get(2).map(exp_to_timestamp);
99 let Some(Ok(start_time)) = arg3 else {
100 return plan_err!(
101 "Third argument to perfetto_trace_chunks must be a timestamp (start time), given {:?}",
102 arg3
103 );
104 };
105
106 let arg4 = exprs.get(3).map(exp_to_timestamp);
108 let Some(Ok(end_time)) = arg4 else {
109 return plan_err!(
110 "Fourth argument to perfetto_trace_chunks must be a timestamp (end time), given {:?}",
111 arg4
112 );
113 };
114
115 let time_range = TimeRange {
117 begin: start_time,
118 end: end_time,
119 };
120
121 let execution_plan = Arc::new(PerfettoTraceExecutionPlan::new(
123 Self::output_schema(),
124 process_id,
125 span_types,
126 time_range,
127 self.lakehouse.clone(),
128 self.view_factory.clone(),
129 self.part_provider.clone(),
130 ));
131
132 Ok(Arc::new(PerfettoTraceTableProvider::new(execution_plan)))
134 }
135}
136
137use super::perfetto_trace_execution_plan::{
139 PerfettoTraceExecutionPlan, PerfettoTraceTableProvider, SpanTypes,
140};