//! Web ingestion service (micromegas_ingestion/web_ingestion_service.rs).

1use crate::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
2use crate::remote_data_lake::migrate_db;
3use anyhow::Context;
4use bytes::Buf;
5use micromegas_telemetry::block_wire_format;
6use micromegas_telemetry::property::make_properties;
7use micromegas_telemetry::stream_info::StreamInfo;
8use micromegas_telemetry::wire_format::encode_cbor;
9use micromegas_tracing::prelude::*;
10use micromegas_tracing::property_set;
11use std::sync::Arc;
12use thiserror::Error;
13
14/// Error type for ingestion service operations.
15/// Categorizes errors to enable proper HTTP status code mapping.
16#[derive(Error, Debug)]
17pub enum IngestionServiceError {
18    /// Client-side errors (malformed input) - maps to 400 Bad Request
19    #[error("Parse error: {0}")]
20    ParseError(String),
21
22    /// Database errors - maps to 500 Internal Server Error
23    #[error("Database error: {0}")]
24    DatabaseError(String),
25
26    /// Object storage errors - maps to 500 Internal Server Error
27    #[error("Storage error: {0}")]
28    StorageError(String),
29}
30
/// Service that persists incoming telemetry (blocks, streams, processes)
/// into the data lake: payloads go to blob storage, metadata goes to the
/// SQL database.
///
/// NOTE(review): `Clone` is derived here; cloning is presumably cheap
/// because `DataLakeConnection` wraps shared handles (it exposes a pooled
/// `db_pool` below) — confirm against its definition.
#[derive(Clone)]
pub struct WebIngestionService {
    // Shared connection to the data lake (blob storage + metadata DB pool).
    lake: DataLakeConnection,
}
35
impl WebIngestionService {
    /// Wraps an already-connected data lake in a ready-to-use service.
    pub fn new(lake: DataLakeConnection) -> Self {
        Self { lake }
    }

    /// Reads MICROMEGAS_SQL_CONNECTION_STRING and MICROMEGAS_OBJECT_STORE_URI,
    /// connects to the data lake, runs ingestion migrations, and returns
    /// a ready-to-use service.
    ///
    /// # Errors
    /// Fails when either environment variable is unset, the data lake
    /// connection cannot be established, or `migrate_db` fails.
    pub async fn from_env() -> anyhow::Result<Arc<Self>> {
        let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
            .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
        let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
            .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
        let lake = connect_to_data_lake(&connection_string, &object_store_uri).await?;
        migrate_db(lake.db_pool.clone())
            .await
            .with_context(|| "migrate_db")?;
        Ok(Arc::new(Self::new(lake)))
    }

    /// Decodes a CBOR-encoded telemetry block from `body`, writes the
    /// re-encoded payload to blob storage, then records the block's
    /// metadata in the `blocks` table.
    ///
    /// Duplicate blocks (same `block_id`) are tolerated: the metadata
    /// insert is a no-op via `ON CONFLICT DO NOTHING`. The blob write
    /// happens first regardless, so a duplicate rewrites the same object
    /// path with identical content.
    ///
    /// # Errors
    /// - `ParseError`: malformed CBOR or RFC 3339 timestamps (client fault).
    /// - `StorageError`: blob storage write failure.
    /// - `DatabaseError`: metadata insert failure.
    #[span_fn]
    pub async fn insert_block(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
        let block: block_wire_format::Block = ciborium::from_reader(body.reader())
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing block: {e}")))?;
        // Re-encode only the payload; its byte size is recorded both in the
        // DB row and as a metric below.
        let encoded_payload = encode_cbor(&block.payload)
            .map_err(|e| IngestionServiceError::ParseError(format!("encoding payload: {e}")))?;
        let payload_size = encoded_payload.len();

        let process_id = &block.process_id;
        let stream_id = &block.stream_id;
        let block_id = &block.block_id;
        // Blob layout: blobs/<process_id>/<stream_id>/<block_id>
        let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
        debug!("writing {obj_path}");

        use sqlx::types::chrono::{DateTime, FixedOffset};
        // Timestamps arrive as RFC 3339 strings on the wire.
        let begin_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.begin_time)
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing begin_time: {e}")))?;
        let end_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.end_time)
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing end_time: {e}")))?;
        {
            // Scope the upload so its duration (in tsc ticks) can be metered.
            let begin_put = now();
            self.lake
                .blob_storage
                .put(&obj_path, encoded_payload.into())
                .await
                .map_err(|e| {
                    IngestionServiceError::StorageError(format!(
                        "writing block to blob storage: {e}"
                    ))
                })?;
            imetric!("put_duration", "ticks", (now() - begin_put) as u64);
        }

        debug!("recording block_id={block_id} stream_id={stream_id} process_id={process_id}");
        let begin_insert = now();
        let insert_time = sqlx::types::chrono::Utc::now();
        // NOTE(review): positional VALUES list — the bind order below must
        // match the column order of the `blocks` table as created by the
        // migrations; re-verify whenever the schema changes.
        let result = sqlx::query(
            "INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;",
        )
        .bind(block_id)
        .bind(stream_id)
        .bind(process_id)
        .bind(begin_time)
        .bind(block.begin_ticks)
        .bind(end_time)
        .bind(block.end_ticks)
        .bind(block.nb_objects)
        .bind(block.object_offset)
        .bind(payload_size as i64)
        .bind(insert_time)
        .execute(&self.lake.db_pool)
        .await
        .map_err(|e| IngestionServiceError::DatabaseError(format!("inserting into blocks: {e}")))?;
        imetric!("insert_duration", "ticks", (now() - begin_insert) as u64);

        // rows_affected == 0 means the ON CONFLICT clause suppressed the
        // insert: the block was already recorded.
        if result.rows_affected() == 0 {
            debug!("duplicate block_id={block_id} skipped (already exists)");
        }
        // this measure does not benefit from a dynamic property - I just want to make sure the feature works well
        // the cost in this context should be reasonable
        imetric!(
            "payload_size_inserted",
            "bytes",
            property_set::PropertySet::find_or_create(vec![property_set::Property::new(
                "target",
                "micromegas::ingestion"
            ),]),
            payload_size as u64
        );
        debug!("recorded block_id={block_id} stream_id={stream_id} process_id={process_id}");

        Ok(())
    }

    /// Decodes a CBOR-encoded `StreamInfo` from `body` and registers the
    /// stream in the `streams` table. Duplicate `stream_id`s are ignored
    /// via `ON CONFLICT DO NOTHING`.
    ///
    /// # Errors
    /// - `ParseError`: malformed CBOR input, or failure re-encoding the
    ///   dependency/object metadata to CBOR for storage.
    /// - `DatabaseError`: insert failure.
    #[span_fn]
    pub async fn insert_stream(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
        let stream_info: StreamInfo = ciborium::from_reader(body.reader())
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing StreamInfo: {e}")))?;
        info!(
            "new stream {} {:?} {:?}",
            stream_info.stream_id, &stream_info.tags, &stream_info.properties
        );
        // Metadata sub-structures are stored as opaque CBOR blobs.
        let dependencies_metadata =
            encode_cbor(&stream_info.dependencies_metadata).map_err(|e| {
                IngestionServiceError::ParseError(format!("encoding dependencies_metadata: {e}"))
            })?;
        let objects_metadata = encode_cbor(&stream_info.objects_metadata).map_err(|e| {
            IngestionServiceError::ParseError(format!("encoding objects_metadata: {e}"))
        })?;
        // NOTE(review): positional binds — order must match the `streams`
        // table column order from the migrations; the last bind is the
        // server-side insert timestamp.
        let result = sqlx::query(
            "INSERT INTO streams VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (stream_id) DO NOTHING;",
        )
        .bind(stream_info.stream_id)
        .bind(stream_info.process_id)
        .bind(dependencies_metadata)
        .bind(objects_metadata)
        .bind(&stream_info.tags)
        .bind(make_properties(&stream_info.properties))
        .bind(sqlx::types::chrono::Utc::now())
        .execute(&self.lake.db_pool)
        .await
        .map_err(|e| {
            IngestionServiceError::DatabaseError(format!("inserting into streams: {e}"))
        })?;

        // Duplicate registration (e.g. client retry) is expected and benign.
        if result.rows_affected() == 0 {
            debug!(
                "duplicate stream_id={} skipped (already exists)",
                stream_info.stream_id
            );
        }
        Ok(())
    }

    /// Decodes a CBOR-encoded `ProcessInfo` from `body` and registers the
    /// process in the `processes` table. Duplicate `process_id`s are
    /// ignored via `ON CONFLICT DO NOTHING`.
    ///
    /// NOTE(review): `ProcessInfo` is not imported explicitly here; it is
    /// presumably re-exported by `micromegas_tracing::prelude::*` — confirm.
    ///
    /// # Errors
    /// - `ParseError`: malformed CBOR input (client fault).
    /// - `DatabaseError`: insert failure.
    #[span_fn]
    pub async fn insert_process(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
        let process_info: ProcessInfo = ciborium::from_reader(body.reader())
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing ProcessInfo: {e}")))?;

        let insert_time = sqlx::types::chrono::Utc::now();
        // NOTE(review): positional binds — order must match the `processes`
        // table column order from the migrations ($11 is the server-side
        // insert timestamp, not a client field).
        let result = sqlx::query(
            "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
        )
        .bind(process_info.process_id)
        .bind(process_info.exe)
        .bind(process_info.username)
        .bind(process_info.realname)
        .bind(process_info.computer)
        .bind(process_info.distro)
        .bind(process_info.cpu_brand)
        .bind(process_info.tsc_frequency)
        .bind(process_info.start_time)
        .bind(process_info.start_ticks)
        .bind(insert_time)
        .bind(process_info.parent_process_id)
        .bind(make_properties(&process_info.properties))
        .execute(&self.lake.db_pool)
        .await
        .map_err(|e| {
            IngestionServiceError::DatabaseError(format!("inserting into processes: {e}"))
        })?;

        // Duplicate registration (e.g. client retry) is expected and benign.
        if result.rows_affected() == 0 {
            debug!(
                "duplicate process_id={} skipped (already exists)",
                process_info.process_id
            );
        }
        Ok(())
    }
}