1use super::{
2 lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3 process_streams::get_process_thread_list, session_configurator::NoOpSessionConfigurator,
4 view_factory::ViewFactory,
5};
6use crate::{dfext::expressions::exp_to_string, span_table::get_spans_schema, time::TimeRange};
7use async_stream::try_stream;
8use datafusion::{
9 arrow::{
10 array::{ArrayRef, RecordBatch, StringDictionaryBuilder},
11 datatypes::{DataType, Field, Int16Type, Schema, SchemaRef},
12 },
13 catalog::{Session, TableFunctionImpl, TableProvider},
14 common::{Result as DFResult, plan_err},
15 execution::{SendableRecordBatchStream, TaskContext},
16 logical_expr::{Expr, TableType},
17 physical_expr::EquivalenceProperties,
18 physical_plan::{
19 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
20 execution_plan::{Boundedness, EmissionType},
21 limit::GlobalLimitExec,
22 projection::ProjectionExec,
23 stream::RecordBatchStreamAdapter,
24 },
25};
26use futures::{StreamExt, TryStreamExt};
27use micromegas_tracing::prelude::*;
28use std::{
29 any::Any,
30 fmt::{self, Debug, Formatter},
31 sync::Arc,
32};
33
/// Selects which categories of spans `process_spans` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpanTypes {
    /// Only synchronous spans recorded on thread streams.
    Thread,
    /// Only async spans reconstructed from begin/end async events.
    Async,
    /// Both thread and async spans.
    Both,
}
41
42fn output_schema() -> SchemaRef {
43 let mut fields = vec![
44 Field::new(
45 "stream_id",
46 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
47 false,
48 ),
49 Field::new(
50 "thread_name",
51 DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
52 false,
53 ),
54 ];
55 fields.extend(get_spans_schema().fields.iter().map(|f| f.as_ref().clone()));
56 Arc::new(Schema::new(fields))
57}
58
59fn augment_batch(
60 batch: &RecordBatch,
61 schema: SchemaRef,
62 stream_id: &str,
63 thread_name: &str,
64) -> DFResult<RecordBatch> {
65 let n = batch.num_rows();
66 let mut stream_id_builder = StringDictionaryBuilder::<Int16Type>::new();
67 let mut thread_name_builder = StringDictionaryBuilder::<Int16Type>::new();
68 stream_id_builder.append_values(stream_id, n);
69 thread_name_builder.append_values(thread_name, n);
70 let mut columns: Vec<ArrayRef> = vec![
71 Arc::new(stream_id_builder.finish()),
72 Arc::new(thread_name_builder.finish()),
73 ];
74 columns.extend(batch.columns().iter().cloned());
75 RecordBatch::try_new(schema, columns).map_err(Into::into)
76}
77
/// SQL table function `process_spans(process_id, span_types)` exposing the
/// spans of a single process as a queryable table.
#[derive(Debug)]
pub struct ProcessSpansTableFunction {
    /// Lakehouse context the per-query session is built from.
    lakehouse: Arc<LakehouseContext>,
    /// Supplies the views (e.g. 'thread_spans', 'async_events') referenced by
    /// the generated queries.
    view_factory: Arc<ViewFactory>,
    /// Source of materialized partitions for the session context.
    part_provider: Arc<dyn QueryPartitionProvider>,
    /// Optional time restriction forwarded to the underlying queries.
    query_range: Option<TimeRange>,
}
87
impl ProcessSpansTableFunction {
    /// Creates the table function.
    ///
    /// * `lakehouse` - context the per-query session is created from
    /// * `view_factory` - supplies the views referenced by the generated SQL
    /// * `part_provider` - source of materialized partitions
    /// * `query_range` - optional time restriction applied to the queries
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        view_factory: Arc<ViewFactory>,
        part_provider: Arc<dyn QueryPartitionProvider>,
        query_range: Option<TimeRange>,
    ) -> Self {
        Self {
            lakehouse,
            view_factory,
            part_provider,
            query_range,
        }
    }
}
103
104impl TableFunctionImpl for ProcessSpansTableFunction {
105 #[span_fn]
106 fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
107 let arg1 = exprs.first().map(exp_to_string);
108 let Some(Ok(process_id)) = arg1 else {
109 return plan_err!(
110 "First argument to process_spans must be a string (the process ID), given {:?}",
111 arg1
112 );
113 };
114
115 let arg2 = exprs.get(1).map(exp_to_string);
116 let Some(Ok(span_types_str)) = arg2 else {
117 return plan_err!(
118 "Second argument to process_spans must be a string ('thread', 'async', or 'both'), given {:?}",
119 arg2
120 );
121 };
122
123 let span_types = match span_types_str.as_str() {
124 "thread" => SpanTypes::Thread,
125 "async" => SpanTypes::Async,
126 "both" => SpanTypes::Both,
127 _ => {
128 return plan_err!(
129 "span_types must be 'thread', 'async', or 'both', given: {span_types_str}"
130 );
131 }
132 };
133
134 let schema = output_schema();
135 let execution_plan = Arc::new(ProcessSpansExecutionPlan::new(
136 schema,
137 process_id,
138 span_types,
139 self.query_range,
140 self.lakehouse.clone(),
141 self.view_factory.clone(),
142 self.part_provider.clone(),
143 ));
144
145 Ok(Arc::new(ProcessSpansTableProvider { execution_plan }))
146 }
147}
148
/// Leaf `ExecutionPlan` that streams the spans of one process, combining
/// per-thread span queries and/or reconstructed async spans (see `execute`
/// in the `ExecutionPlan` impl).
pub struct ProcessSpansExecutionPlan {
    /// Output schema: stream_id + thread_name + span columns.
    schema: SchemaRef,
    /// Process whose spans are returned.
    process_id: String,
    /// Which span categories to emit (thread, async, or both).
    span_types: SpanTypes,
    /// Optional time restriction forwarded to the session context.
    query_range: Option<TimeRange>,
    /// Lakehouse context used to build the session the queries run in.
    lakehouse: Arc<LakehouseContext>,
    /// Views available to the generated queries.
    view_factory: Arc<ViewFactory>,
    /// Partition source for the session context.
    part_provider: Arc<dyn QueryPartitionProvider>,
    /// Plan properties computed once at construction (single partition,
    /// bounded, final emission) and handed out by `properties()`.
    properties: PlanProperties,
}
161
162impl ProcessSpansExecutionPlan {
163 fn new(
164 schema: SchemaRef,
165 process_id: String,
166 span_types: SpanTypes,
167 query_range: Option<TimeRange>,
168 lakehouse: Arc<LakehouseContext>,
169 view_factory: Arc<ViewFactory>,
170 part_provider: Arc<dyn QueryPartitionProvider>,
171 ) -> Self {
172 let properties = PlanProperties::new(
173 EquivalenceProperties::new(schema.clone()),
174 Partitioning::UnknownPartitioning(1),
175 EmissionType::Final,
176 Boundedness::Bounded,
177 );
178 Self {
179 schema,
180 process_id,
181 span_types,
182 query_range,
183 lakehouse,
184 view_factory,
185 part_provider,
186 properties,
187 }
188 }
189}
190
impl Debug for ProcessSpansExecutionPlan {
    /// Compact debug output listing only the query-identifying fields
    /// (process id and span kinds); the context/provider fields are omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("ProcessSpansExecutionPlan")
            .field("process_id", &self.process_id)
            .field("span_types", &self.span_types)
            .finish()
    }
}
199
impl DisplayAs for ProcessSpansExecutionPlan {
    /// Single-line rendering used when displaying the plan (e.g. EXPLAIN);
    /// the requested format type is ignored and the same line is always
    /// produced.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "ProcessSpansExecutionPlan: process_id={}, span_types={:?}",
            self.process_id, self.span_types
        )
    }
}
209
impl ExecutionPlan for ProcessSpansExecutionPlan {
    fn name(&self) -> &str {
        "ProcessSpansExecutionPlan"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Leaf node: no child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // No children to replace; return self unchanged.
        Ok(self)
    }

    /// Builds the record-batch stream producing the process's spans.
    ///
    /// All real work (session creation, SQL queries) is deferred into the
    /// returned lazy stream; nothing runs until the stream is polled.
    /// `_partition` is ignored: the plan declares a single partition.
    #[span_fn]
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // Clone everything the stream closure needs to own ('static move).
        let schema = self.schema.clone();
        let stream_schema = schema.clone();
        let process_id = self.process_id.clone();
        let span_types = self.span_types;
        let query_range = self.query_range;
        let lakehouse = self.lakehouse.clone();
        let view_factory = self.view_factory.clone();
        let part_provider = self.part_provider.clone();

        let record_batch_stream = try_stream! {
            let schema = stream_schema;
            // Dedicated session so the view_instance() calls below resolve
            // against the same lakehouse, views, and time range as this plan.
            let ctx = super::query::make_session_context(
                lakehouse,
                part_provider,
                query_range,
                view_factory,
                Arc::new(NoOpSessionConfigurator),
            )
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(
                format!("Failed to create session context: {e}"),
            ))?;

            // --- Thread spans ---------------------------------------------
            if matches!(span_types, SpanTypes::Thread | SpanTypes::Both) {
                let threads = get_process_thread_list(&process_id, &ctx)
                    .await
                    .map_err(|e| datafusion::error::DataFusionError::Execution(
                        format!("Failed to get thread list: {e}"),
                    ))?;

                // Bound concurrent per-thread queries by the CPU count
                // (falls back to 4 when it cannot be determined).
                let max_concurrent = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);

                // One query per thread stream: (stream_id, thread_name, sql).
                // NOTE(review): stream_id is interpolated into the SQL text;
                // assumed to be a trusted identifier from the lakehouse —
                // confirm it cannot contain quotes.
                let queries: Vec<(String, String, String)> = threads
                    .iter()
                    .map(|(stream_id, _thread_id, display_name)| {
                        let sql = format!(
                            "SELECT * FROM view_instance('thread_spans', '{stream_id}')"
                        );
                        (stream_id.clone(), display_name.clone(), sql)
                    })
                    .collect();

                // Plan/open all streams concurrently (at most max_concurrent
                // in flight); `buffered` preserves the thread order in the
                // collected results. spawn_with_context presumably runs the
                // future on the runtime while propagating the current tracing
                // context — TODO confirm against micromegas_tracing.
                let stream_results: Vec<(String, String, SendableRecordBatchStream)> =
                    futures::stream::iter(queries)
                        .map(|(stream_id, thread_name, sql)| {
                            let ctx = ctx.clone();
                            async move {
                                spawn_with_context(async move {
                                    let df = ctx.sql(&sql).await?;
                                    let s = df.execute_stream().await?;
                                    Ok::<_, anyhow::Error>((stream_id, thread_name, s))
                                })
                                .await?
                            }
                        })
                        .buffered(max_concurrent)
                        .try_collect()
                        .await
                        .map_err(|e| datafusion::error::DataFusionError::Execution(
                            format!("Failed to query thread spans: {e}"),
                        ))?;

                // Drain the opened streams one after another so all batches of
                // a given thread are emitted together, each tagged with its
                // stream id and thread name.
                for (stream_id, thread_name, mut data_stream) in stream_results {
                    while let Some(batch) = data_stream.try_next().await? {
                        let augmented = augment_batch(&batch, schema.clone(), &stream_id, &thread_name)?;
                        yield augmented;
                    }
                }
            }

            // --- Async spans ----------------------------------------------
            if matches!(span_types, SpanTypes::Async | SpanTypes::Both) {
                // Reconstruct async spans by joining each 'begin' event with
                // its 'end' event on span_id; duration is end - begin in the
                // integer representation of the time column (units per that
                // column's encoding). The b.time < e.time predicate drops
                // degenerate/mismatched pairs.
                // NOTE(review): process_id is interpolated into the SQL text —
                // same trust assumption as stream_id above.
                let async_sql = format!(
                    "SELECT \
                    b.span_id as id, \
                    b.parent_span_id as parent, \
                    b.depth, \
                    b.hash, \
                    b.time as \"begin\", \
                    e.time as \"end\", \
                    arrow_cast(e.time, 'Int64') - arrow_cast(b.time, 'Int64') as duration, \
                    b.name, \
                    b.target, \
                    b.filename, \
                    b.line \
                    FROM (SELECT * FROM view_instance('async_events', '{process_id}') \
                    WHERE event_type = 'begin') b \
                    INNER JOIN (SELECT * FROM view_instance('async_events', '{process_id}') \
                    WHERE event_type = 'end') e \
                    ON b.span_id = e.span_id \
                    WHERE b.time < e.time \
                    ORDER BY b.time"
                );

                let df = ctx.sql(&async_sql).await
                    .map_err(|e| datafusion::error::DataFusionError::Execution(
                        format!("Failed to query async spans: {e}"),
                    ))?;
                let mut async_stream = df.execute_stream().await
                    .map_err(|e| datafusion::error::DataFusionError::Execution(
                        format!("Failed to execute async spans stream: {e}"),
                    ))?;

                // Async spans have no owning thread: tag them with an empty
                // stream_id and the literal thread name "async".
                while let Some(batch) = async_stream.try_next().await? {
                    let augmented = augment_batch(&batch, schema.clone(), "", "async")?;
                    yield augmented;
                }
            }
        };

        // Adapt the anonymous try_stream into DataFusion's stream type.
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            schema,
            record_batch_stream,
        )))
    }
}
363
/// `TableProvider` returned by `ProcessSpansTableFunction::call`, wrapping
/// the pre-built execution plan.
#[derive(Debug)]
struct ProcessSpansTableProvider {
    /// Plan built at call() time; scan() decorates it with projection/limit
    /// nodes as requested by the optimizer.
    execution_plan: Arc<ProcessSpansExecutionPlan>,
}
370
371#[async_trait::async_trait]
372impl TableProvider for ProcessSpansTableProvider {
373 fn as_any(&self) -> &dyn Any {
374 self
375 }
376
377 fn schema(&self) -> SchemaRef {
378 self.execution_plan.schema()
379 }
380
381 fn table_type(&self) -> TableType {
382 TableType::Base
383 }
384
385 async fn scan(
386 &self,
387 _state: &dyn Session,
388 projection: Option<&Vec<usize>>,
389 _filters: &[Expr],
390 limit: Option<usize>,
391 ) -> DFResult<Arc<dyn ExecutionPlan>> {
392 let mut plan: Arc<dyn ExecutionPlan> = self.execution_plan.clone();
393 if let Some(projection) = projection {
394 let schema = plan.schema();
395 let projected_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
396 projection
397 .iter()
398 .map(|&i| {
399 let name = schema.field(i).name().clone();
400 let expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
401 &name, i,
402 ))
403 as Arc<dyn datafusion::physical_expr::PhysicalExpr>;
404 (expr, name)
405 })
406 .collect();
407 plan = Arc::new(ProjectionExec::try_new(projected_exprs, plan)?);
408 }
409 if let Some(fetch) = limit {
410 plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch)));
411 }
412 Ok(plan)
413 }
414}