micromegas/servers/
shutdown.rs1use std::convert::Infallible;
2use std::future::Future;
3use std::time::Duration;
4
5use anyhow::Result;
6use micromegas_tracing::prelude::*;
7use tokio::net::TcpListener;
8use tokio::sync::watch;
9use tower::Service;
10
11#[cfg(unix)]
14pub async fn wait_for_sigterm() {
15 use tokio::signal::unix::{SignalKind, signal};
16 let mut sigterm = signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
17 sigterm.recv().await;
18}
19
/// Non-unix fallback for `wait_for_sigterm`: awaits a future that never
/// resolves, so callers wait on it indefinitely.
#[cfg(not(unix))]
pub async fn wait_for_sigterm() {
    let never = std::future::pending::<()>();
    never.await
}
24
25pub struct ShutdownFanout {
32 tx: watch::Sender<bool>,
33}
34
35impl ShutdownFanout {
36 pub fn new<F>(shutdown: F) -> Self
37 where
38 F: Future<Output = ()> + Send + 'static,
39 {
40 let (tx, _rx) = watch::channel(false);
41 let tx2 = tx.clone();
42 tokio::spawn(async move {
43 shutdown.await;
44 let _ = tx2.send(true);
45 });
46 Self { tx }
47 }
48
49 pub fn subscribe(&self) -> impl Future<Output = ()> + Send + 'static {
51 let mut rx = self.tx.subscribe();
52 async move {
53 let _ = rx.wait_for(|v| *v).await;
54 }
55 }
56}
57
58pub async fn serve_axum_with_graceful_shutdown<M, S, F>(
63 listener: TcpListener,
64 make_service: M,
65 shutdown: F,
66 grace: Duration,
67) -> Result<()>
68where
69 M: for<'a> Service<
70 axum::serve::IncomingStream<'a, TcpListener>,
71 Error = Infallible,
72 Response = S,
73 > + Send
74 + 'static,
75 for<'a> <M as Service<axum::serve::IncomingStream<'a, TcpListener>>>::Future: Send,
76 S: Service<axum::extract::Request, Response = axum::response::Response, Error = Infallible>
77 + Clone
78 + Send
79 + 'static,
80 S::Future: Send,
81 F: Future<Output = ()> + Send + 'static,
82{
83 use std::future::IntoFuture;
84
85 let grace_secs = grace.as_secs();
86 let fanout = ShutdownFanout::new(shutdown);
87
88 let drain = fanout.subscribe();
89 let axum_shutdown = async move {
90 drain.await;
91 info!("draining, grace={grace_secs}s");
92 };
93
94 let serve_future = axum::serve(listener, make_service)
95 .with_graceful_shutdown(axum_shutdown)
96 .into_future();
97
98 let deadline = {
99 let d = fanout.subscribe();
100 async move {
101 d.await;
102 tokio::time::sleep(grace).await;
103 }
104 };
105
106 tokio::select! {
107 res = serve_future => {
108 info!("drain completed");
109 res.map_err(anyhow::Error::from)
110 }
111 _ = deadline => {
112 warn!("grace period of {grace_secs}s elapsed with work still in flight");
113 Ok(())
114 }
115 }
116}