micromegas/servers/
ingestion.rs

1use axum::Extension;
2use axum::Router;
3use axum::body::Body;
4use axum::http::{Response, StatusCode};
5use axum::response::IntoResponse;
6use axum::routing::post;
7use micromegas_ingestion::web_ingestion_service::{IngestionServiceError, WebIngestionService};
8use micromegas_tracing::prelude::*;
9use std::sync::Arc;
10use thiserror::Error;
11
12#[derive(Error, Debug)]
13pub enum IngestionError {
14    #[error("Bad request: {0}")]
15    BadRequest(String),
16
17    #[error("Internal server error: {0}")]
18    Internal(String),
19}
20
21impl IntoResponse for IngestionError {
22    fn into_response(self) -> Response<Body> {
23        let (status, category, detail) = match self {
24            IngestionError::BadRequest(msg) => (StatusCode::BAD_REQUEST, "Bad request", msg),
25            IngestionError::Internal(msg) => (
26                StatusCode::INTERNAL_SERVER_ERROR,
27                "Internal server error",
28                msg,
29            ),
30        };
31        error!("{status}: {detail}");
32        (status, category).into_response()
33    }
34}
35
36impl From<IngestionServiceError> for IngestionError {
37    fn from(err: IngestionServiceError) -> Self {
38        match err {
39            IngestionServiceError::ParseError(msg) => IngestionError::BadRequest(msg),
40            IngestionServiceError::DatabaseError(msg) => IngestionError::Internal(msg),
41            IngestionServiceError::StorageError(msg) => IngestionError::Internal(msg),
42        }
43    }
44}
45
46/// Handles requests to insert process information.
47///
48/// Returns 400 for malformed CBOR, 500 for database errors.
49pub async fn insert_process_request(
50    Extension(service): Extension<Arc<WebIngestionService>>,
51    body: bytes::Bytes,
52) -> Result<(), IngestionError> {
53    service.insert_process(body).await.map_err(Into::into)
54}
55
56/// Handles requests to insert stream information.
57///
58/// Returns 400 for malformed CBOR, 500 for database errors.
59pub async fn insert_stream_request(
60    Extension(service): Extension<Arc<WebIngestionService>>,
61    body: bytes::Bytes,
62) -> Result<(), IngestionError> {
63    service.insert_stream(body).await.map_err(Into::into)
64}
65
66/// Handles requests to insert block information.
67///
68/// Returns 400 for empty body or malformed CBOR, 500 for database/storage errors.
69pub async fn insert_block_request(
70    Extension(service): Extension<Arc<WebIngestionService>>,
71    body: bytes::Bytes,
72) -> Result<(), IngestionError> {
73    if body.is_empty() {
74        return Err(IngestionError::BadRequest("empty body".to_string()));
75    }
76    service.insert_block(body).await.map_err(Into::into)
77}
78
79/// Registers the ingestion routes with the given Axum `Router`.
80///
81/// This function adds routes for `/ingestion/insert_process`,
82/// `/ingestion/insert_stream`, and `/ingestion/insert_block`.
83pub fn register_routes(router: Router) -> Router {
84    router
85        .route("/ingestion/insert_process", post(insert_process_request))
86        .route("/ingestion/insert_stream", post(insert_stream_request))
87        .route("/ingestion/insert_block", post(insert_block_request))
88}