micromegas/servers/
otlp.rs1use axum::Extension;
18use axum::Router;
19use axum::body::Body;
20use axum::extract::DefaultBodyLimit;
21use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
22use axum::response::Response;
23use axum::routing::post;
24use micromegas_ingestion::web_ingestion_service::WebIngestionService;
25use micromegas_otel_ingestion::Encoding;
26use micromegas_otel_ingestion::error::OtelError;
27use micromegas_otel_ingestion::handler;
28use micromegas_tracing::prelude::*;
29use prost::Message;
30use std::sync::Arc;
31use tower_http::decompression::RequestDecompressionLayer;
32use tower_http::limit::RequestBodyLimitLayer;
33
/// Size cap, in bytes (20 MiB), handed to `RequestBodyLimitLayer` in
/// `otlp_router`.
const OTLP_BODY_LIMIT_BYTES: usize = 20 * 1024 * 1024;

/// Size cap, in bytes (300 MiB), handed to `DefaultBodyLimit::max` in
/// `otlp_router`.
// NOTE(review): the name indicates this bounds the body after gzip
// decompression — confirm against the layer ordering in `otlp_router`.
const OTLP_DECOMPRESSED_BODY_LIMIT_BYTES: usize = 300 * 1024 * 1024;

/// Value written to the `Retry-After` header of retryable error responses.
const RETRY_AFTER_SECONDS: u32 = 30;

/// Media type that selects (and labels) protobuf-encoded OTLP payloads.
const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";
/// Media type that selects (and labels) JSON-encoded OTLP payloads.
const CONTENT_TYPE_JSON: &str = "application/json";
53
54fn content_type_encoding(headers: &HeaderMap) -> Result<Encoding, OtlpHttpError> {
58 let Some(ct) = headers.get(header::CONTENT_TYPE) else {
59 return Err(OtlpHttpError::WrongContentType);
60 };
61 let Ok(ct) = ct.to_str() else {
62 return Err(OtlpHttpError::WrongContentType);
63 };
64 let media = ct
65 .split(';')
66 .next()
67 .unwrap_or("")
68 .trim()
69 .to_ascii_lowercase();
70 match media.as_str() {
71 CONTENT_TYPE_PROTOBUF => Ok(Encoding::Protobuf),
72 CONTENT_TYPE_JSON => Ok(Encoding::Json),
73 _ => Err(OtlpHttpError::WrongContentType),
74 }
75}
76
77enum OtlpHttpError {
81 WrongContentType,
82 Otel(OtelError),
83}
84
85impl OtlpHttpError {
86 fn into_otlp_response(self, encoding: Encoding) -> Response {
87 match self {
88 OtlpHttpError::WrongContentType => build_error_response(
89 StatusCode::UNSUPPORTED_MEDIA_TYPE,
90 3, "Content-Type must be application/x-protobuf or application/json",
92 false,
93 Encoding::Protobuf,
95 ),
96 OtlpHttpError::Otel(err) => {
97 let retryable = err.is_retryable();
98 let status = match err.http_status() {
99 400 => StatusCode::BAD_REQUEST,
100 503 => StatusCode::SERVICE_UNAVAILABLE,
101 other => {
102 StatusCode::from_u16(other).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
103 }
104 };
105 let code = err.grpc_code();
106 error!("OTLP error: {}", err);
110 build_error_response(status, code, &err.public_message(), retryable, encoding)
111 }
112 }
113 }
114}
115
116fn build_error_response(
117 status: StatusCode,
118 code: i32,
119 message: &str,
120 retryable: bool,
121 encoding: Encoding,
122) -> Response {
123 let proto_status = micromegas_otel_ingestion::proto::Status {
124 code,
125 message: message.to_string(),
126 };
127 let (body, content_type) = match encoding {
128 Encoding::Protobuf => (proto_status.encode_to_vec(), CONTENT_TYPE_PROTOBUF),
129 Encoding::Json => (
130 serde_json::to_vec(&proto_status).expect("serializing Status to JSON"),
131 CONTENT_TYPE_JSON,
132 ),
133 };
134 let mut response = Response::builder()
135 .status(status)
136 .header(header::CONTENT_TYPE, HeaderValue::from_static(content_type))
137 .body(Body::from(body))
138 .expect("building OTLP error response");
139 if retryable && let Ok(value) = HeaderValue::from_str(&RETRY_AFTER_SECONDS.to_string()) {
140 response.headers_mut().insert(header::RETRY_AFTER, value);
141 }
142 response
143}
144
145fn success_response<M: Message + serde::Serialize>(msg: M, encoding: Encoding) -> Response {
146 let (body, content_type) = match encoding {
147 Encoding::Protobuf => (msg.encode_to_vec(), CONTENT_TYPE_PROTOBUF),
148 Encoding::Json => (
149 serde_json::to_vec(&msg).expect("serializing OTLP response to JSON"),
150 CONTENT_TYPE_JSON,
151 ),
152 };
153 Response::builder()
154 .status(StatusCode::OK)
155 .header(header::CONTENT_TYPE, HeaderValue::from_static(content_type))
156 .body(Body::from(body))
157 .expect("building OTLP success response")
158}
159
160async fn logs_handler(
161 Extension(service): Extension<Arc<WebIngestionService>>,
162 headers: HeaderMap,
163 body: bytes::Bytes,
164) -> Response {
165 let encoding = match content_type_encoding(&headers) {
166 Ok(enc) => enc,
167 Err(e) => return e.into_otlp_response(Encoding::Protobuf),
168 };
169 match handler::ingest_logs(service, body, encoding).await {
170 Ok(resp) => success_response(resp, encoding),
171 Err(e) => OtlpHttpError::Otel(e).into_otlp_response(encoding),
172 }
173}
174
175async fn metrics_handler(
176 Extension(service): Extension<Arc<WebIngestionService>>,
177 headers: HeaderMap,
178 body: bytes::Bytes,
179) -> Response {
180 let encoding = match content_type_encoding(&headers) {
181 Ok(enc) => enc,
182 Err(e) => return e.into_otlp_response(Encoding::Protobuf),
183 };
184 match handler::ingest_metrics(service, body, encoding).await {
185 Ok(resp) => success_response(resp, encoding),
186 Err(e) => OtlpHttpError::Otel(e).into_otlp_response(encoding),
187 }
188}
189
190async fn traces_handler(
191 Extension(service): Extension<Arc<WebIngestionService>>,
192 headers: HeaderMap,
193 body: bytes::Bytes,
194) -> Response {
195 let encoding = match content_type_encoding(&headers) {
196 Ok(enc) => enc,
197 Err(e) => return e.into_otlp_response(Encoding::Protobuf),
198 };
199 match handler::ingest_traces(service, body, encoding).await {
200 Ok(resp) => success_response(resp, encoding),
201 Err(e) => OtlpHttpError::Otel(e).into_otlp_response(encoding),
202 }
203}
204
205pub fn otlp_router() -> Router {
217 Router::new()
218 .route("/ingestion/otlp/v1/logs", post(logs_handler))
219 .route("/ingestion/otlp/v1/metrics", post(metrics_handler))
220 .route("/ingestion/otlp/v1/traces", post(traces_handler))
221 .layer(RequestDecompressionLayer::new().gzip(true))
222 .layer(RequestBodyLimitLayer::new(OTLP_BODY_LIMIT_BYTES))
223 .layer(DefaultBodyLimit::max(OTLP_DECOMPRESSED_BODY_LIMIT_BYTES))
224}