// micromegas_ingestion/web_ingestion_service.rs
use crate::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
2use crate::remote_data_lake::migrate_db;
3use anyhow::Context;
4use bytes::Buf;
5use micromegas_telemetry::block_wire_format;
6use micromegas_telemetry::property::make_properties;
7use micromegas_telemetry::stream_info::StreamInfo;
8use micromegas_telemetry::wire_format::encode_cbor;
9use micromegas_tracing::prelude::*;
10use micromegas_tracing::property_set;
11use std::sync::Arc;
12use thiserror::Error;
13
/// Errors returned by [`WebIngestionService`] request handlers.
///
/// Each variant wraps a human-readable description of the failure and maps to
/// one stage of the ingestion pipeline: decoding/encoding the request body,
/// talking to the SQL database, or writing to blob storage.
#[derive(Error, Debug)]
pub enum IngestionServiceError {
    /// CBOR decode/encode or timestamp parsing of the request body failed.
    #[error("Parse error: {0}")]
    ParseError(String),

    /// A SQL query against the data lake database failed.
    #[error("Database error: {0}")]
    DatabaseError(String),

    /// Writing a block payload to blob storage failed.
    #[error("Storage error: {0}")]
    StorageError(String),
}
30
/// Ingestion service state shared by the web request handlers.
///
/// Persists telemetry metadata (processes, streams, blocks) to the SQL
/// database and block payloads to blob storage, via the data lake connection.
#[derive(Clone)]
pub struct WebIngestionService {
    /// Shared handle to the SQL pool and blob storage backing the data lake.
    lake: DataLakeConnection,
}
35
36impl WebIngestionService {
37 pub fn new(lake: DataLakeConnection) -> Self {
38 Self { lake }
39 }
40
41 pub async fn from_env() -> anyhow::Result<Arc<Self>> {
45 let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
46 .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
47 let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
48 .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
49 let lake = connect_to_data_lake(&connection_string, &object_store_uri).await?;
50 migrate_db(lake.db_pool.clone())
51 .await
52 .with_context(|| "migrate_db")?;
53 Ok(Arc::new(Self::new(lake)))
54 }
55
    /// Decodes a telemetry block from CBOR and persists it.
    ///
    /// The block payload is written to blob storage under
    /// `blobs/{process_id}/{stream_id}/{block_id}`, then the block metadata row
    /// is inserted into the `blocks` table. `ON CONFLICT (block_id) DO NOTHING`
    /// makes the operation idempotent: a duplicate block is logged and skipped.
    ///
    /// # Errors
    /// * [`IngestionServiceError::ParseError`] — CBOR decode/encode or RFC 3339
    ///   timestamp parsing failed.
    /// * [`IngestionServiceError::StorageError`] — the blob storage write failed.
    /// * [`IngestionServiceError::DatabaseError`] — the SQL insert failed.
    ///
    /// NOTE(review): if the SQL insert fails after the blob write succeeded,
    /// the blob is left orphaned in storage — confirm whether that is accepted
    /// or cleaned up elsewhere.
    #[span_fn]
    pub async fn insert_block(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
        let block: block_wire_format::Block = ciborium::from_reader(body.reader())
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing block: {e}")))?;
        // Re-encode only the payload: it is stored in blob storage, separate
        // from the metadata row.
        let encoded_payload = encode_cbor(&block.payload)
            .map_err(|e| IngestionServiceError::ParseError(format!("encoding payload: {e}")))?;
        let payload_size = encoded_payload.len();

        let process_id = &block.process_id;
        let stream_id = &block.stream_id;
        let block_id = &block.block_id;
        let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
        debug!("writing {obj_path}");

        use sqlx::types::chrono::{DateTime, FixedOffset};
        // The wire format carries block timestamps as RFC 3339 strings.
        let begin_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.begin_time)
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing begin_time: {e}")))?;
        let end_time = DateTime::<FixedOffset>::parse_from_rfc3339(&block.end_time)
            .map_err(|e| IngestionServiceError::ParseError(format!("parsing end_time: {e}")))?;
        {
            // Time the blob-storage write and report it as a metric (in ticks).
            let begin_put = now();
            self.lake
                .blob_storage
                .put(&obj_path, encoded_payload.into())
                .await
                .map_err(|e| {
                    IngestionServiceError::StorageError(format!(
                        "writing block to blob storage: {e}"
                    ))
                })?;
            imetric!("put_duration", "ticks", (now() - begin_put) as u64);
        }

        debug!("recording block_id={block_id} stream_id={stream_id} process_id={process_id}");
        // Time the metadata insert separately from the blob write.
        let begin_insert = now();
        let insert_time = sqlx::types::chrono::Utc::now();
        // Positional insert: bind order must match the `blocks` table schema.
        let result = sqlx::query(
            "INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;",
        )
        .bind(block_id)
        .bind(stream_id)
        .bind(process_id)
        .bind(begin_time)
        .bind(block.begin_ticks)
        .bind(end_time)
        .bind(block.end_ticks)
        .bind(block.nb_objects)
        .bind(block.object_offset)
        .bind(payload_size as i64)
        .bind(insert_time)
        .execute(&self.lake.db_pool)
        .await
        .map_err(|e| IngestionServiceError::DatabaseError(format!("inserting into blocks: {e}")))?;
        imetric!("insert_duration", "ticks", (now() - begin_insert) as u64);

        if result.rows_affected() == 0 {
            debug!("duplicate block_id={block_id} skipped (already exists)");
        }
        // Tagged metric: payload bytes ingested, attributed to this module.
        imetric!(
            "payload_size_inserted",
            "bytes",
            property_set::PropertySet::find_or_create(vec![property_set::Property::new(
                "target",
                "micromegas::ingestion"
            ),]),
            payload_size as u64
        );
        debug!("recorded block_id={block_id} stream_id={stream_id} process_id={process_id}");

        Ok(())
    }
129
130 #[span_fn]
131 pub async fn insert_stream(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
132 let stream_info: StreamInfo = ciborium::from_reader(body.reader())
133 .map_err(|e| IngestionServiceError::ParseError(format!("parsing StreamInfo: {e}")))?;
134 info!(
135 "new stream {} {:?} {:?}",
136 stream_info.stream_id, &stream_info.tags, &stream_info.properties
137 );
138 let dependencies_metadata =
139 encode_cbor(&stream_info.dependencies_metadata).map_err(|e| {
140 IngestionServiceError::ParseError(format!("encoding dependencies_metadata: {e}"))
141 })?;
142 let objects_metadata = encode_cbor(&stream_info.objects_metadata).map_err(|e| {
143 IngestionServiceError::ParseError(format!("encoding objects_metadata: {e}"))
144 })?;
145 let result = sqlx::query(
146 "INSERT INTO streams VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (stream_id) DO NOTHING;",
147 )
148 .bind(stream_info.stream_id)
149 .bind(stream_info.process_id)
150 .bind(dependencies_metadata)
151 .bind(objects_metadata)
152 .bind(&stream_info.tags)
153 .bind(make_properties(&stream_info.properties))
154 .bind(sqlx::types::chrono::Utc::now())
155 .execute(&self.lake.db_pool)
156 .await
157 .map_err(|e| {
158 IngestionServiceError::DatabaseError(format!("inserting into streams: {e}"))
159 })?;
160
161 if result.rows_affected() == 0 {
162 debug!(
163 "duplicate stream_id={} skipped (already exists)",
164 stream_info.stream_id
165 );
166 }
167 Ok(())
168 }
169
170 #[span_fn]
171 pub async fn insert_process(&self, body: bytes::Bytes) -> Result<(), IngestionServiceError> {
172 let process_info: ProcessInfo = ciborium::from_reader(body.reader())
173 .map_err(|e| IngestionServiceError::ParseError(format!("parsing ProcessInfo: {e}")))?;
174
175 let insert_time = sqlx::types::chrono::Utc::now();
176 let result = sqlx::query(
177 "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
178 )
179 .bind(process_info.process_id)
180 .bind(process_info.exe)
181 .bind(process_info.username)
182 .bind(process_info.realname)
183 .bind(process_info.computer)
184 .bind(process_info.distro)
185 .bind(process_info.cpu_brand)
186 .bind(process_info.tsc_frequency)
187 .bind(process_info.start_time)
188 .bind(process_info.start_ticks)
189 .bind(insert_time)
190 .bind(process_info.parent_process_id)
191 .bind(make_properties(&process_info.properties))
192 .execute(&self.lake.db_pool)
193 .await
194 .map_err(|e| {
195 IngestionServiceError::DatabaseError(format!("inserting into processes: {e}"))
196 })?;
197
198 if result.rows_affected() == 0 {
199 debug!(
200 "duplicate process_id={} skipped (already exists)",
201 process_info.process_id
202 );
203 }
204 Ok(())
205 }
206}