micromegas_tracing/spans/
instrumented_future.rs1use crate::dispatch::{
4 on_begin_async_named_scope, on_begin_async_scope, on_end_async_named_scope, on_end_async_scope,
5};
6use crate::spans::{SpanLocation, SpanMetadata};
7use pin_project::pin_project;
8use std::cell::UnsafeCell;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
thread_local! {
    /// Per-thread stack of async span ids.
    ///
    /// Every thread starts with a single `0` entry. The instrumented futures in this
    /// module push an id before polling their inner future and remove it afterwards,
    /// so the last element is the id of the span currently being polled on this thread.
    static ASYNC_CALL_STACK: UnsafeCell<Vec<u64>> = UnsafeCell::new(vec![0_u64]);
}
16
17#[inline]
20pub fn current_span_id() -> u64 {
21 ASYNC_CALL_STACK.with(|stack_cell| {
22 let stack = unsafe { &*stack_cell.get() };
23 stack.last().copied().unwrap_or(0)
24 })
25}
26
27#[pin_project]
38pub struct SpanContextFuture<F> {
39 #[pin]
40 future: F,
41 parent_span: u64,
42 parent_depth: u32,
43}
44
45impl<F: Future> Future for SpanContextFuture<F> {
46 type Output = F::Output;
47
48 #[inline]
49 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 let this = self.project();
51 ASYNC_CALL_STACK.with(|stack_cell| {
52 let stack = unsafe { &mut *stack_cell.get() };
53 let saved_len = stack.len();
54 let target_len = *this.parent_depth as usize + 1;
58 while stack.len() < target_len.saturating_sub(1) {
59 stack.push(0);
60 }
61 stack.push(*this.parent_span);
62 let res = this.future.poll(cx);
63 stack.truncate(saved_len);
64 res
65 })
66 }
67}
68
/// Spawns `future` on the tokio runtime while carrying over the caller's span context.
///
/// The span id on top of the calling thread's async call stack (or `0` if the stack
/// is empty) and the current stack depth are captured at call time and stored in a
/// [`SpanContextFuture`] that wraps `future`; that wrapper is what gets spawned.
#[cfg(feature = "tokio")]
pub fn spawn_with_context<F>(future: F) -> tokio::task::JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let (parent_span, parent_depth) = ASYNC_CALL_STACK.with(|cell| {
        // SAFETY: the stack is thread-local and this shared borrow does not outlive
        // the closure; relies on no `&mut` to the stack being live on this thread.
        let entries = unsafe { &*cell.get() };
        let top = entries.last().map_or(0, |id| *id);
        let depth = entries.len().saturating_sub(1) as u32;
        (top, depth)
    });
    let wrapped = SpanContextFuture {
        future,
        parent_span,
        parent_depth,
    };
    tokio::spawn(wrapped)
}
114
115pub trait InstrumentFuture: Future + Sized {
117 fn instrument(self, span_desc: &'static SpanMetadata) -> InstrumentedFuture<Self> {
119 InstrumentedFuture::new(self, span_desc)
120 }
121
122 #[doc(hidden)]
125 fn __instrument_named_internal(
126 self,
127 span_location: &'static SpanLocation,
128 name: &'static str,
129 ) -> InstrumentedNamedFuture<Self> {
130 InstrumentedNamedFuture::new(self, span_location, name)
131 }
132}
133
134impl<F: Future> InstrumentFuture for F {}
135
136#[pin_project]
138pub struct InstrumentedFuture<F> {
139 #[pin]
140 future: F,
141 desc: &'static SpanMetadata,
142 span_id: Option<u64>,
143 parent: u64,
145 depth: u32,
147}
148
149impl<F> InstrumentedFuture<F> {
150 pub fn new(future: F, desc: &'static SpanMetadata) -> Self {
152 let (parent, depth) = ASYNC_CALL_STACK.with(|stack_cell| {
153 let stack = unsafe { &*stack_cell.get() };
154 assert!(!stack.is_empty());
155 (
156 stack[stack.len() - 1],
157 (stack.len().saturating_sub(1)) as u32,
158 )
159 });
160 Self {
161 future,
162 desc,
163 span_id: None,
164 parent,
165 depth,
166 }
167 }
168}
169
170impl<F> Future for InstrumentedFuture<F>
171where
172 F: Future,
173{
174 type Output = F::Output;
175
176 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
177 let this = self.project();
178 let parent = *this.parent;
179 let depth = *this.depth;
180 ASYNC_CALL_STACK.with(|stack_cell| {
181 let stack = unsafe { &mut *stack_cell.get() };
182 assert!(!stack.is_empty());
183 match this.span_id {
184 Some(span_id) => {
185 stack.push(*span_id);
186 }
187 None => {
188 let span_id = on_begin_async_scope(this.desc, parent, depth);
190 stack.push(span_id);
191 *this.span_id = Some(span_id);
192 }
193 }
194 let res = match this.future.poll(cx) {
195 Poll::Ready(output) => {
196 if let Some(span_id) = *this.span_id {
198 on_end_async_scope(span_id, parent, this.desc, depth);
199 }
200 Poll::Ready(output)
201 }
202 Poll::Pending => Poll::Pending,
203 };
204 stack.pop();
205 res
206 })
207 }
208}
209
210#[pin_project]
212pub struct InstrumentedNamedFuture<F> {
213 #[pin]
214 future: F,
215 span_location: &'static SpanLocation,
216 name: &'static str,
217 span_id: Option<u64>,
218 parent: u64,
220 depth: u32,
222}
223
224impl<F> InstrumentedNamedFuture<F> {
225 pub fn new(future: F, span_location: &'static SpanLocation, name: &'static str) -> Self {
227 let (parent, depth) = ASYNC_CALL_STACK.with(|stack_cell| {
228 let stack = unsafe { &*stack_cell.get() };
229 assert!(!stack.is_empty());
230 (
231 stack[stack.len() - 1],
232 (stack.len().saturating_sub(1)) as u32,
233 )
234 });
235 Self {
236 future,
237 span_location,
238 name,
239 span_id: None,
240 parent,
241 depth,
242 }
243 }
244}
245
246impl<F> Future for InstrumentedNamedFuture<F>
247where
248 F: Future,
249{
250 type Output = F::Output;
251
252 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253 let this = self.project();
254 let parent = *this.parent;
255 let depth = *this.depth;
256 ASYNC_CALL_STACK.with(|stack_cell| {
257 let stack = unsafe { &mut *stack_cell.get() };
258 assert!(!stack.is_empty());
259 match this.span_id {
260 Some(span_id) => {
261 stack.push(*span_id);
262 }
263 None => {
264 let span_id =
266 on_begin_async_named_scope(this.span_location, this.name, parent, depth);
267 stack.push(span_id);
268 *this.span_id = Some(span_id);
269 }
270 }
271 let res = match this.future.poll(cx) {
272 Poll::Ready(output) => {
273 if let Some(span_id) = *this.span_id {
275 on_end_async_named_scope(
276 span_id,
277 parent,
278 this.span_location,
279 this.name,
280 depth,
281 );
282 }
283 Poll::Ready(output)
284 }
285 Poll::Pending => Poll::Pending,
286 };
287 stack.pop();
288 res
289 })
290 }
291}