micromegas/servers/
ingestion.rs1use axum::Extension;
2use axum::Router;
3use axum::body::Body;
4use axum::http::{Response, StatusCode};
5use axum::response::IntoResponse;
6use axum::routing::post;
7use micromegas_auth::types::AuthProvider;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_ingestion::web_ingestion_service::{IngestionServiceError, WebIngestionService};
10use micromegas_tracing::prelude::*;
11use std::future::Future;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Duration;
15use thiserror::Error;
16
17#[derive(Error, Debug)]
18pub enum IngestionError {
19 #[error("Bad request: {0}")]
20 BadRequest(String),
21
22 #[error("Internal server error: {0}")]
23 Internal(String),
24}
25
26impl IntoResponse for IngestionError {
27 fn into_response(self) -> Response<Body> {
28 let (status, category, detail) = match self {
29 IngestionError::BadRequest(msg) => (StatusCode::BAD_REQUEST, "Bad request", msg),
30 IngestionError::Internal(msg) => (
31 StatusCode::INTERNAL_SERVER_ERROR,
32 "Internal server error",
33 msg,
34 ),
35 };
36 error!("{status}: {detail}");
37 (status, category).into_response()
38 }
39}
40
41impl From<IngestionServiceError> for IngestionError {
42 fn from(err: IngestionServiceError) -> Self {
43 match err {
44 IngestionServiceError::ParseError(msg) => IngestionError::BadRequest(msg),
45 IngestionServiceError::DatabaseError(msg) => IngestionError::Internal(msg),
46 IngestionServiceError::StorageError(msg) => IngestionError::Internal(msg),
47 }
48 }
49}
50
51pub async fn insert_process_request(
55 Extension(service): Extension<Arc<WebIngestionService>>,
56 body: bytes::Bytes,
57) -> Result<(), IngestionError> {
58 service.insert_process(body).await.map_err(Into::into)
59}
60
61pub async fn insert_stream_request(
65 Extension(service): Extension<Arc<WebIngestionService>>,
66 body: bytes::Bytes,
67) -> Result<(), IngestionError> {
68 service.insert_stream(body).await.map_err(Into::into)
69}
70
71pub async fn insert_block_request(
75 Extension(service): Extension<Arc<WebIngestionService>>,
76 body: bytes::Bytes,
77) -> Result<(), IngestionError> {
78 if body.is_empty() {
79 return Err(IngestionError::BadRequest("empty body".to_string()));
80 }
81 service.insert_block(body).await.map_err(Into::into)
82}
83
84pub fn register_routes(router: Router) -> Router {
89 router
90 .route("/ingestion/insert_process", post(insert_process_request))
91 .route("/ingestion/insert_stream", post(insert_stream_request))
92 .route("/ingestion/insert_block", post(insert_block_request))
93}
94
95pub async fn serve_ingestion(
101 listen_addr: SocketAddr,
102 lake: DataLakeConnection,
103 auth_provider: Option<Arc<dyn AuthProvider>>,
104 shutdown: impl Future<Output = ()> + Send + 'static,
105 grace: Duration,
106) -> anyhow::Result<()> {
107 use axum::extract::DefaultBodyLimit;
108 use axum::middleware;
109 use axum::routing::get;
110 use micromegas_auth::axum::auth_middleware;
111 use tower_http::limit::RequestBodyLimitLayer;
112
113 use super::axum_utils::observability_middleware;
114 use super::shutdown::serve_axum_with_graceful_shutdown;
115
116 let service = Arc::new(WebIngestionService::new(lake));
117
118 let health_router =
119 Router::new().route("/health", get(|| async { axum::http::StatusCode::OK }));
120
121 let mut protected_app = register_routes(Router::new())
122 .merge(super::otlp::otlp_router())
123 .layer(DefaultBodyLimit::disable())
124 .layer(RequestBodyLimitLayer::new(100 * 1024 * 1024))
125 .layer(Extension(service));
126
127 let auth_enabled = auth_provider.is_some();
128 if let Some(provider) = auth_provider {
129 info!("Ingestion: authentication enabled");
130 protected_app = protected_app.layer(middleware::from_fn(move |req, next| {
131 auth_middleware(provider.clone(), req, next)
132 }));
133 } else {
134 warn!("Ingestion: authentication disabled — development mode only");
135 }
136
137 let app = health_router
138 .merge(protected_app)
139 .layer(middleware::from_fn(observability_middleware));
140
141 let listener = tokio::net::TcpListener::bind(listen_addr)
142 .await
143 .map_err(|e| anyhow::anyhow!("ingestion: binding to {listen_addr}: {e}"))?;
144 info!("Ingestion serving on {listen_addr} authentication={auth_enabled}");
145
146 serve_axum_with_graceful_shutdown(
147 listener,
148 app.into_make_service_with_connect_info::<SocketAddr>(),
149 shutdown,
150 grace,
151 )
152 .await?;
153
154 Ok(())
155}