micromegas_analytics/lakehouse/
process_spans_table_function.rs

1use super::{
2    lakehouse_context::LakehouseContext, partition_cache::QueryPartitionProvider,
3    process_streams::get_process_thread_list, session_configurator::NoOpSessionConfigurator,
4    view_factory::ViewFactory,
5};
6use crate::{dfext::expressions::exp_to_string, span_table::get_spans_schema, time::TimeRange};
7use async_stream::try_stream;
8use datafusion::{
9    arrow::{
10        array::{ArrayRef, RecordBatch, StringDictionaryBuilder},
11        datatypes::{DataType, Field, Int16Type, Schema, SchemaRef},
12    },
13    catalog::{Session, TableFunctionImpl, TableProvider},
14    common::{Result as DFResult, plan_err},
15    execution::{SendableRecordBatchStream, TaskContext},
16    logical_expr::{Expr, TableType},
17    physical_expr::EquivalenceProperties,
18    physical_plan::{
19        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
20        execution_plan::{Boundedness, EmissionType},
21        limit::GlobalLimitExec,
22        projection::ProjectionExec,
23        stream::RecordBatchStreamAdapter,
24    },
25};
26use futures::{StreamExt, TryStreamExt};
27use micromegas_tracing::prelude::*;
28use std::{
29    any::Any,
30    fmt::{self, Debug, Formatter},
31    sync::Arc,
32};
33
/// Span types to include in the output
#[derive(Debug, Clone, Copy)]
pub enum SpanTypes {
    /// Only synchronous spans recorded on per-thread streams.
    Thread,
    /// Only async spans (begin/end events joined on span id).
    Async,
    /// Thread spans first, then async spans.
    Both,
}
41
42fn output_schema() -> SchemaRef {
43    let mut fields = vec![
44        Field::new(
45            "stream_id",
46            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
47            false,
48        ),
49        Field::new(
50            "thread_name",
51            DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
52            false,
53        ),
54    ];
55    fields.extend(get_spans_schema().fields.iter().map(|f| f.as_ref().clone()));
56    Arc::new(Schema::new(fields))
57}
58
59fn augment_batch(
60    batch: &RecordBatch,
61    schema: SchemaRef,
62    stream_id: &str,
63    thread_name: &str,
64) -> DFResult<RecordBatch> {
65    let n = batch.num_rows();
66    let mut stream_id_builder = StringDictionaryBuilder::<Int16Type>::new();
67    let mut thread_name_builder = StringDictionaryBuilder::<Int16Type>::new();
68    stream_id_builder.append_values(stream_id, n);
69    thread_name_builder.append_values(thread_name, n);
70    let mut columns: Vec<ArrayRef> = vec![
71        Arc::new(stream_id_builder.finish()),
72        Arc::new(thread_name_builder.finish()),
73    ];
74    columns.extend(batch.columns().iter().cloned());
75    RecordBatch::try_new(schema, columns).map_err(Into::into)
76}
77
78// --- TableFunction ---
79
/// Table function `process_spans(process_id, span_types)`: exposes the spans
/// of a single process as a queryable table.
#[derive(Debug)]
pub struct ProcessSpansTableFunction {
    // Lakehouse context used to build the session for the inner queries.
    lakehouse: Arc<LakehouseContext>,
    // View factory handed to the inner session (backs `view_instance()` calls).
    view_factory: Arc<ViewFactory>,
    // Partition provider handed to the inner session.
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Optional time restriction applied to the inner session.
    query_range: Option<TimeRange>,
}
87
impl ProcessSpansTableFunction {
    /// Creates the table function from its lakehouse dependencies.
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        view_factory: Arc<ViewFactory>,
        part_provider: Arc<dyn QueryPartitionProvider>,
        query_range: Option<TimeRange>,
    ) -> Self {
        Self {
            lakehouse,
            view_factory,
            part_provider,
            query_range,
        }
    }
}
103
104impl TableFunctionImpl for ProcessSpansTableFunction {
105    #[span_fn]
106    fn call(&self, exprs: &[Expr]) -> datafusion::error::Result<Arc<dyn TableProvider>> {
107        let arg1 = exprs.first().map(exp_to_string);
108        let Some(Ok(process_id)) = arg1 else {
109            return plan_err!(
110                "First argument to process_spans must be a string (the process ID), given {:?}",
111                arg1
112            );
113        };
114
115        let arg2 = exprs.get(1).map(exp_to_string);
116        let Some(Ok(span_types_str)) = arg2 else {
117            return plan_err!(
118                "Second argument to process_spans must be a string ('thread', 'async', or 'both'), given {:?}",
119                arg2
120            );
121        };
122
123        let span_types = match span_types_str.as_str() {
124            "thread" => SpanTypes::Thread,
125            "async" => SpanTypes::Async,
126            "both" => SpanTypes::Both,
127            _ => {
128                return plan_err!(
129                    "span_types must be 'thread', 'async', or 'both', given: {span_types_str}"
130                );
131            }
132        };
133
134        let schema = output_schema();
135        let execution_plan = Arc::new(ProcessSpansExecutionPlan::new(
136            schema,
137            process_id,
138            span_types,
139            self.query_range,
140            self.lakehouse.clone(),
141            self.view_factory.clone(),
142            self.part_provider.clone(),
143        ));
144
145        Ok(Arc::new(ProcessSpansTableProvider { execution_plan }))
146    }
147}
148
149// --- ExecutionPlan ---
150
/// Leaf execution plan that streams augmented span record batches for one process.
pub struct ProcessSpansExecutionPlan {
    // Output schema: stream_id + thread_name dictionary columns prepended to the spans schema.
    schema: SchemaRef,
    // Process whose spans are streamed.
    process_id: String,
    // Which kinds of spans to emit (thread, async, or both).
    span_types: SpanTypes,
    // Optional time restriction forwarded to the inner session.
    query_range: Option<TimeRange>,
    // Dependencies used to build the inner session context in execute().
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Cached DataFusion plan properties (single partition, bounded).
    properties: PlanProperties,
}
161
162impl ProcessSpansExecutionPlan {
163    fn new(
164        schema: SchemaRef,
165        process_id: String,
166        span_types: SpanTypes,
167        query_range: Option<TimeRange>,
168        lakehouse: Arc<LakehouseContext>,
169        view_factory: Arc<ViewFactory>,
170        part_provider: Arc<dyn QueryPartitionProvider>,
171    ) -> Self {
172        let properties = PlanProperties::new(
173            EquivalenceProperties::new(schema.clone()),
174            Partitioning::UnknownPartitioning(1),
175            EmissionType::Final,
176            Boundedness::Bounded,
177        );
178        Self {
179            schema,
180            process_id,
181            span_types,
182            query_range,
183            lakehouse,
184            view_factory,
185            part_provider,
186            properties,
187        }
188    }
189}
190
impl Debug for ProcessSpansExecutionPlan {
    // Manual Debug limited to the identifying fields; the context/provider
    // fields are omitted (presumably they do not implement Debug — they are
    // Arc'd trait objects and context types).
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("ProcessSpansExecutionPlan")
            .field("process_id", &self.process_id)
            .field("span_types", &self.span_types)
            .finish()
    }
}
199
impl DisplayAs for ProcessSpansExecutionPlan {
    // One-line rendering used when the physical plan is displayed; the same
    // text is emitted for every DisplayFormatType.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "ProcessSpansExecutionPlan: process_id={}, span_types={:?}",
            self.process_id, self.span_types
        )
    }
}
209
impl ExecutionPlan for ProcessSpansExecutionPlan {
    fn name(&self) -> &str {
        "ProcessSpansExecutionPlan"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Leaf node: the data comes from lakehouse queries, not from child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Builds a lazy stream of record batches: thread spans (one inner query
    /// per thread stream) followed by async spans (a begin/end self-join over
    /// the process's async events). Nothing runs until the stream is polled.
    #[span_fn]
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // Clone everything the async generator needs so it is 'static.
        let schema = self.schema.clone();
        let stream_schema = schema.clone();
        let process_id = self.process_id.clone();
        let span_types = self.span_types;
        let query_range = self.query_range;
        let lakehouse = self.lakehouse.clone();
        let view_factory = self.view_factory.clone();
        let part_provider = self.part_provider.clone();

        let record_batch_stream = try_stream! {
            // Shadow with the clone moved into the generator.
            let schema = stream_schema;
            // Dedicated session context for the inner view_instance() queries.
            let ctx = super::query::make_session_context(
                lakehouse,
                part_provider,
                query_range,
                view_factory,
                Arc::new(NoOpSessionConfigurator),
            )
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(
                format!("Failed to create session context: {e}"),
            ))?;

            // Thread spans
            if matches!(span_types, SpanTypes::Thread | SpanTypes::Both) {
                // (stream_id, thread_id, display_name) triples for the process.
                let threads = get_process_thread_list(&process_id, &ctx)
                    .await
                    .map_err(|e| datafusion::error::DataFusionError::Execution(
                        format!("Failed to get thread list: {e}"),
                    ))?;

                // Cap concurrent per-thread queries at the core count (4 if unknown).
                let max_concurrent = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);

                // One SQL query per thread stream. NOTE(review): stream_id is
                // interpolated into SQL text; it comes from the lakehouse
                // thread list, not directly from user input.
                let queries: Vec<(String, String, String)> = threads
                    .iter()
                    .map(|(stream_id, _thread_id, display_name)| {
                        let sql = format!(
                            "SELECT * FROM view_instance('thread_spans', '{stream_id}')"
                        );
                        (stream_id.clone(), display_name.clone(), sql)
                    })
                    .collect();

                // Plan/open all per-thread streams with bounded concurrency,
                // preserving thread order in the collected results.
                // NOTE(review): spawn_with_context is not imported explicitly
                // here — presumably re-exported by micromegas_tracing::prelude.
                let stream_results: Vec<(String, String, SendableRecordBatchStream)> =
                    futures::stream::iter(queries)
                        .map(|(stream_id, thread_name, sql)| {
                            let ctx = ctx.clone();
                            async move {
                                spawn_with_context(async move {
                                    let df = ctx.sql(&sql).await?;
                                    let s = df.execute_stream().await?;
                                    Ok::<_, anyhow::Error>((stream_id, thread_name, s))
                                })
                                .await?
                            }
                        })
                        .buffered(max_concurrent)
                        .try_collect()
                        .await
                        .map_err(|e| datafusion::error::DataFusionError::Execution(
                            format!("Failed to query thread spans: {e}"),
                        ))?;

                // Drain each stream sequentially, tagging every batch with its
                // stream id and thread name.
                for (stream_id, thread_name, mut data_stream) in stream_results {
                    while let Some(batch) = data_stream.try_next().await? {
                        let augmented = augment_batch(&batch, schema.clone(), &stream_id, &thread_name)?;
                        yield augmented;
                    }
                }
            }

            // Async spans
            if matches!(span_types, SpanTypes::Async | SpanTypes::Both) {
                // Self-join begin/end async events on span_id to reconstruct
                // spans; b.time < e.time drops degenerate/mismatched pairs.
                // NOTE(review): process_id is interpolated into SQL text —
                // ensure it is validated upstream.
                let async_sql = format!(
                    "SELECT \
                        b.span_id as id, \
                        b.parent_span_id as parent, \
                        b.depth, \
                        b.hash, \
                        b.time as \"begin\", \
                        e.time as \"end\", \
                        arrow_cast(e.time, 'Int64') - arrow_cast(b.time, 'Int64') as duration, \
                        b.name, \
                        b.target, \
                        b.filename, \
                        b.line \
                    FROM (SELECT * FROM view_instance('async_events', '{process_id}') \
                          WHERE event_type = 'begin') b \
                    INNER JOIN (SELECT * FROM view_instance('async_events', '{process_id}') \
                          WHERE event_type = 'end') e \
                    ON b.span_id = e.span_id \
                    WHERE b.time < e.time \
                    ORDER BY b.time"
                );

                let df = ctx.sql(&async_sql).await
                    .map_err(|e| datafusion::error::DataFusionError::Execution(
                        format!("Failed to query async spans: {e}"),
                    ))?;
                let mut async_stream = df.execute_stream().await
                    .map_err(|e| datafusion::error::DataFusionError::Execution(
                        format!("Failed to execute async spans stream: {e}"),
                    ))?;

                // Async spans have no backing thread: empty stream_id, fixed
                // "async" thread name.
                while let Some(batch) = async_stream.try_next().await? {
                    let augmented = augment_batch(&batch, schema.clone(), "", "async")?;
                    yield augmented;
                }
            }
        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            schema,
            record_batch_stream,
        )))
    }
}
363
364// --- TableProvider ---
365
/// Table provider wrapping a pre-built [`ProcessSpansExecutionPlan`];
/// `scan` applies projection and limit on top of it.
#[derive(Debug)]
struct ProcessSpansTableProvider {
    // Plan constructed once in ProcessSpansTableFunction::call.
    execution_plan: Arc<ProcessSpansExecutionPlan>,
}
370
371#[async_trait::async_trait]
372impl TableProvider for ProcessSpansTableProvider {
373    fn as_any(&self) -> &dyn Any {
374        self
375    }
376
377    fn schema(&self) -> SchemaRef {
378        self.execution_plan.schema()
379    }
380
381    fn table_type(&self) -> TableType {
382        TableType::Base
383    }
384
385    async fn scan(
386        &self,
387        _state: &dyn Session,
388        projection: Option<&Vec<usize>>,
389        _filters: &[Expr],
390        limit: Option<usize>,
391    ) -> DFResult<Arc<dyn ExecutionPlan>> {
392        let mut plan: Arc<dyn ExecutionPlan> = self.execution_plan.clone();
393        if let Some(projection) = projection {
394            let schema = plan.schema();
395            let projected_exprs: Vec<(Arc<dyn datafusion::physical_expr::PhysicalExpr>, String)> =
396                projection
397                    .iter()
398                    .map(|&i| {
399                        let name = schema.field(i).name().clone();
400                        let expr = Arc::new(datafusion::physical_expr::expressions::Column::new(
401                            &name, i,
402                        ))
403                            as Arc<dyn datafusion::physical_expr::PhysicalExpr>;
404                        (expr, name)
405                    })
406                    .collect();
407            plan = Arc::new(ProjectionExec::try_new(projected_exprs, plan)?);
408        }
409        if let Some(fetch) = limit {
410            plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(fetch)));
411        }
412        Ok(plan)
413    }
414}