micromegas/servers/
ingestion.rs

1use axum::Extension;
2use axum::Router;
3use axum::body::Body;
4use axum::http::{Response, StatusCode};
5use axum::response::IntoResponse;
6use axum::routing::post;
7use micromegas_auth::types::AuthProvider;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_ingestion::web_ingestion_service::{IngestionServiceError, WebIngestionService};
10use micromegas_tracing::prelude::*;
11use std::future::Future;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Duration;
15use thiserror::Error;
16
17#[derive(Error, Debug)]
18pub enum IngestionError {
19    #[error("Bad request: {0}")]
20    BadRequest(String),
21
22    #[error("Internal server error: {0}")]
23    Internal(String),
24}
25
26impl IntoResponse for IngestionError {
27    fn into_response(self) -> Response<Body> {
28        let (status, category, detail) = match self {
29            IngestionError::BadRequest(msg) => (StatusCode::BAD_REQUEST, "Bad request", msg),
30            IngestionError::Internal(msg) => (
31                StatusCode::INTERNAL_SERVER_ERROR,
32                "Internal server error",
33                msg,
34            ),
35        };
36        error!("{status}: {detail}");
37        (status, category).into_response()
38    }
39}
40
41impl From<IngestionServiceError> for IngestionError {
42    fn from(err: IngestionServiceError) -> Self {
43        match err {
44            IngestionServiceError::ParseError(msg) => IngestionError::BadRequest(msg),
45            IngestionServiceError::DatabaseError(msg) => IngestionError::Internal(msg),
46            IngestionServiceError::StorageError(msg) => IngestionError::Internal(msg),
47        }
48    }
49}
50
51/// Handles requests to insert process information.
52///
53/// Returns 400 for malformed CBOR, 500 for database errors.
54pub async fn insert_process_request(
55    Extension(service): Extension<Arc<WebIngestionService>>,
56    body: bytes::Bytes,
57) -> Result<(), IngestionError> {
58    service.insert_process(body).await.map_err(Into::into)
59}
60
61/// Handles requests to insert stream information.
62///
63/// Returns 400 for malformed CBOR, 500 for database errors.
64pub async fn insert_stream_request(
65    Extension(service): Extension<Arc<WebIngestionService>>,
66    body: bytes::Bytes,
67) -> Result<(), IngestionError> {
68    service.insert_stream(body).await.map_err(Into::into)
69}
70
71/// Handles requests to insert block information.
72///
73/// Returns 400 for empty body or malformed CBOR, 500 for database/storage errors.
74pub async fn insert_block_request(
75    Extension(service): Extension<Arc<WebIngestionService>>,
76    body: bytes::Bytes,
77) -> Result<(), IngestionError> {
78    if body.is_empty() {
79        return Err(IngestionError::BadRequest("empty body".to_string()));
80    }
81    service.insert_block(body).await.map_err(Into::into)
82}
83
84/// Registers the ingestion routes with the given Axum `Router`.
85///
86/// This function adds routes for `/ingestion/insert_process`,
87/// `/ingestion/insert_stream`, and `/ingestion/insert_block`.
88pub fn register_routes(router: Router) -> Router {
89    router
90        .route("/ingestion/insert_process", post(insert_process_request))
91        .route("/ingestion/insert_stream", post(insert_stream_request))
92        .route("/ingestion/insert_block", post(insert_block_request))
93}
94
95/// Assemble and serve the HTTP ingestion endpoint.
96///
97/// Binds `listen_addr`, wires the ingestion routes + OTLP routes, applies
98/// the supplied `auth_provider` (or runs open when `None`), and shuts down
99/// gracefully when `shutdown` resolves.
100pub async fn serve_ingestion(
101    listen_addr: SocketAddr,
102    lake: DataLakeConnection,
103    auth_provider: Option<Arc<dyn AuthProvider>>,
104    shutdown: impl Future<Output = ()> + Send + 'static,
105    grace: Duration,
106) -> anyhow::Result<()> {
107    use axum::extract::DefaultBodyLimit;
108    use axum::middleware;
109    use axum::routing::get;
110    use micromegas_auth::axum::auth_middleware;
111    use tower_http::limit::RequestBodyLimitLayer;
112
113    use super::axum_utils::observability_middleware;
114    use super::shutdown::serve_axum_with_graceful_shutdown;
115
116    let service = Arc::new(WebIngestionService::new(lake));
117
118    let health_router =
119        Router::new().route("/health", get(|| async { axum::http::StatusCode::OK }));
120
121    let mut protected_app = register_routes(Router::new())
122        .merge(super::otlp::otlp_router())
123        .layer(DefaultBodyLimit::disable())
124        .layer(RequestBodyLimitLayer::new(100 * 1024 * 1024))
125        .layer(Extension(service));
126
127    let auth_enabled = auth_provider.is_some();
128    if let Some(provider) = auth_provider {
129        info!("Ingestion: authentication enabled");
130        protected_app = protected_app.layer(middleware::from_fn(move |req, next| {
131            auth_middleware(provider.clone(), req, next)
132        }));
133    } else {
134        warn!("Ingestion: authentication disabled — development mode only");
135    }
136
137    let app = health_router
138        .merge(protected_app)
139        .layer(middleware::from_fn(observability_middleware));
140
141    let listener = tokio::net::TcpListener::bind(listen_addr)
142        .await
143        .map_err(|e| anyhow::anyhow!("ingestion: binding to {listen_addr}: {e}"))?;
144    info!("Ingestion serving on {listen_addr} authentication={auth_enabled}");
145
146    serve_axum_with_graceful_shutdown(
147        listener,
148        app.into_make_service_with_connect_info::<SocketAddr>(),
149        shutdown,
150        grace,
151    )
152    .await?;
153
154    Ok(())
155}