micromegas/servers/
shutdown.rs

1use std::convert::Infallible;
2use std::future::Future;
3use std::time::Duration;
4
5use anyhow::Result;
6use micromegas_tracing::prelude::*;
7use tokio::net::TcpListener;
8use tokio::sync::watch;
9use tower::Service;
10
11/// Completes when SIGTERM is received. On non-unix targets, never completes
12/// (preserves current behavior; production deploys are Linux/ECS).
13#[cfg(unix)]
14pub async fn wait_for_sigterm() {
15    use tokio::signal::unix::{SignalKind, signal};
16    let mut sigterm = signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
17    sigterm.recv().await;
18}
19
/// Non-unix fallback for the SIGTERM waiter: the returned future never
/// completes.
#[cfg(not(unix))]
pub async fn wait_for_sigterm() {
    // A future that is always pending; the output type is pinned to `()`.
    let () = std::future::pending().await;
}
24
25/// Fans a shutdown future out to N consumers via a watch channel.
26///
27/// `subscribe()` returns a future that completes once the original shutdown
28/// future has fired — usable as the drain trigger for axum/tonic and as the
29/// deadline arm. Subscribers created after the signal has fired complete
30/// immediately.
31pub struct ShutdownFanout {
32    tx: watch::Sender<bool>,
33}
34
35impl ShutdownFanout {
36    pub fn new<F>(shutdown: F) -> Self
37    where
38        F: Future<Output = ()> + Send + 'static,
39    {
40        let (tx, _rx) = watch::channel(false);
41        let tx2 = tx.clone();
42        tokio::spawn(async move {
43            shutdown.await;
44            let _ = tx2.send(true);
45        });
46        Self { tx }
47    }
48
49    /// Returns a future that completes once the shutdown signal has fired.
50    pub fn subscribe(&self) -> impl Future<Output = ()> + Send + 'static {
51        let mut rx = self.tx.subscribe();
52        async move {
53            let _ = rx.wait_for(|v| *v).await;
54        }
55    }
56}
57
58/// Serves an Axum application, draining in-flight requests when `shutdown` fires.
59///
60/// Logs when the signal is received, when drain completes cleanly, or when the
61/// grace period expires with work still in flight.
62pub async fn serve_axum_with_graceful_shutdown<M, S, F>(
63    listener: TcpListener,
64    make_service: M,
65    shutdown: F,
66    grace: Duration,
67) -> Result<()>
68where
69    M: for<'a> Service<
70            axum::serve::IncomingStream<'a, TcpListener>,
71            Error = Infallible,
72            Response = S,
73        > + Send
74        + 'static,
75    for<'a> <M as Service<axum::serve::IncomingStream<'a, TcpListener>>>::Future: Send,
76    S: Service<axum::extract::Request, Response = axum::response::Response, Error = Infallible>
77        + Clone
78        + Send
79        + 'static,
80    S::Future: Send,
81    F: Future<Output = ()> + Send + 'static,
82{
83    use std::future::IntoFuture;
84
85    let grace_secs = grace.as_secs();
86    let fanout = ShutdownFanout::new(shutdown);
87
88    let drain = fanout.subscribe();
89    let axum_shutdown = async move {
90        drain.await;
91        info!("draining, grace={grace_secs}s");
92    };
93
94    let serve_future = axum::serve(listener, make_service)
95        .with_graceful_shutdown(axum_shutdown)
96        .into_future();
97
98    let deadline = {
99        let d = fanout.subscribe();
100        async move {
101            d.await;
102            tokio::time::sleep(grace).await;
103        }
104    };
105
106    tokio::select! {
107        res = serve_future => {
108            info!("drain completed");
109            res.map_err(anyhow::Error::from)
110        }
111        _ = deadline => {
112            warn!("grace period of {grace_secs}s elapsed with work still in flight");
113            Ok(())
114        }
115    }
116}