micromegas/servers/
axum_utils.rs1#![allow(clippy::missing_errors_doc)]
5
6use anyhow::Result;
7use async_stream::stream;
8use axum::response::Response;
9use axum::{extract::Request, middleware::Next};
10use micromegas_analytics::response_writer::ResponseWriter;
11use micromegas_tracing::prelude::*;
12use std::sync::Arc;
13use tokio::sync::mpsc::Receiver;
14
15use super::http_utils::get_client_ip;
16
17pub async fn observability_middleware(request: Request, next: Next) -> Response {
19 let (parts, body) = request.into_parts();
20 let uri = parts.uri.clone();
21 let client_ip = get_client_ip(&parts.headers, &parts.extensions);
22 info!(
23 "request method={} uri={uri} client_ip={client_ip}",
24 parts.method
25 );
26 let begin_ticks = now();
27 let response = next.run(Request::from_parts(parts, body)).await;
28 let end_ticks = now();
29 let duration = end_ticks - begin_ticks;
30 imetric!("request_duration", "ticks", duration as u64);
31 info!(
32 "response status={} uri={uri} client_ip={client_ip}",
33 response.status()
34 );
35 response
36}
37
38pub fn make_body_from_channel_receiver(mut rx: Receiver<bytes::Bytes>) -> axum::body::Body {
40 let read_stream = stream! {
41 while let Some(value) = rx.recv().await{
42 yield Result::<bytes::Bytes>::Ok(value);
43 }
44 };
45 axum::body::Body::from_stream(read_stream)
46}
47
48pub fn stream_request<F, Fut>(callback: F) -> Response
54where
55 F: FnOnce(Arc<ResponseWriter>) -> Fut + 'static + Send,
56 Fut: std::future::Future<Output = Result<()>> + Send,
57{
58 let (tx, rx) = tokio::sync::mpsc::channel(10);
59 let writer = Arc::new(ResponseWriter::new(Some(tx)));
60 let response_body = make_body_from_channel_receiver(rx);
61 spawn_with_context(async move {
62 let service_call = callback(writer.clone());
63 if let Err(e) = service_call.await {
64 if writer.is_closed() {
65 info!("Error happened, but connection is closed: {e:?}");
66 } else {
67 error!("{e:?}");
69 if let Err(e) = writer.write_string(format!("{e:?}")).await {
70 info!("{e:?}");
72 }
73 }
74 }
75 });
76
77 Response::builder().status(200).body(response_body).unwrap()
78}