micromegas_tracing/spans/
instrumented_future.rs

1//! Manual async span instrumentation using InstrumentedFuture wrapper
2
3use crate::dispatch::{
4    on_begin_async_named_scope, on_begin_async_scope, on_end_async_named_scope, on_end_async_scope,
5};
6use crate::spans::{SpanLocation, SpanMetadata};
7use pin_project::pin_project;
8use std::cell::UnsafeCell;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
thread_local! {
    /// Per-thread stack of async span IDs.
    ///
    /// It starts out holding a single `0`, the ID that stands for the root
    /// (no active span).
    static ASYNC_CALL_STACK: UnsafeCell<Vec<u64>> = UnsafeCell::new(vec![0]);
}
16
17/// Returns the current span ID from the async call stack.
18/// Returns 0 (root) if no span is active.
19#[inline]
20pub fn current_span_id() -> u64 {
21    ASYNC_CALL_STACK.with(|stack_cell| {
22        let stack = unsafe { &*stack_cell.get() };
23        stack.last().copied().unwrap_or(0)
24    })
25}
26
27/// A future wrapper that establishes a span context on every poll.
28///
29/// Unlike an RAII guard, this pushes the parent span ID onto the
30/// thread-local async call stack before each poll and pops it after,
31/// which is correct across yield points and executor thread migration.
32///
33/// The stack is padded to match the parent's depth so that
34/// `InstrumentedFuture::new()` inside the spawned task computes
35/// `depth = stack.len() - 1 = parent_depth + 1`, preserving the
36/// logical nesting across spawn boundaries.
37#[pin_project]
38pub struct SpanContextFuture<F> {
39    #[pin]
40    future: F,
41    parent_span: u64,
42    parent_depth: u32,
43}
44
45impl<F: Future> Future for SpanContextFuture<F> {
46    type Output = F::Output;
47
48    #[inline]
49    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        let this = self.project();
51        ASYNC_CALL_STACK.with(|stack_cell| {
52            let stack = unsafe { &mut *stack_cell.get() };
53            let saved_len = stack.len();
54            // Pad stack so that children see depth = parent_depth.
55            // At the spawn point the stack had length parent_depth + 1;
56            // we recreate that, then push parent_span on top.
57            let target_len = *this.parent_depth as usize + 1;
58            while stack.len() < target_len.saturating_sub(1) {
59                stack.push(0);
60            }
61            stack.push(*this.parent_span);
62            let res = this.future.poll(cx);
63            stack.truncate(saved_len);
64            res
65        })
66    }
67}
68
/// Spawns `future` on the tokio runtime, carrying the caller's span context along.
///
/// This wraps `tokio::spawn`. The span ID on top of the calling thread's async
/// call stack, together with the stack's depth, is captured before spawning.
/// The spawned task runs inside a [`SpanContextFuture`] that reinstalls that
/// context, so instrumented async functions called within the task report the
/// spawning context as their parent.
///
/// The captured span ID is pushed onto the thread-local async call stack
/// before every poll and removed afterwards, which keeps it correct across
/// yield points and executor thread migration.
///
/// # Example
/// ```ignore
/// use micromegas_tracing::prelude::*;
///
/// #[span_fn]
/// async fn parent_work() {
///     // Spans created in child_work will show parent_work as their parent
///     spawn_with_context(child_work()).await.unwrap();
/// }
///
/// #[span_fn]
/// async fn child_work() {
///     // This span's parent will be parent_work, not root
/// }
/// ```
#[cfg(feature = "tokio")]
pub fn spawn_with_context<F>(future: F) -> tokio::task::JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // Snapshot the caller's context now: the spawned task may be polled on a
    // different thread, whose thread-local stack knows nothing about it.
    let (parent_span, parent_depth) = ASYNC_CALL_STACK.with(|cell| {
        // SAFETY: the shared borrow is confined to this closure and no other
        // code runs while it is alive.
        let ids = unsafe { &*cell.get() };
        let top = ids.last().map_or(0, |&id| id);
        let depth = ids.len().saturating_sub(1) as u32;
        (top, depth)
    });
    let with_context = SpanContextFuture {
        future,
        parent_span,
        parent_depth,
    };
    tokio::spawn(with_context)
}
114
115/// Trait for adding instrumentation to futures
116pub trait InstrumentFuture: Future + Sized {
117    /// Instrument this future with the given span metadata
118    fn instrument(self, span_desc: &'static SpanMetadata) -> InstrumentedFuture<Self> {
119        InstrumentedFuture::new(self, span_desc)
120    }
121
122    /// Internal method for named instrumentation - do not use directly.
123    /// Use the `instrument_named!` macro for method-like syntax instead.
124    #[doc(hidden)]
125    fn __instrument_named_internal(
126        self,
127        span_location: &'static SpanLocation,
128        name: &'static str,
129    ) -> InstrumentedNamedFuture<Self> {
130        InstrumentedNamedFuture::new(self, span_location, name)
131    }
132}
133
134impl<F: Future> InstrumentFuture for F {}
135
136/// A wrapper that instruments a future with async span tracing
137#[pin_project]
138pub struct InstrumentedFuture<F> {
139    #[pin]
140    future: F,
141    desc: &'static SpanMetadata,
142    span_id: Option<u64>,
143    /// Parent span ID captured at future creation time
144    parent: u64,
145    /// Depth captured at future creation time
146    depth: u32,
147}
148
149impl<F> InstrumentedFuture<F> {
150    /// Create a new instrumented future
151    pub fn new(future: F, desc: &'static SpanMetadata) -> Self {
152        let (parent, depth) = ASYNC_CALL_STACK.with(|stack_cell| {
153            let stack = unsafe { &*stack_cell.get() };
154            assert!(!stack.is_empty());
155            (
156                stack[stack.len() - 1],
157                (stack.len().saturating_sub(1)) as u32,
158            )
159        });
160        Self {
161            future,
162            desc,
163            span_id: None,
164            parent,
165            depth,
166        }
167    }
168}
169
170impl<F> Future for InstrumentedFuture<F>
171where
172    F: Future,
173{
174    type Output = F::Output;
175
176    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
177        let this = self.project();
178        let parent = *this.parent;
179        let depth = *this.depth;
180        ASYNC_CALL_STACK.with(|stack_cell| {
181            let stack = unsafe { &mut *stack_cell.get() };
182            assert!(!stack.is_empty());
183            match this.span_id {
184                Some(span_id) => {
185                    stack.push(*span_id);
186                }
187                None => {
188                    // Begin the async span and store the span ID
189                    let span_id = on_begin_async_scope(this.desc, parent, depth);
190                    stack.push(span_id);
191                    *this.span_id = Some(span_id);
192                }
193            }
194            let res = match this.future.poll(cx) {
195                Poll::Ready(output) => {
196                    // End the async span when the future completes
197                    if let Some(span_id) = *this.span_id {
198                        on_end_async_scope(span_id, parent, this.desc, depth);
199                    }
200                    Poll::Ready(output)
201                }
202                Poll::Pending => Poll::Pending,
203            };
204            stack.pop();
205            res
206        })
207    }
208}
209
210/// A wrapper that instruments a future with named async span tracing
211#[pin_project]
212pub struct InstrumentedNamedFuture<F> {
213    #[pin]
214    future: F,
215    span_location: &'static SpanLocation,
216    name: &'static str,
217    span_id: Option<u64>,
218    /// Parent span ID captured at future creation time
219    parent: u64,
220    /// Depth captured at future creation time
221    depth: u32,
222}
223
224impl<F> InstrumentedNamedFuture<F> {
225    /// Create a new instrumented named future
226    pub fn new(future: F, span_location: &'static SpanLocation, name: &'static str) -> Self {
227        let (parent, depth) = ASYNC_CALL_STACK.with(|stack_cell| {
228            let stack = unsafe { &*stack_cell.get() };
229            assert!(!stack.is_empty());
230            (
231                stack[stack.len() - 1],
232                (stack.len().saturating_sub(1)) as u32,
233            )
234        });
235        Self {
236            future,
237            span_location,
238            name,
239            span_id: None,
240            parent,
241            depth,
242        }
243    }
244}
245
246impl<F> Future for InstrumentedNamedFuture<F>
247where
248    F: Future,
249{
250    type Output = F::Output;
251
252    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253        let this = self.project();
254        let parent = *this.parent;
255        let depth = *this.depth;
256        ASYNC_CALL_STACK.with(|stack_cell| {
257            let stack = unsafe { &mut *stack_cell.get() };
258            assert!(!stack.is_empty());
259            match this.span_id {
260                Some(span_id) => {
261                    stack.push(*span_id);
262                }
263                None => {
264                    // Begin the async named span and store the span ID
265                    let span_id =
266                        on_begin_async_named_scope(this.span_location, this.name, parent, depth);
267                    stack.push(span_id);
268                    *this.span_id = Some(span_id);
269                }
270            }
271            let res = match this.future.poll(cx) {
272                Poll::Ready(output) => {
273                    // End the async named span when the future completes
274                    if let Some(span_id) = *this.span_id {
275                        on_end_async_named_scope(
276                            span_id,
277                            parent,
278                            this.span_location,
279                            this.name,
280                            depth,
281                        );
282                    }
283                    Poll::Ready(output)
284                }
285                Poll::Pending => Poll::Pending,
286            };
287            stack.pop();
288            res
289        })
290    }
291}