micromegas_ingestion/
web_ingestion_service.rs

1use crate::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
2use crate::remote_data_lake::migrate_db;
3use anyhow::Context;
4use bytes::Buf;
5use micromegas_telemetry::block_wire_format;
6use micromegas_telemetry::property::Property;
7use micromegas_telemetry::property::make_properties;
8use micromegas_telemetry::stream_info::StreamInfo;
9use micromegas_telemetry::wire_format::encode_cbor;
10use micromegas_tracing::prelude::*;
11use micromegas_tracing::property_set;
12use std::sync::{Arc, LazyLock};
13use thiserror::Error;
14use uuid::Uuid;
15
16static EMPTY_TRANSIT_METADATA_CBOR_BYTES: LazyLock<Vec<u8>> = LazyLock::new(|| {
17    let mut buf = Vec::new();
18    ciborium::ser::into_writer(&Vec::<()>::new(), &mut buf)
19        .expect("encoding an empty Vec to CBOR is infallible");
20    buf
21});
22
23/// Sentinel for `dependencies_metadata` / `objects_metadata` on streams that
24/// don't use the transit/POD wire format (e.g. OTLP). Existing readers decode
25/// these BYTEA columns as `Vec<UserDefinedType>` and iterate; an empty Vec
26/// makes those loops no-ops without touching consumer code.
27pub fn empty_transit_metadata_cbor() -> &'static [u8] {
28    &EMPTY_TRANSIT_METADATA_CBOR_BYTES
29}
30
/// Stream `format` value for native streams: transit-encoded payload in a CBOR envelope.
pub const FORMAT_TRANSIT: &str = "micromegas-transit";

/// Stream `format` value for OTel logs; each block payload holds one `ResourceLogs` proto.
pub const FORMAT_OTLP_LOGS: &str = "otlp/v1/logs";

/// Stream `format` value for OTel metrics; each block payload holds one `ResourceMetrics` proto.
pub const FORMAT_OTLP_METRICS: &str = "otlp/v1/metrics";

/// Stream `format` value for OTel traces; each block payload holds one `ResourceSpans` proto.
pub const FORMAT_OTLP_TRACES: &str = "otlp/v1/traces";
42
43/// Error type for ingestion service operations.
44/// Categorizes errors to enable proper HTTP status code mapping.
45#[derive(Error, Debug)]
46pub enum IngestionServiceError {
47    /// Client-side errors (malformed input) - maps to 400 Bad Request
48    #[error("Parse error: {0}")]
49    ParseError(String),
50
51    /// Database errors - maps to 500 Internal Server Error
52    #[error("Database error: {0}")]
53    DatabaseError(String),
54
55    /// Object storage errors - maps to 500 Internal Server Error
56    #[error("Storage error: {0}")]
57    StorageError(String),
58}
59
60#[derive(Clone)]
61pub struct WebIngestionService {
62    lake: DataLakeConnection,
63}
64
65impl WebIngestionService {
66    pub fn new(lake: DataLakeConnection) -> Self {
67        Self { lake }
68    }
69
70    /// Reads MICROMEGAS_SQL_CONNECTION_STRING and MICROMEGAS_OBJECT_STORE_URI,
71    /// connects to the data lake, runs ingestion migrations, and returns
72    /// a ready-to-use service.
73    pub async fn from_env() -> anyhow::Result<Arc<Self>> {
74        let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
75            .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
76        let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
77            .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
78        let lake = connect_to_data_lake(&connection_string, &object_store_uri).await?;
79        migrate_db(lake.db_pool.clone())
80            .await
81            .with_context(|| "migrate_db")?;
82        Ok(Arc::new(Self::new(lake)))
83    }
84
85    #[span_fn]
86    pub async fn insert_block(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
87        let block: block_wire_format::Block = ciborium::from_reader(body.reader())
88            .map_err(|e| IngestionServiceError::ParseError(format!("parsing block: {e}")))?;
89        self.insert_block_typed(block).await
90    }
91
92    /// Inserts a block whose payload is already typed (no envelope round-trip on the caller side).
93    ///
94    /// The caller hands us a fully-built `Block`; we CBOR-encode the payload envelope once,
95    /// write it to object storage, and INSERT the row. Used by the OTLP adapter where
96    /// constructing the CBOR `Block` envelope just so `insert_block` could decode it
97    /// would be wasted work.
98    #[span_fn]
99    pub async fn insert_block_typed(
100        &self,
101        block: block_wire_format::Block,
102    ) -> Result<(), IngestionServiceError> {
103        let encoded_payload = encode_cbor(&block.payload)
104            .map_err(|e| IngestionServiceError::ParseError(format!("encoding payload: {e}")))?;
105        let payload_size = encoded_payload.len();
106
107        let process_id = &block.process_id;
108        let stream_id = &block.stream_id;
109        let block_id = &block.block_id;
110        let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
111        debug!("writing {obj_path}");
112
113        use sqlx::types::chrono::{DateTime, FixedOffset};
114        let begin_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.begin_time)
115            .map_err(|e| IngestionServiceError::ParseError(format!("parsing begin_time: {e}")))?;
116        let end_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.end_time)
117            .map_err(|e| IngestionServiceError::ParseError(format!("parsing end_time: {e}")))?;
118        {
119            let begin_put = now();
120            self.lake
121                .blob_storage
122                .put(&obj_path, encoded_payload.into())
123                .await
124                .map_err(|e| {
125                    IngestionServiceError::StorageError(format!(
126                        "writing block to blob storage: {e}"
127                    ))
128                })?;
129            imetric!("put_duration", "ticks", (now() - begin_put) as u64);
130        }
131
132        debug!("recording block_id={block_id} stream_id={stream_id} process_id={process_id}");
133        let begin_insert = now();
134        let insert_time = sqlx::types::chrono::Utc::now();
135        let result = sqlx::query(
136            "INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;",
137        )
138        .bind(block_id)
139        .bind(stream_id)
140        .bind(process_id)
141        .bind(begin_time)
142        .bind(block.begin_ticks)
143        .bind(end_time)
144        .bind(block.end_ticks)
145        .bind(block.nb_objects)
146        .bind(block.object_offset)
147        .bind(payload_size as i64)
148        .bind(insert_time)
149        .execute(&self.lake.db_pool)
150        .await
151        .map_err(|e| IngestionServiceError::DatabaseError(format!("inserting into blocks: {e}")))?;
152        imetric!("insert_duration", "ticks", (now() - begin_insert) as u64);
153
154        if result.rows_affected() == 0 {
155            debug!("duplicate block_id={block_id} skipped (already exists)");
156        }
157        // this measure does not benefit from a dynamic property - I just want to make sure the feature works well
158        // the cost in this context should be reasonnable
159        imetric!(
160            "payload_size_inserted",
161            "bytes",
162            property_set::PropertySet::find_or_create(vec![property_set::Property::new(
163                "target",
164                "micromegas::ingestion"
165            ),]),
166            payload_size as u64
167        );
168        debug!("recorded block_id={block_id} stream_id={stream_id} process_id={process_id}");
169
170        Ok(())
171    }
172
173    /// Registers a stream whose blocks will be ingested in the transit format.
174    #[span_fn]
175    pub async fn insert_stream(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
176        let stream_info: StreamInfo = ciborium::from_reader(body.reader())
177            .map_err(|e| IngestionServiceError::ParseError(format!("parsing StreamInfo: {e}")))?;
178        info!(
179            "new stream {} {:?} {:?}",
180            stream_info.stream_id, &stream_info.tags, &stream_info.properties
181        );
182        let dependencies_metadata =
183            encode_cbor(&stream_info.dependencies_metadata).map_err(|e| {
184                IngestionServiceError::ParseError(format!("encoding dependencies_metadata: {e}"))
185            })?;
186        let objects_metadata = encode_cbor(&stream_info.objects_metadata).map_err(|e| {
187            IngestionServiceError::ParseError(format!("encoding objects_metadata: {e}"))
188        })?;
189        let result = sqlx::query(
190            "INSERT INTO streams (stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties, insert_time, format)
191             VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
192             ON CONFLICT (stream_id) DO NOTHING;",
193        )
194        .bind(stream_info.stream_id)
195        .bind(stream_info.process_id)
196        .bind(dependencies_metadata)
197        .bind(objects_metadata)
198        .bind(&stream_info.tags)
199        .bind(make_properties(&stream_info.properties))
200        .bind(sqlx::types::chrono::Utc::now())
201        .bind(FORMAT_TRANSIT)
202        .execute(&self.lake.db_pool)
203        .await
204        .map_err(|e| {
205            IngestionServiceError::DatabaseError(format!("inserting into streams: {e}"))
206        })?;
207
208        if result.rows_affected() == 0 {
209            debug!(
210                "duplicate stream_id={} skipped (already exists)",
211                stream_info.stream_id
212            );
213        }
214        Ok(())
215    }
216
217    /// Registers a stream produced by an OTLP ingestion path.
218    ///
219    /// `dependencies_metadata` and `objects_metadata` are filled with the CBOR sentinel
220    /// for an empty `Vec<UserDefinedType>` so legacy decode sites continue to work.
221    /// `format` distinguishes per-block dispatch downstream (e.g. `"otlp/v1/logs"`).
222    /// Stream `properties` are always empty for OTel — scope and per-event attrs
223    /// live on individual rows during materialization, not on the stream.
224    ///
225    /// Hack: piggybacking OTLP onto the transit-shaped `streams` row (with empty
226    /// metadata sentinels) is expedient for two formats but won't scale. To support
227    /// more formats cleanly, `dependencies_metadata`, `objects_metadata`, and `format`
228    /// should be merged into a single per-format payload column.
229    #[span_fn]
230    pub async fn register_otel_stream(
231        &self,
232        stream_id: Uuid,
233        process_id: Uuid,
234        tags: Vec<String>,
235        format: &str,
236    ) -> Result<(), IngestionServiceError> {
237        let result = sqlx::query(
238            "INSERT INTO streams (stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties, insert_time, format)
239             VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
240             ON CONFLICT (stream_id) DO NOTHING;",
241        )
242        .bind(stream_id)
243        .bind(process_id)
244        .bind(empty_transit_metadata_cbor())
245        .bind(empty_transit_metadata_cbor())
246        .bind(tags)
247        .bind(Vec::<Property>::new())
248        .bind(sqlx::types::chrono::Utc::now())
249        .bind(format)
250        .execute(&self.lake.db_pool)
251        .await
252        .map_err(|e| {
253            IngestionServiceError::DatabaseError(format!("inserting otel stream: {e}"))
254        })?;
255
256        if result.rows_affected() == 0 {
257            debug!("duplicate otel stream_id={stream_id} skipped (already exists)");
258        }
259        Ok(())
260    }
261
262    #[span_fn]
263    pub async fn insert_process(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
264        let process_info: ProcessInfo = ciborium::from_reader(body.reader())
265            .map_err(|e| IngestionServiceError::ParseError(format!("parsing ProcessInfo: {e}")))?;
266
267        let insert_time = sqlx::types::chrono::Utc::now();
268        let result = sqlx::query(
269            "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
270        )
271        .bind(process_info.process_id)
272        .bind(process_info.exe)
273        .bind(process_info.username)
274        .bind(process_info.realname)
275        .bind(process_info.computer)
276        .bind(process_info.distro)
277        .bind(process_info.cpu_brand)
278        .bind(process_info.tsc_frequency)
279        .bind(process_info.start_time)
280        .bind(process_info.start_ticks)
281        .bind(insert_time)
282        .bind(process_info.parent_process_id)
283        .bind(make_properties(&process_info.properties))
284        .execute(&self.lake.db_pool)
285        .await
286        .map_err(|e| {
287            IngestionServiceError::DatabaseError(format!("inserting into processes: {e}"))
288        })?;
289
290        if result.rows_affected() == 0 {
291            debug!(
292                "duplicate process_id={} skipped (already exists)",
293                process_info.process_id
294            );
295        }
296        Ok(())
297    }
298
299    /// Registers a process originating from OTLP. Idempotent via `ON CONFLICT DO NOTHING`.
300    ///
301    /// `realname` is set equal to `username` (OTel has no separate "real name" concept).
302    /// `parent_process_id` is always NULL — OTel has no parent-process model.
303    /// `insert_time` is the server wall clock, matching the existing `insert_process` path.
304    #[span_fn]
305    #[expect(clippy::too_many_arguments, reason = "OTel process identity fields")]
306    pub async fn register_otel_process(
307        &self,
308        process_id: Uuid,
309        exe: String,
310        username: String,
311        computer: String,
312        distro: String,
313        cpu_brand: String,
314        tsc_frequency: i64,
315        start_time: sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>,
316        start_ticks: i64,
317        properties: Vec<Property>,
318    ) -> Result<(), IngestionServiceError> {
319        let insert_time = sqlx::types::chrono::Utc::now();
320        let result = sqlx::query(
321            "INSERT INTO processes
322             (process_id, exe, username, realname, computer, distro, cpu_brand,
323              tsc_frequency, start_time, start_ticks, insert_time, parent_process_id, properties)
324             VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,NULL,$12)
325             ON CONFLICT (process_id) DO NOTHING;",
326        )
327        .bind(process_id)
328        .bind(exe)
329        .bind(&username)
330        .bind(&username)
331        .bind(computer)
332        .bind(distro)
333        .bind(cpu_brand)
334        .bind(tsc_frequency)
335        .bind(start_time)
336        .bind(start_ticks)
337        .bind(insert_time)
338        .bind(properties)
339        .execute(&self.lake.db_pool)
340        .await
341        .map_err(|e| {
342            IngestionServiceError::DatabaseError(format!("inserting otel process: {e}"))
343        })?;
344
345        if result.rows_affected() == 0 {
346            debug!("duplicate otel process_id={process_id} skipped (already exists)");
347        }
348        Ok(())
349    }
350}