micromegas/servers/
otlp.rs

1//! OTLP/HTTP route registration for `telemetry-ingestion-srv`.
2//!
3//! Exposes three routes that match the OTLP/HTTP spec:
4//!  - `POST /ingestion/otlp/v1/logs`
5//!  - `POST /ingestion/otlp/v1/metrics`
6//!  - `POST /ingestion/otlp/v1/traces`
7//!
//! The OTLP sub-router applies its own 20 MiB limit on the compressed (wire) body —
//! matching the OTel Collector `confighttp.max_request_body_size` default — plus gzip
//! request decompression with a separate 300 MiB cap on the decompressed body, all
//! independent of the parent router's 100 MiB limit on `/ingestion/insert_block`.
11//!
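//! Per the OTLP/HTTP spec, success responses mirror the request encoding (JSON in → JSON
//! out, proto in → proto out). Error responses (4xx/5xx) produced by these handlers carry
//! a `google.rpc.Status` body encoded the same way, except 415 responses, which always use
//! protobuf because the request encoding is unknown at that point. Rejections raised by
//! the tower layers wrapping the routes (e.g. the wire-size 413) are generated before the
//! handlers run and do not follow this format.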
16
17use axum::Extension;
18use axum::Router;
19use axum::body::Body;
20use axum::extract::DefaultBodyLimit;
21use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
22use axum::response::Response;
23use axum::routing::post;
24use micromegas_ingestion::web_ingestion_service::WebIngestionService;
25use micromegas_otel_ingestion::Encoding;
26use micromegas_otel_ingestion::error::OtelError;
27use micromegas_otel_ingestion::handler;
28use micromegas_tracing::prelude::*;
29use prost::Message;
30use std::sync::Arc;
31use tower_http::decompression::RequestDecompressionLayer;
32use tower_http::limit::RequestBodyLimitLayer;
33
/// Maximum size, in bytes, of an OTLP request body *as received on the wire*
/// (i.e. still compressed). 20 MiB mirrors the OTel Collector's
/// `confighttp.max_request_body_size` default, so any payload an SDK would send to a
/// stock Collector also fits here. Enforced by `RequestBodyLimitLayer`, which sits
/// outside the decompression layer.
const OTLP_BODY_LIMIT_BYTES: usize = 20 << 20;

/// Maximum size, in bytes, of the decompressed body the handler will materialize.
///
/// Guards against gzip bombs: a body that passes the wire cap could otherwise inflate
/// at gzip's worst-case ratio (~1000×) and exhaust memory. Expressed as 15× the wire
/// cap so the two limits stay in lockstep: enough headroom for legitimate protobuf
/// compression (commonly observed up to 10×) while bounding the worst case to a
/// survivable allocation (300 MiB).
const OTLP_DECOMPRESSED_BODY_LIMIT_BYTES: usize = 15 * OTLP_BODY_LIMIT_BYTES;

/// Seconds advertised in `Retry-After` on retryable 503 responses. A conservative
/// default; adjust once real recovery times have been observed.
const RETRY_AFTER_SECONDS: u32 = 30;

/// Media type of binary protobuf OTLP payloads.
const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";
/// Media type of JSON-encoded OTLP payloads.
const CONTENT_TYPE_JSON: &str = "application/json";
53
54/// Examines the `Content-Type` header and maps it to an `Encoding`. The spec allows
55/// parameters (e.g. `application/json; charset=utf-8`), so we parse rather than
56/// string-compare. Returns `Err(OtlpHttpError::WrongContentType)` for unknown types.
57fn content_type_encoding(headers: &HeaderMap) -> Result<Encoding, OtlpHttpError> {
58    let Some(ct) = headers.get(header::CONTENT_TYPE) else {
59        return Err(OtlpHttpError::WrongContentType);
60    };
61    let Ok(ct) = ct.to_str() else {
62        return Err(OtlpHttpError::WrongContentType);
63    };
64    let media = ct
65        .split(';')
66        .next()
67        .unwrap_or("")
68        .trim()
69        .to_ascii_lowercase();
70    match media.as_str() {
71        CONTENT_TYPE_PROTOBUF => Ok(Encoding::Protobuf),
72        CONTENT_TYPE_JSON => Ok(Encoding::Json),
73        _ => Err(OtlpHttpError::WrongContentType),
74    }
75}
76
77/// Internal error type covering both pre-handler validation failures (415) and
78/// post-handler `OtelError`s. Each variant maps to a single HTTP response shape
79/// (status code, optional `Retry-After`, `google.rpc.Status` body).
80enum OtlpHttpError {
81    WrongContentType,
82    Otel(OtelError),
83}
84
85impl OtlpHttpError {
86    fn into_otlp_response(self, encoding: Encoding) -> Response {
87        match self {
88            OtlpHttpError::WrongContentType => build_error_response(
89                StatusCode::UNSUPPORTED_MEDIA_TYPE,
90                3, // INVALID_ARGUMENT
91                "Content-Type must be application/x-protobuf or application/json",
92                false,
93                // encoding is unknown for 415; always emit proto Status (OTLP/HTTP default)
94                Encoding::Protobuf,
95            ),
96            OtlpHttpError::Otel(err) => {
97                let retryable = err.is_retryable();
98                let status = match err.http_status() {
99                    400 => StatusCode::BAD_REQUEST,
100                    503 => StatusCode::SERVICE_UNAVAILABLE,
101                    other => {
102                        StatusCode::from_u16(other).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
103                    }
104                };
105                let code = err.grpc_code();
106                // Detailed error (includes raw sqlx / object-store messages) is
107                // logged server-side; only the sanitized public form goes to the
108                // client to avoid leaking backend internals.
109                error!("OTLP error: {}", err);
110                build_error_response(status, code, &err.public_message(), retryable, encoding)
111            }
112        }
113    }
114}
115
116fn build_error_response(
117    status: StatusCode,
118    code: i32,
119    message: &str,
120    retryable: bool,
121    encoding: Encoding,
122) -> Response {
123    let proto_status = micromegas_otel_ingestion::proto::Status {
124        code,
125        message: message.to_string(),
126    };
127    let (body, content_type) = match encoding {
128        Encoding::Protobuf => (proto_status.encode_to_vec(), CONTENT_TYPE_PROTOBUF),
129        Encoding::Json => (
130            serde_json::to_vec(&proto_status).expect("serializing Status to JSON"),
131            CONTENT_TYPE_JSON,
132        ),
133    };
134    let mut response = Response::builder()
135        .status(status)
136        .header(header::CONTENT_TYPE, HeaderValue::from_static(content_type))
137        .body(Body::from(body))
138        .expect("building OTLP error response");
139    if retryable && let Ok(value) = HeaderValue::from_str(&RETRY_AFTER_SECONDS.to_string()) {
140        response.headers_mut().insert(header::RETRY_AFTER, value);
141    }
142    response
143}
144
145fn success_response<M: Message + serde::Serialize>(msg: M, encoding: Encoding) -> Response {
146    let (body, content_type) = match encoding {
147        Encoding::Protobuf => (msg.encode_to_vec(), CONTENT_TYPE_PROTOBUF),
148        Encoding::Json => (
149            serde_json::to_vec(&msg).expect("serializing OTLP response to JSON"),
150            CONTENT_TYPE_JSON,
151        ),
152    };
153    Response::builder()
154        .status(StatusCode::OK)
155        .header(header::CONTENT_TYPE, HeaderValue::from_static(content_type))
156        .body(Body::from(body))
157        .expect("building OTLP success response")
158}
159
160async fn logs_handler(
161    Extension(service): Extension<Arc<WebIngestionService>>,
162    headers: HeaderMap,
163    body: bytes::Bytes,
164) -> Response {
165    let encoding = match content_type_encoding(&headers) {
166        Ok(enc) => enc,
167        Err(e) => return e.into_otlp_response(Encoding::Protobuf),
168    };
169    match handler::ingest_logs(service, body, encoding).await {
170        Ok(resp) => success_response(resp, encoding),
171        Err(e) => OtlpHttpError::Otel(e).into_otlp_response(encoding),
172    }
173}
174
175async fn metrics_handler(
176    Extension(service): Extension<Arc<WebIngestionService>>,
177    headers: HeaderMap,
178    body: bytes::Bytes,
179) -> Response {
180    let encoding = match content_type_encoding(&headers) {
181        Ok(enc) => enc,
182        Err(e) => return e.into_otlp_response(Encoding::Protobuf),
183    };
184    match handler::ingest_metrics(service, body, encoding).await {
185        Ok(resp) => success_response(resp, encoding),
186        Err(e) => OtlpHttpError::Otel(e).into_otlp_response(encoding),
187    }
188}
189
190async fn traces_handler(
191    Extension(service): Extension<Arc<WebIngestionService>>,
192    headers: HeaderMap,
193    body: bytes::Bytes,
194) -> Response {
195    let encoding = match content_type_encoding(&headers) {
196        Ok(enc) => enc,
197        Err(e) => return e.into_otlp_response(Encoding::Protobuf),
198    };
199    match handler::ingest_traces(service, body, encoding).await {
200        Ok(resp) => success_response(resp, encoding),
201        Err(e) => OtlpHttpError::Otel(e).into_otlp_response(encoding),
202    }
203}
204
205/// Builds a sub-Router carrying the three OTLP routes plus the body-limit and
206/// gzip-decompression layers scoped to those routes.
207///
208/// Layer order, outermost → innermost (request travels through them top to bottom):
209///  1. `DefaultBodyLimit::max(300 MiB)` — caps the post-decompression bytes the
210///     handler's `Bytes` extractor will materialize, defending against gzip-bomb
211///     expansion that the wire-byte limit can't see.
212///  2. `RequestBodyLimitLayer(20 MiB)` — caps the *compressed* wire bytes;
213///     enforced before decompression, returning 413 on oversize.
214///  3. `RequestDecompressionLayer` — gzip-decodes the body before the handler.
215///  4. handler.
216pub fn otlp_router() -> Router {
217    Router::new()
218        .route("/ingestion/otlp/v1/logs", post(logs_handler))
219        .route("/ingestion/otlp/v1/metrics", post(metrics_handler))
220        .route("/ingestion/otlp/v1/traces", post(traces_handler))
221        .layer(RequestDecompressionLayer::new().gzip(true))
222        .layer(RequestBodyLimitLayer::new(OTLP_BODY_LIMIT_BYTES))
223        .layer(DefaultBodyLimit::max(OTLP_DECOMPRESSED_BODY_LIMIT_BYTES))
224}