1use micromegas_telemetry::stream_info::StreamInfo;
2use micromegas_telemetry::wire_format::encode_cbor;
3use micromegas_tracing::{
4 event::EventSink,
5 flush_monitor::FlushMonitor,
6 logs::{LogBlock, LogMetadata, LogStream},
7 metrics::{MetricsBlock, MetricsStream},
8 prelude::*,
9 property_set::Property,
10 spans::{ThreadBlock, ThreadStream},
11};
12use std::{
13 cmp::max,
14 fmt,
15 sync::{Arc, Mutex},
16};
17use std::{
18 sync::atomic::{AtomicIsize, Ordering},
19 time::Duration,
20};
21use tokio_retry2::RetryError;
22
23use crate::request_decorator::RequestDecorator;
24use crate::stream_block::StreamBlock;
25use crate::stream_info::make_stream_info;
26
/// Error raised by ingestion HTTP calls, classified by whether a retry
/// could plausibly succeed. The classification drives `into_retry` below.
#[derive(Clone, Debug)]
enum IngestionClientError {
    /// Recoverable failure (network error, HTTP 5xx, decorator failure);
    /// the request may be retried.
    Transient(String),
    /// Non-recoverable failure (encoding error, request build error, HTTP
    /// 4xx or unexpected status); retrying would not help.
    Permanent(String),
}
40
41impl std::fmt::Display for IngestionClientError {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 match self {
44 IngestionClientError::Transient(msg) => write!(f, "transient error: {msg}"),
45 IngestionClientError::Permanent(msg) => write!(f, "permanent error: {msg}"),
46 }
47 }
48}
49
// Marker impl: the Debug and Display impls above satisfy `Error`'s supertraits.
impl std::error::Error for IngestionClientError {}
51
52impl IngestionClientError {
53 fn into_retry(self) -> RetryError<Self> {
54 match self {
55 IngestionClientError::Transient(_) => RetryError::transient(self),
56 IngestionClientError::Permanent(_) => RetryError::permanent(self),
57 }
58 }
59}
60
/// Messages sent from the tracing callbacks to the background upload thread.
#[derive(Debug)]
enum SinkEvent {
    /// Announces process metadata; must arrive before any block events so
    /// uploads can be attributed to the process.
    Startup(Arc<ProcessInfo>),
    /// Registers a log/metrics/thread stream with the ingestion service.
    InitStream(Arc<StreamInfo>),
    /// A completed log block ready for upload.
    ProcessLogBlock(Arc<LogBlock>),
    /// A completed metrics block ready for upload.
    ProcessMetricsBlock(Arc<MetricsBlock>),
    /// A completed thread/span block ready for upload.
    ProcessThreadBlock(Arc<ThreadBlock>),
    /// Requests that the background thread drain the queue and exit.
    Shutdown,
}
70
/// `EventSink` implementation that forwards telemetry to an HTTP ingestion
/// service from a dedicated background thread.
pub struct HttpEventSink {
    // Background upload thread; taken and joined on drop.
    thread: Option<std::thread::JoinHandle<()>>,
    // Sending half of the event channel, shared by all callback threads.
    sender: Mutex<Option<std::sync::mpsc::Sender<SinkEvent>>>,
    // Count of events queued but not yet received by the background thread.
    queue_size: Arc<AtomicIsize>,
    // Flag + condvar the background thread signals when its shutdown drain
    // finishes; `on_shutdown` waits on it (with a timeout).
    shutdown_complete: Arc<(Mutex<bool>, std::sync::Condvar)>,
}
78
79impl Drop for HttpEventSink {
80 fn drop(&mut self) {
81 {
83 let sender_guard = self.sender.lock().unwrap();
84 if let Some(sender) = sender_guard.as_ref() {
85 let _ = sender.send(SinkEvent::Shutdown);
86 }
87 }
88
89 if let Some(handle) = self.thread.take()
91 && let Err(e) = handle.join()
92 {
93 eprintln!("Warning: telemetry thread join failed: {:?}", e);
95 }
96 }
97}
98
99impl HttpEventSink {
100 pub fn new(
113 addr_server: &str,
114 max_queue_size: isize,
115 metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
116 blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
117 make_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
118 ) -> Self {
119 let addr = addr_server.to_owned();
120 let (sender, receiver) = std::sync::mpsc::channel::<SinkEvent>();
121 let queue_size = Arc::new(AtomicIsize::new(0));
122 let thread_queue_size = queue_size.clone();
123 let shutdown_complete = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
124 let thread_shutdown_complete = shutdown_complete.clone();
125 Self {
126 thread: Some(std::thread::spawn(move || {
127 Self::thread_proc(
128 addr,
129 receiver,
130 thread_queue_size,
131 max_queue_size,
132 metadata_retry,
133 blocks_retry,
134 make_decorator,
135 thread_shutdown_complete,
136 );
137 })),
138 sender: Mutex::new(Some(sender)),
139 queue_size,
140 shutdown_complete,
141 }
142 }
143
144 fn send(&self, event: SinkEvent) {
145 let guard = self.sender.lock().unwrap();
146 if let Some(sender) = guard.as_ref() {
147 self.queue_size.fetch_add(1, Ordering::Relaxed);
148 if let Err(e) = sender.send(event) {
149 self.queue_size.fetch_sub(1, Ordering::Relaxed);
150 error!("{}", e);
151 }
152 }
153 }
154
155 #[span_fn]
156 async fn push_process(
157 client: &mut reqwest::Client,
158 root_path: &str,
159 process_info: Arc<ProcessInfo>,
160 retry_strategy: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
161 decorator: &dyn RequestDecorator,
162 ) -> Result<(), IngestionClientError> {
163 debug!("sending process {process_info:?}");
164 let url = format!("{root_path}/ingestion/insert_process");
165 let body: bytes::Bytes = encode_cbor(&*process_info)
166 .map_err(|e| IngestionClientError::Permanent(format!("encoding process: {e}")))?
167 .into();
168 tokio_retry2::Retry::spawn(retry_strategy, || async {
169 let mut request = client.post(&url).body(body.clone()).build().map_err(|e| {
170 IngestionClientError::Permanent(format!("building request: {e}")).into_retry()
171 })?;
172
173 if let Err(e) = decorator.decorate(&mut request).await {
174 debug!("request decorator: {e:?}");
175 return Err(
176 IngestionClientError::Transient(format!("decorating request: {e}"))
177 .into_retry(),
178 );
179 }
180
181 let response = client.execute(request).await.map_err(|e| {
182 IngestionClientError::Transient(format!("network error: {e}")).into_retry()
183 })?;
184
185 let status = response.status();
186 match status.as_u16() {
187 200..=299 => Ok(()),
188 400..=499 => {
189 let body = response.text().await.unwrap_or_default();
190 warn!("insert_process client error ({status}): {body}");
191 Err(IngestionClientError::Permanent(body).into_retry())
192 }
193 500..=599 => {
194 let body = response.text().await.unwrap_or_default();
195 debug!("insert_process server error ({status}): {body}");
196 Err(IngestionClientError::Transient(format!("{status}: {body}")).into_retry())
197 }
198 _ => {
199 let body = response.text().await.unwrap_or_default();
200 warn!("insert_process unexpected status ({status}): {body}");
201 Err(IngestionClientError::Permanent(format!("{status}: {body}")).into_retry())
202 }
203 }
204 })
205 .await
206 }
207
208 #[span_fn]
209 async fn push_stream(
210 client: &mut reqwest::Client,
211 root_path: &str,
212 stream_info: Arc<StreamInfo>,
213 retry_strategy: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
214 decorator: &dyn RequestDecorator,
215 ) -> Result<(), IngestionClientError> {
216 let url = format!("{root_path}/ingestion/insert_stream");
217 let body: bytes::Bytes = encode_cbor(&*stream_info)
218 .map_err(|e| IngestionClientError::Permanent(format!("encoding stream: {e}")))?
219 .into();
220 tokio_retry2::Retry::spawn(retry_strategy, || async {
221 let mut request = client.post(&url).body(body.clone()).build().map_err(|e| {
222 IngestionClientError::Permanent(format!("building request: {e}")).into_retry()
223 })?;
224
225 if let Err(e) = decorator.decorate(&mut request).await {
226 debug!("request decorator: {e:?}");
227 return Err(
228 IngestionClientError::Transient(format!("decorating request: {e}"))
229 .into_retry(),
230 );
231 }
232
233 let response = client.execute(request).await.map_err(|e| {
234 IngestionClientError::Transient(format!("network error: {e}")).into_retry()
235 })?;
236
237 let status = response.status();
238 match status.as_u16() {
239 200..=299 => Ok(()),
240 400..=499 => {
241 let body = response.text().await.unwrap_or_default();
242 warn!("insert_stream client error ({status}): {body}");
243 Err(IngestionClientError::Permanent(body).into_retry())
244 }
245 500..=599 => {
246 let body = response.text().await.unwrap_or_default();
247 debug!("insert_stream server error ({status}): {body}");
248 Err(IngestionClientError::Transient(format!("{status}: {body}")).into_retry())
249 }
250 _ => {
251 let body = response.text().await.unwrap_or_default();
252 warn!("insert_stream unexpected status ({status}): {body}");
253 Err(IngestionClientError::Permanent(format!("{status}: {body}")).into_retry())
254 }
255 }
256 })
257 .await
258 }
259
260 #[span_fn]
261 #[expect(clippy::too_many_arguments)]
262 async fn push_block(
263 client: &mut reqwest::Client,
264 root_path: &str,
265 buffer: &dyn StreamBlock,
266 current_queue_size: &AtomicIsize,
267 max_queue_size: isize,
268 retry_strategy: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
269 decorator: &dyn RequestDecorator,
270 process_info: &ProcessInfo,
271 ) -> Result<(), IngestionClientError> {
272 trace!("push_block");
273 if current_queue_size.load(Ordering::Relaxed) >= max_queue_size {
274 debug!("dropping data, queue over max_queue_size");
277 return Ok(());
278 }
279 let encoded_block: bytes::Bytes = buffer
280 .encode_bin(process_info)
281 .map_err(|e| IngestionClientError::Permanent(format!("encoding block: {e}")))?
282 .into();
283
284 let url = format!("{root_path}/ingestion/insert_block");
285
286 tokio_retry2::Retry::spawn(retry_strategy, || async {
287 let mut request = client
288 .post(&url)
289 .body(encoded_block.clone())
290 .build()
291 .map_err(|e| {
292 IngestionClientError::Permanent(format!("building request: {e}")).into_retry()
293 })?;
294
295 if let Err(e) = decorator.decorate(&mut request).await {
296 debug!("request decorator: {e:?}");
297 return Err(
298 IngestionClientError::Transient(format!("decorating request: {e}"))
299 .into_retry(),
300 );
301 }
302
303 trace!("push_block: executing request");
304
305 let response = client.execute(request).await.map_err(|e| {
306 IngestionClientError::Transient(format!("network error: {e}")).into_retry()
307 })?;
308
309 let status = response.status();
310 match status.as_u16() {
311 200..=299 => Ok(()),
312 400..=499 => {
313 let body = response.text().await.unwrap_or_default();
314 warn!("insert_block client error ({status}): {body}");
315 Err(IngestionClientError::Permanent(body).into_retry())
316 }
317 500..=599 => {
318 let body = response.text().await.unwrap_or_default();
319 debug!("insert_block server error ({status}): {body}");
320 Err(IngestionClientError::Transient(format!("{status}: {body}")).into_retry())
321 }
322 _ => {
323 let body = response.text().await.unwrap_or_default();
324 warn!("insert_block unexpected status ({status}): {body}");
325 Err(IngestionClientError::Permanent(format!("{status}: {body}")).into_retry())
326 }
327 }
328 })
329 .await
330 }
331
332 #[expect(clippy::too_many_arguments)]
333 async fn handle_sink_event(
334 message: SinkEvent,
335 client: &mut reqwest::Client,
336 addr: &str,
337 opt_process_info: &mut Option<Arc<ProcessInfo>>,
338 queue_size: &Arc<AtomicIsize>,
339 max_queue_size: isize,
340 metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
341 blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
342 decorator: &dyn RequestDecorator,
343 ) {
344 match message {
345 SinkEvent::Shutdown => {
346 }
348 SinkEvent::Startup(process_info) => {
349 *opt_process_info = Some(process_info.clone());
350 if let Err(e) =
351 Self::push_process(client, addr, process_info, metadata_retry, decorator).await
352 {
353 error!("error sending process: {e}");
354 }
355 }
356 SinkEvent::InitStream(stream_info) => {
357 if let Err(e) =
358 Self::push_stream(client, addr, stream_info, metadata_retry, decorator).await
359 {
360 error!("error sending stream: {e}");
361 }
362 }
363 SinkEvent::ProcessLogBlock(buffer) => {
364 if let Some(process_info) = opt_process_info {
365 if let Err(e) = Self::push_block(
366 client,
367 addr,
368 &*buffer,
369 queue_size,
370 max_queue_size,
371 blocks_retry,
372 decorator,
373 process_info,
374 )
375 .await
376 {
377 error!("error sending log block: {e}");
378 }
379 } else {
380 error!("trying to send blocks before Startup message");
381 }
382 }
383 SinkEvent::ProcessMetricsBlock(buffer) => {
384 if let Some(process_info) = opt_process_info {
385 if let Err(e) = Self::push_block(
386 client,
387 addr,
388 &*buffer,
389 queue_size,
390 max_queue_size,
391 blocks_retry,
392 decorator,
393 process_info,
394 )
395 .await
396 {
397 error!("error sending metrics block: {e}");
398 }
399 } else {
400 error!("trying to send blocks before Startup message");
401 }
402 }
403 SinkEvent::ProcessThreadBlock(buffer) => {
404 if let Some(process_info) = opt_process_info {
405 if let Err(e) = Self::push_block(
406 client,
407 addr,
408 &*buffer,
409 queue_size,
410 max_queue_size,
411 blocks_retry,
412 decorator,
413 process_info,
414 )
415 .await
416 {
417 error!("error sending thread block: {e}");
418 }
419 } else {
420 error!("trying to send blocks before Startup message");
421 }
422 }
423 }
424 }
425
426 #[expect(clippy::too_many_arguments)]
427 async fn thread_proc_impl(
428 addr: String,
429 receiver: std::sync::mpsc::Receiver<SinkEvent>,
430 queue_size: Arc<AtomicIsize>,
431 max_queue_size: isize,
432 metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
433 blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
434 decorator: &dyn RequestDecorator,
435 shutdown_complete: Arc<(Mutex<bool>, std::sync::Condvar)>,
436 ) {
437 let mut opt_process_info = None;
438 let mut client = match reqwest::Client::builder()
439 .pool_idle_timeout(Some(core::time::Duration::from_secs(2)))
440 .build()
441 {
442 Ok(client) => client,
443 Err(e) => {
444 error!("Error creating http client: {e:?}");
445 return;
446 }
447 };
448 if let Some(process_id) = micromegas_tracing::dispatch::process_id() {
450 let cpu_tracing_enabled =
451 micromegas_tracing::dispatch::cpu_tracing_enabled().unwrap_or(false);
452 info!("process_id={process_id}, cpu_tracing_enabled={cpu_tracing_enabled}");
453 }
454 let flusher = FlushMonitor::default();
455 loop {
456 let timeout = max(0, flusher.time_to_flush_seconds());
457 match receiver.recv_timeout(Duration::from_secs(timeout as u64)) {
458 Ok(message) => {
459 queue_size.fetch_sub(1, Ordering::Relaxed);
460 match message {
461 SinkEvent::Shutdown => {
462 debug!("received shutdown signal, flushing remaining data");
463 let mut count = 0;
465 while let Ok(remaining_message) = receiver.try_recv() {
466 queue_size.fetch_sub(1, Ordering::Relaxed);
467 count += 1;
468 match remaining_message {
469 SinkEvent::Shutdown => break, remaining_msg => {
471 Self::handle_sink_event(
473 remaining_msg,
474 &mut client,
475 &addr,
476 &mut opt_process_info,
477 &queue_size,
478 max_queue_size,
479 metadata_retry.clone(),
480 blocks_retry.clone(),
481 decorator,
482 )
483 .await;
484 }
485 }
486 }
487 debug!(
488 "telemetry thread shutdown complete, processed {} remaining messages",
489 count
490 );
491 let (lock, cvar) = &*shutdown_complete;
493 let mut completed = lock.lock().unwrap();
494 *completed = true;
495 cvar.notify_all();
496 return;
497 }
498 other_message => {
499 Self::handle_sink_event(
500 other_message,
501 &mut client,
502 &addr,
503 &mut opt_process_info,
504 &queue_size,
505 max_queue_size,
506 metadata_retry.clone(),
507 blocks_retry.clone(),
508 decorator,
509 )
510 .await;
511 }
512 }
513 }
514 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
515 flusher.tick();
516 }
517 Err(_e) => {
518 return;
521 }
522 }
523 }
524 }
525
526 #[allow(clippy::needless_pass_by_value,clippy::too_many_arguments
528 )]
529 fn thread_proc(
530 addr: String,
531 receiver: std::sync::mpsc::Receiver<SinkEvent>,
532 queue_size: Arc<AtomicIsize>,
533 max_queue_size: isize,
534 metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
535 blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
536 make_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
537 shutdown_complete: Arc<(Mutex<bool>, std::sync::Condvar)>,
538 ) {
539 let tokio_runtime = match tokio::runtime::Runtime::new() {
541 Ok(rt) => rt,
542 Err(e) => {
543 error!("Failed to create tokio runtime for telemetry: {e}");
544 return;
545 }
546 };
547 let decorator = make_decorator();
548 tokio_runtime.block_on(Self::thread_proc_impl(
549 addr,
550 receiver,
551 queue_size,
552 max_queue_size,
553 metadata_retry,
554 blocks_retry,
555 decorator.as_ref(),
556 shutdown_complete,
557 ));
558 }
559}
560
561impl EventSink for HttpEventSink {
562 fn on_startup(&self, process_info: Arc<ProcessInfo>) {
563 self.send(SinkEvent::Startup(process_info));
564 }
565
566 fn on_shutdown(&self) {
567 self.send(SinkEvent::Shutdown);
569
570 let (lock, cvar) = &*self.shutdown_complete;
572 let completed = lock.lock().unwrap();
573 let timeout = std::time::Duration::from_secs(5);
574 let _result = cvar.wait_timeout_while(completed, timeout, |&mut c| !c);
575 }
576
577 fn on_log_enabled(&self, _metadata: &LogMetadata) -> bool {
578 true
580 }
581
582 fn on_log(
583 &self,
584 _metadata: &LogMetadata,
585 _properties: &[Property],
586 _time: i64,
587 _args: fmt::Arguments<'_>,
588 ) {
589 }
590
591 fn on_init_log_stream(&self, log_stream: &LogStream) {
592 self.send(SinkEvent::InitStream(Arc::new(make_stream_info(
593 log_stream,
594 ))));
595 }
596
597 fn on_process_log_block(&self, log_block: Arc<LogBlock>) {
598 self.send(SinkEvent::ProcessLogBlock(log_block));
599 }
600
601 fn on_init_metrics_stream(&self, metrics_stream: &MetricsStream) {
602 self.send(SinkEvent::InitStream(Arc::new(make_stream_info(
603 metrics_stream,
604 ))));
605 }
606
607 fn on_process_metrics_block(&self, metrics_block: Arc<MetricsBlock>) {
608 self.send(SinkEvent::ProcessMetricsBlock(metrics_block));
609 }
610
611 fn on_init_thread_stream(&self, thread_stream: &ThreadStream) {
612 self.send(SinkEvent::InitStream(Arc::new(make_stream_info(
613 thread_stream,
614 ))));
615 }
616
617 fn on_process_thread_block(&self, thread_block: Arc<ThreadBlock>) {
618 self.send(SinkEvent::ProcessThreadBlock(thread_block));
619 }
620
621 fn is_busy(&self) -> bool {
622 let size = self.queue_size.load(Ordering::Relaxed);
623 debug_assert!(size >= 0, "queue_size went negative: {size}");
624 size > 0
625 }
626}