1use crate::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
2use crate::remote_data_lake::migrate_db;
3use anyhow::Context;
4use bytes::Buf;
5use micromegas_telemetry::block_wire_format;
6use micromegas_telemetry::property::Property;
7use micromegas_telemetry::property::make_properties;
8use micromegas_telemetry::stream_info::StreamInfo;
9use micromegas_telemetry::wire_format::encode_cbor;
10use micromegas_tracing::prelude::*;
11use micromegas_tracing::property_set;
12use std::sync::{Arc, LazyLock};
13use thiserror::Error;
14use uuid::Uuid;
15
16static EMPTY_TRANSIT_METADATA_CBOR_BYTES: LazyLock<Vec<u8>> = LazyLock::new(|| {
17 let mut buf = Vec::new();
18 ciborium::ser::into_writer(&Vec::<()>::new(), &mut buf)
19 .expect("encoding an empty Vec to CBOR is infallible");
20 buf
21});
22
23pub fn empty_transit_metadata_cbor() -> &'static [u8] {
28 &EMPTY_TRANSIT_METADATA_CBOR_BYTES
29}
30
/// Value written to `streams.format` for streams whose blocks use the native
/// micromegas transit encoding.
pub const FORMAT_TRANSIT: &str = "micromegas-transit";

/// Stream format identifier for OTLP v1 log data.
pub const FORMAT_OTLP_LOGS: &str = "otlp/v1/logs";

/// Stream format identifier for OTLP v1 metric data.
pub const FORMAT_OTLP_METRICS: &str = "otlp/v1/metrics";

/// Stream format identifier for OTLP v1 trace data.
pub const FORMAT_OTLP_TRACES: &str = "otlp/v1/traces";
42
43#[derive(Error, Debug)]
46pub enum IngestionServiceError {
47 #[error("Parse error: {0}")]
49 ParseError(String),
50
51 #[error("Database error: {0}")]
53 DatabaseError(String),
54
55 #[error("Storage error: {0}")]
57 StorageError(String),
58}
59
60#[derive(Clone)]
61pub struct WebIngestionService {
62 lake: DataLakeConnection,
63}
64
65impl WebIngestionService {
66 pub fn new(lake: DataLakeConnection) -> Self {
67 Self { lake }
68 }
69
70 pub async fn from_env() -> anyhow::Result<Arc<Self>> {
74 let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
75 .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
76 let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
77 .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
78 let lake = connect_to_data_lake(&connection_string, &object_store_uri).await?;
79 migrate_db(lake.db_pool.clone())
80 .await
81 .with_context(|| "migrate_db")?;
82 Ok(Arc::new(Self::new(lake)))
83 }
84
85 #[span_fn]
86 pub async fn insert_block(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
87 let block: block_wire_format::Block = ciborium::from_reader(body.reader())
88 .map_err(|e| IngestionServiceError::ParseError(format!("parsing block: {e}")))?;
89 self.insert_block_typed(block).await
90 }
91
92 #[span_fn]
99 pub async fn insert_block_typed(
100 &self,
101 block: block_wire_format::Block,
102 ) -> Result<(), IngestionServiceError> {
103 let encoded_payload = encode_cbor(&block.payload)
104 .map_err(|e| IngestionServiceError::ParseError(format!("encoding payload: {e}")))?;
105 let payload_size = encoded_payload.len();
106
107 let process_id = &block.process_id;
108 let stream_id = &block.stream_id;
109 let block_id = &block.block_id;
110 let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
111 debug!("writing {obj_path}");
112
113 use sqlx::types::chrono::{DateTime, FixedOffset};
114 let begin_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.begin_time)
115 .map_err(|e| IngestionServiceError::ParseError(format!("parsing begin_time: {e}")))?;
116 let end_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.end_time)
117 .map_err(|e| IngestionServiceError::ParseError(format!("parsing end_time: {e}")))?;
118 {
119 let begin_put = now();
120 self.lake
121 .blob_storage
122 .put(&obj_path, encoded_payload.into())
123 .await
124 .map_err(|e| {
125 IngestionServiceError::StorageError(format!(
126 "writing block to blob storage: {e}"
127 ))
128 })?;
129 imetric!("put_duration", "ticks", (now() - begin_put) as u64);
130 }
131
132 debug!("recording block_id={block_id} stream_id={stream_id} process_id={process_id}");
133 let begin_insert = now();
134 let insert_time = sqlx::types::chrono::Utc::now();
135 let result = sqlx::query(
136 "INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;",
137 )
138 .bind(block_id)
139 .bind(stream_id)
140 .bind(process_id)
141 .bind(begin_time)
142 .bind(block.begin_ticks)
143 .bind(end_time)
144 .bind(block.end_ticks)
145 .bind(block.nb_objects)
146 .bind(block.object_offset)
147 .bind(payload_size as i64)
148 .bind(insert_time)
149 .execute(&self.lake.db_pool)
150 .await
151 .map_err(|e| IngestionServiceError::DatabaseError(format!("inserting into blocks: {e}")))?;
152 imetric!("insert_duration", "ticks", (now() - begin_insert) as u64);
153
154 if result.rows_affected() == 0 {
155 debug!("duplicate block_id={block_id} skipped (already exists)");
156 }
157 imetric!(
160 "payload_size_inserted",
161 "bytes",
162 property_set::PropertySet::find_or_create(vec![property_set::Property::new(
163 "target",
164 "micromegas::ingestion"
165 ),]),
166 payload_size as u64
167 );
168 debug!("recorded block_id={block_id} stream_id={stream_id} process_id={process_id}");
169
170 Ok(())
171 }
172
173 #[span_fn]
175 pub async fn insert_stream(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
176 let stream_info: StreamInfo = ciborium::from_reader(body.reader())
177 .map_err(|e| IngestionServiceError::ParseError(format!("parsing StreamInfo: {e}")))?;
178 info!(
179 "new stream {} {:?} {:?}",
180 stream_info.stream_id, &stream_info.tags, &stream_info.properties
181 );
182 let dependencies_metadata =
183 encode_cbor(&stream_info.dependencies_metadata).map_err(|e| {
184 IngestionServiceError::ParseError(format!("encoding dependencies_metadata: {e}"))
185 })?;
186 let objects_metadata = encode_cbor(&stream_info.objects_metadata).map_err(|e| {
187 IngestionServiceError::ParseError(format!("encoding objects_metadata: {e}"))
188 })?;
189 let result = sqlx::query(
190 "INSERT INTO streams (stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties, insert_time, format)
191 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
192 ON CONFLICT (stream_id) DO NOTHING;",
193 )
194 .bind(stream_info.stream_id)
195 .bind(stream_info.process_id)
196 .bind(dependencies_metadata)
197 .bind(objects_metadata)
198 .bind(&stream_info.tags)
199 .bind(make_properties(&stream_info.properties))
200 .bind(sqlx::types::chrono::Utc::now())
201 .bind(FORMAT_TRANSIT)
202 .execute(&self.lake.db_pool)
203 .await
204 .map_err(|e| {
205 IngestionServiceError::DatabaseError(format!("inserting into streams: {e}"))
206 })?;
207
208 if result.rows_affected() == 0 {
209 debug!(
210 "duplicate stream_id={} skipped (already exists)",
211 stream_info.stream_id
212 );
213 }
214 Ok(())
215 }
216
217 #[span_fn]
230 pub async fn register_otel_stream(
231 &self,
232 stream_id: Uuid,
233 process_id: Uuid,
234 tags: Vec<String>,
235 format: &str,
236 ) -> Result<(), IngestionServiceError> {
237 let result = sqlx::query(
238 "INSERT INTO streams (stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties, insert_time, format)
239 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
240 ON CONFLICT (stream_id) DO NOTHING;",
241 )
242 .bind(stream_id)
243 .bind(process_id)
244 .bind(empty_transit_metadata_cbor())
245 .bind(empty_transit_metadata_cbor())
246 .bind(tags)
247 .bind(Vec::<Property>::new())
248 .bind(sqlx::types::chrono::Utc::now())
249 .bind(format)
250 .execute(&self.lake.db_pool)
251 .await
252 .map_err(|e| {
253 IngestionServiceError::DatabaseError(format!("inserting otel stream: {e}"))
254 })?;
255
256 if result.rows_affected() == 0 {
257 debug!("duplicate otel stream_id={stream_id} skipped (already exists)");
258 }
259 Ok(())
260 }
261
262 #[span_fn]
263 pub async fn insert_process(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
264 let process_info: ProcessInfo = ciborium::from_reader(body.reader())
265 .map_err(|e| IngestionServiceError::ParseError(format!("parsing ProcessInfo: {e}")))?;
266
267 let insert_time = sqlx::types::chrono::Utc::now();
268 let result = sqlx::query(
269 "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
270 )
271 .bind(process_info.process_id)
272 .bind(process_info.exe)
273 .bind(process_info.username)
274 .bind(process_info.realname)
275 .bind(process_info.computer)
276 .bind(process_info.distro)
277 .bind(process_info.cpu_brand)
278 .bind(process_info.tsc_frequency)
279 .bind(process_info.start_time)
280 .bind(process_info.start_ticks)
281 .bind(insert_time)
282 .bind(process_info.parent_process_id)
283 .bind(make_properties(&process_info.properties))
284 .execute(&self.lake.db_pool)
285 .await
286 .map_err(|e| {
287 IngestionServiceError::DatabaseError(format!("inserting into processes: {e}"))
288 })?;
289
290 if result.rows_affected() == 0 {
291 debug!(
292 "duplicate process_id={} skipped (already exists)",
293 process_info.process_id
294 );
295 }
296 Ok(())
297 }
298
299 #[span_fn]
305 #[expect(clippy::too_many_arguments, reason = "OTel process identity fields")]
306 pub async fn register_otel_process(
307 &self,
308 process_id: Uuid,
309 exe: String,
310 username: String,
311 computer: String,
312 distro: String,
313 cpu_brand: String,
314 tsc_frequency: i64,
315 start_time: sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>,
316 start_ticks: i64,
317 properties: Vec<Property>,
318 ) -> Result<(), IngestionServiceError> {
319 let insert_time = sqlx::types::chrono::Utc::now();
320 let result = sqlx::query(
321 "INSERT INTO processes
322 (process_id, exe, username, realname, computer, distro, cpu_brand,
323 tsc_frequency, start_time, start_ticks, insert_time, parent_process_id, properties)
324 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,NULL,$12)
325 ON CONFLICT (process_id) DO NOTHING;",
326 )
327 .bind(process_id)
328 .bind(exe)
329 .bind(&username)
330 .bind(&username)
331 .bind(computer)
332 .bind(distro)
333 .bind(cpu_brand)
334 .bind(tsc_frequency)
335 .bind(start_time)
336 .bind(start_ticks)
337 .bind(insert_time)
338 .bind(properties)
339 .execute(&self.lake.db_pool)
340 .await
341 .map_err(|e| {
342 IngestionServiceError::DatabaseError(format!("inserting otel process: {e}"))
343 })?;
344
345 if result.rows_affected() == 0 {
346 debug!("duplicate otel process_id={process_id} skipped (already exists)");
347 }
348 Ok(())
349 }
350}