micromegas/servers/
axum_utils.rs

1//! axum-utils : observability middleware
2
3// crate-specific lint exceptions:
4#![allow(clippy::missing_errors_doc)]
5
6use anyhow::Result;
7use async_stream::stream;
8use axum::response::Response;
9use axum::{extract::Request, middleware::Next};
10use micromegas_analytics::response_writer::ResponseWriter;
11use micromegas_tracing::prelude::*;
12use std::sync::Arc;
13use tokio::sync::mpsc::Receiver;
14
15use super::http_utils::get_client_ip;
16
17/// observability_middleware logs http requests, their duration and status code
18pub async fn observability_middleware(request: Request, next: Next) -> Response {
19    let (parts, body) = request.into_parts();
20    let uri = parts.uri.clone();
21    let client_ip = get_client_ip(&parts.headers, &parts.extensions);
22    info!(
23        "request method={} uri={uri} client_ip={client_ip}",
24        parts.method
25    );
26    let begin_ticks = now();
27    let response = next.run(Request::from_parts(parts, body)).await;
28    let end_ticks = now();
29    let duration = end_ticks - begin_ticks;
30    imetric!("request_duration", "ticks", duration as u64);
31    info!(
32        "response status={} uri={uri} client_ip={client_ip}",
33        response.status()
34    );
35    response
36}
37
38/// Makes a streaming body from a Tokio MPSC receiver.
39pub fn make_body_from_channel_receiver(mut rx: Receiver<bytes::Bytes>) -> axum::body::Body {
40    let read_stream = stream! {
41        while let Some(value) = rx.recv().await{
42                yield Result::<bytes::Bytes>::Ok(value);
43        }
44    };
45    axum::body::Body::from_stream(read_stream)
46}
47
48/// Streams a response by executing a callback that writes to a `ResponseWriter`.
49///
50/// This function creates a channel and a `ResponseWriter` that writes to this channel.
51/// The `callback` is then executed in a separate Tokio task, allowing it to stream data
52/// back to the client as it becomes available.
53pub fn stream_request<F, Fut>(callback: F) -> Response
54where
55    F: FnOnce(Arc<ResponseWriter>) -> Fut + 'static + Send,
56    Fut: std::future::Future<Output = Result<()>> + Send,
57{
58    let (tx, rx) = tokio::sync::mpsc::channel(10);
59    let writer = Arc::new(ResponseWriter::new(Some(tx)));
60    let response_body = make_body_from_channel_receiver(rx);
61    spawn_with_context(async move {
62        let service_call = callback(writer.clone());
63        if let Err(e) = service_call.await {
64            if writer.is_closed() {
65                info!("Error happened, but connection is closed: {e:?}");
66            } else {
67                // the connection is live, this looks like a real error
68                error!("{e:?}");
69                if let Err(e) = writer.write_string(format!("{e:?}")).await {
70                    //error writing can happen, probably not a big deal
71                    info!("{e:?}");
72                }
73            }
74        }
75    });
76
77    Response::builder().status(200).body(response_body).unwrap()
78}