micromegas_telemetry_sink/
http_event_sink.rs

1use micromegas_telemetry::stream_info::StreamInfo;
2use micromegas_telemetry::wire_format::encode_cbor;
3use micromegas_tracing::{
4    event::EventSink,
5    flush_monitor::FlushMonitor,
6    logs::{LogBlock, LogMetadata, LogStream},
7    metrics::{MetricsBlock, MetricsStream},
8    prelude::*,
9    property_set::Property,
10    spans::{ThreadBlock, ThreadStream},
11};
12use std::{
13    cmp::max,
14    fmt,
15    sync::{Arc, Mutex},
16};
17use std::{
18    sync::atomic::{AtomicIsize, Ordering},
19    time::Duration,
20};
21use tokio_retry2::RetryError;
22
23use crate::request_decorator::RequestDecorator;
24use crate::stream_block::StreamBlock;
25use crate::stream_info::make_stream_info;
26
/// Error type for ingestion client operations.
/// Explicitly categorizes errors to control retry behavior.
///
/// Each variant carries a human-readable description of the failure.
///
/// Logging strategy: Transient errors (5xx, network) use `debug!` to avoid
/// polluting instrumented applications' logs with telemetry infrastructure noise.
/// Permanent errors (4xx) use `warn!` since they indicate a bug in the client.
#[derive(Clone, Debug)]
enum IngestionClientError {
    /// Transient error - should retry (network issues, 5xx responses)
    Transient(String),
    /// Permanent error - should NOT retry (4xx responses, malformed data)
    Permanent(String),
}
40
41impl std::fmt::Display for IngestionClientError {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        match self {
44            IngestionClientError::Transient(msg) => write!(f, "transient error: {msg}"),
45            IngestionClientError::Permanent(msg) => write!(f, "permanent error: {msg}"),
46        }
47    }
48}
49
// Marker impl: `Display`/`Debug` above provide the messages; no `source()` chain.
impl std::error::Error for IngestionClientError {}
51
52impl IngestionClientError {
53    fn into_retry(self) -> RetryError<Self> {
54        match self {
55            IngestionClientError::Transient(_) => RetryError::transient(self),
56            IngestionClientError::Permanent(_) => RetryError::permanent(self),
57        }
58    }
59}
60
/// Messages sent from the public sink API to its background worker thread.
#[derive(Debug)]
enum SinkEvent {
    /// Process metadata; blocks received before this event are rejected.
    Startup(Arc<ProcessInfo>),
    /// Metadata for a newly created log/metrics/thread stream.
    InitStream(Arc<StreamInfo>),
    /// A filled log buffer to upload.
    ProcessLogBlock(Arc<LogBlock>),
    /// A filled metrics buffer to upload.
    ProcessMetricsBlock(Arc<MetricsBlock>),
    /// A filled thread-span buffer to upload.
    ProcessThreadBlock(Arc<ThreadBlock>),
    /// Ask the worker to flush remaining events and exit.
    Shutdown,
}
70
/// `EventSink` implementation that forwards telemetry to an HTTP ingestion
/// server from a dedicated background thread.
pub struct HttpEventSink {
    // Background worker handle; taken and joined in `Drop`.
    thread: Option<std::thread::JoinHandle<()>>,
    // TODO: simplify this?
    sender: Mutex<Option<std::sync::mpsc::Sender<SinkEvent>>>,
    // Count of queued-but-unprocessed events; drives `is_busy`.
    queue_size: Arc<AtomicIsize>,
    // Flag + condvar signaled by the worker once the shutdown flush is done.
    shutdown_complete: Arc<(Mutex<bool>, std::sync::Condvar)>,
}
78
79impl Drop for HttpEventSink {
80    fn drop(&mut self) {
81        // Send shutdown signal (ignore error if thread already stopped from on_shutdown)
82        {
83            let sender_guard = self.sender.lock().unwrap();
84            if let Some(sender) = sender_guard.as_ref() {
85                let _ = sender.send(SinkEvent::Shutdown);
86            }
87        }
88
89        // Now wait for the thread to finish
90        if let Some(handle) = self.thread.take()
91            && let Err(e) = handle.join()
92        {
93            // Don't panic on join failure, just log it
94            eprintln!("Warning: telemetry thread join failed: {:?}", e);
95        }
96    }
97}
98
99impl HttpEventSink {
100    /// Creates a new `HttpEventSink`.
101    ///
102    /// This function spawns a new thread that handles sending telemetry data
103    /// to the specified HTTP server.
104    ///
105    /// # Arguments
106    ///
107    /// * `addr_server` - The address of the HTTP server to send data to.
108    /// * `max_queue_size` - The maximum number of events to queue before dropping them.
109    /// * `metadata_retry` - The retry strategy for sending metadata (process info, stream info).
110    /// * `blocks_retry` - The retry strategy for sending blocks (log, metrics, thread).
111    /// * `make_decorator` - A closure that returns a `RequestDecorator` for modifying HTTP requests.
112    pub fn new(
113        addr_server: &str,
114        max_queue_size: isize,
115        metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
116        blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
117        make_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
118    ) -> Self {
119        let addr = addr_server.to_owned();
120        let (sender, receiver) = std::sync::mpsc::channel::<SinkEvent>();
121        let queue_size = Arc::new(AtomicIsize::new(0));
122        let thread_queue_size = queue_size.clone();
123        let shutdown_complete = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
124        let thread_shutdown_complete = shutdown_complete.clone();
125        Self {
126            thread: Some(std::thread::spawn(move || {
127                Self::thread_proc(
128                    addr,
129                    receiver,
130                    thread_queue_size,
131                    max_queue_size,
132                    metadata_retry,
133                    blocks_retry,
134                    make_decorator,
135                    thread_shutdown_complete,
136                );
137            })),
138            sender: Mutex::new(Some(sender)),
139            queue_size,
140            shutdown_complete,
141        }
142    }
143
144    fn send(&self, event: SinkEvent) {
145        let guard = self.sender.lock().unwrap();
146        if let Some(sender) = guard.as_ref() {
147            self.queue_size.fetch_add(1, Ordering::Relaxed);
148            if let Err(e) = sender.send(event) {
149                self.queue_size.fetch_sub(1, Ordering::Relaxed);
150                error!("{}", e);
151            }
152        }
153    }
154
155    #[span_fn]
156    async fn push_process(
157        client: &mut reqwest::Client,
158        root_path: &str,
159        process_info: Arc<ProcessInfo>,
160        retry_strategy: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
161        decorator: &dyn RequestDecorator,
162    ) -> Result<(), IngestionClientError> {
163        debug!("sending process {process_info:?}");
164        let url = format!("{root_path}/ingestion/insert_process");
165        let body: bytes::Bytes = encode_cbor(&*process_info)
166            .map_err(|e| IngestionClientError::Permanent(format!("encoding process: {e}")))?
167            .into();
168        tokio_retry2::Retry::spawn(retry_strategy, || async {
169            let mut request = client.post(&url).body(body.clone()).build().map_err(|e| {
170                IngestionClientError::Permanent(format!("building request: {e}")).into_retry()
171            })?;
172
173            if let Err(e) = decorator.decorate(&mut request).await {
174                debug!("request decorator: {e:?}");
175                return Err(
176                    IngestionClientError::Transient(format!("decorating request: {e}"))
177                        .into_retry(),
178                );
179            }
180
181            let response = client.execute(request).await.map_err(|e| {
182                IngestionClientError::Transient(format!("network error: {e}")).into_retry()
183            })?;
184
185            let status = response.status();
186            match status.as_u16() {
187                200..=299 => Ok(()),
188                400..=499 => {
189                    let body = response.text().await.unwrap_or_default();
190                    warn!("insert_process client error ({status}): {body}");
191                    Err(IngestionClientError::Permanent(body).into_retry())
192                }
193                500..=599 => {
194                    let body = response.text().await.unwrap_or_default();
195                    debug!("insert_process server error ({status}): {body}");
196                    Err(IngestionClientError::Transient(format!("{status}: {body}")).into_retry())
197                }
198                _ => {
199                    let body = response.text().await.unwrap_or_default();
200                    warn!("insert_process unexpected status ({status}): {body}");
201                    Err(IngestionClientError::Permanent(format!("{status}: {body}")).into_retry())
202                }
203            }
204        })
205        .await
206    }
207
208    #[span_fn]
209    async fn push_stream(
210        client: &mut reqwest::Client,
211        root_path: &str,
212        stream_info: Arc<StreamInfo>,
213        retry_strategy: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
214        decorator: &dyn RequestDecorator,
215    ) -> Result<(), IngestionClientError> {
216        let url = format!("{root_path}/ingestion/insert_stream");
217        let body: bytes::Bytes = encode_cbor(&*stream_info)
218            .map_err(|e| IngestionClientError::Permanent(format!("encoding stream: {e}")))?
219            .into();
220        tokio_retry2::Retry::spawn(retry_strategy, || async {
221            let mut request = client.post(&url).body(body.clone()).build().map_err(|e| {
222                IngestionClientError::Permanent(format!("building request: {e}")).into_retry()
223            })?;
224
225            if let Err(e) = decorator.decorate(&mut request).await {
226                debug!("request decorator: {e:?}");
227                return Err(
228                    IngestionClientError::Transient(format!("decorating request: {e}"))
229                        .into_retry(),
230                );
231            }
232
233            let response = client.execute(request).await.map_err(|e| {
234                IngestionClientError::Transient(format!("network error: {e}")).into_retry()
235            })?;
236
237            let status = response.status();
238            match status.as_u16() {
239                200..=299 => Ok(()),
240                400..=499 => {
241                    let body = response.text().await.unwrap_or_default();
242                    warn!("insert_stream client error ({status}): {body}");
243                    Err(IngestionClientError::Permanent(body).into_retry())
244                }
245                500..=599 => {
246                    let body = response.text().await.unwrap_or_default();
247                    debug!("insert_stream server error ({status}): {body}");
248                    Err(IngestionClientError::Transient(format!("{status}: {body}")).into_retry())
249                }
250                _ => {
251                    let body = response.text().await.unwrap_or_default();
252                    warn!("insert_stream unexpected status ({status}): {body}");
253                    Err(IngestionClientError::Permanent(format!("{status}: {body}")).into_retry())
254                }
255            }
256        })
257        .await
258    }
259
260    #[span_fn]
261    #[expect(clippy::too_many_arguments)]
262    async fn push_block(
263        client: &mut reqwest::Client,
264        root_path: &str,
265        buffer: &dyn StreamBlock,
266        current_queue_size: &AtomicIsize,
267        max_queue_size: isize,
268        retry_strategy: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
269        decorator: &dyn RequestDecorator,
270        process_info: &ProcessInfo,
271    ) -> Result<(), IngestionClientError> {
272        trace!("push_block");
273        if current_queue_size.load(Ordering::Relaxed) >= max_queue_size {
274            // could be better to have a budget for each block type
275            // this way thread data would not starve the other streams
276            debug!("dropping data, queue over max_queue_size");
277            return Ok(());
278        }
279        let encoded_block: bytes::Bytes = buffer
280            .encode_bin(process_info)
281            .map_err(|e| IngestionClientError::Permanent(format!("encoding block: {e}")))?
282            .into();
283
284        let url = format!("{root_path}/ingestion/insert_block");
285
286        tokio_retry2::Retry::spawn(retry_strategy, || async {
287            let mut request = client
288                .post(&url)
289                .body(encoded_block.clone())
290                .build()
291                .map_err(|e| {
292                    IngestionClientError::Permanent(format!("building request: {e}")).into_retry()
293                })?;
294
295            if let Err(e) = decorator.decorate(&mut request).await {
296                debug!("request decorator: {e:?}");
297                return Err(
298                    IngestionClientError::Transient(format!("decorating request: {e}"))
299                        .into_retry(),
300                );
301            }
302
303            trace!("push_block: executing request");
304
305            let response = client.execute(request).await.map_err(|e| {
306                IngestionClientError::Transient(format!("network error: {e}")).into_retry()
307            })?;
308
309            let status = response.status();
310            match status.as_u16() {
311                200..=299 => Ok(()),
312                400..=499 => {
313                    let body = response.text().await.unwrap_or_default();
314                    warn!("insert_block client error ({status}): {body}");
315                    Err(IngestionClientError::Permanent(body).into_retry())
316                }
317                500..=599 => {
318                    let body = response.text().await.unwrap_or_default();
319                    debug!("insert_block server error ({status}): {body}");
320                    Err(IngestionClientError::Transient(format!("{status}: {body}")).into_retry())
321                }
322                _ => {
323                    let body = response.text().await.unwrap_or_default();
324                    warn!("insert_block unexpected status ({status}): {body}");
325                    Err(IngestionClientError::Permanent(format!("{status}: {body}")).into_retry())
326                }
327            }
328        })
329        .await
330    }
331
332    #[expect(clippy::too_many_arguments)]
333    async fn handle_sink_event(
334        message: SinkEvent,
335        client: &mut reqwest::Client,
336        addr: &str,
337        opt_process_info: &mut Option<Arc<ProcessInfo>>,
338        queue_size: &Arc<AtomicIsize>,
339        max_queue_size: isize,
340        metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
341        blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
342        decorator: &dyn RequestDecorator,
343    ) {
344        match message {
345            SinkEvent::Shutdown => {
346                // This should not happen in this function, but handle it gracefully
347            }
348            SinkEvent::Startup(process_info) => {
349                *opt_process_info = Some(process_info.clone());
350                if let Err(e) =
351                    Self::push_process(client, addr, process_info, metadata_retry, decorator).await
352                {
353                    error!("error sending process: {e}");
354                }
355            }
356            SinkEvent::InitStream(stream_info) => {
357                if let Err(e) =
358                    Self::push_stream(client, addr, stream_info, metadata_retry, decorator).await
359                {
360                    error!("error sending stream: {e}");
361                }
362            }
363            SinkEvent::ProcessLogBlock(buffer) => {
364                if let Some(process_info) = opt_process_info {
365                    if let Err(e) = Self::push_block(
366                        client,
367                        addr,
368                        &*buffer,
369                        queue_size,
370                        max_queue_size,
371                        blocks_retry,
372                        decorator,
373                        process_info,
374                    )
375                    .await
376                    {
377                        error!("error sending log block: {e}");
378                    }
379                } else {
380                    error!("trying to send blocks before Startup message");
381                }
382            }
383            SinkEvent::ProcessMetricsBlock(buffer) => {
384                if let Some(process_info) = opt_process_info {
385                    if let Err(e) = Self::push_block(
386                        client,
387                        addr,
388                        &*buffer,
389                        queue_size,
390                        max_queue_size,
391                        blocks_retry,
392                        decorator,
393                        process_info,
394                    )
395                    .await
396                    {
397                        error!("error sending metrics block: {e}");
398                    }
399                } else {
400                    error!("trying to send blocks before Startup message");
401                }
402            }
403            SinkEvent::ProcessThreadBlock(buffer) => {
404                if let Some(process_info) = opt_process_info {
405                    if let Err(e) = Self::push_block(
406                        client,
407                        addr,
408                        &*buffer,
409                        queue_size,
410                        max_queue_size,
411                        blocks_retry,
412                        decorator,
413                        process_info,
414                    )
415                    .await
416                    {
417                        error!("error sending thread block: {e}");
418                    }
419                } else {
420                    error!("trying to send blocks before Startup message");
421                }
422            }
423        }
424    }
425
426    #[expect(clippy::too_many_arguments)]
427    async fn thread_proc_impl(
428        addr: String,
429        receiver: std::sync::mpsc::Receiver<SinkEvent>,
430        queue_size: Arc<AtomicIsize>,
431        max_queue_size: isize,
432        metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
433        blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
434        decorator: &dyn RequestDecorator,
435        shutdown_complete: Arc<(Mutex<bool>, std::sync::Condvar)>,
436    ) {
437        let mut opt_process_info = None;
438        let mut client = match reqwest::Client::builder()
439            .pool_idle_timeout(Some(core::time::Duration::from_secs(2)))
440            .build()
441        {
442            Ok(client) => client,
443            Err(e) => {
444                error!("Error creating http client: {e:?}");
445                return;
446            }
447        };
448        // eagerly connect, a new process message is sure to follow if it's not already in queue
449        if let Some(process_id) = micromegas_tracing::dispatch::process_id() {
450            let cpu_tracing_enabled =
451                micromegas_tracing::dispatch::cpu_tracing_enabled().unwrap_or(false);
452            info!("process_id={process_id}, cpu_tracing_enabled={cpu_tracing_enabled}");
453        }
454        let flusher = FlushMonitor::default();
455        loop {
456            let timeout = max(0, flusher.time_to_flush_seconds());
457            match receiver.recv_timeout(Duration::from_secs(timeout as u64)) {
458                Ok(message) => {
459                    queue_size.fetch_sub(1, Ordering::Relaxed);
460                    match message {
461                        SinkEvent::Shutdown => {
462                            debug!("received shutdown signal, flushing remaining data");
463                            // Process any remaining messages in the queue before shutting down
464                            let mut count = 0;
465                            while let Ok(remaining_message) = receiver.try_recv() {
466                                queue_size.fetch_sub(1, Ordering::Relaxed);
467                                count += 1;
468                                match remaining_message {
469                                    SinkEvent::Shutdown => break, // Don't process multiple shutdowns
470                                    remaining_msg => {
471                                        // Process the remaining message using the same logic as the main loop
472                                        Self::handle_sink_event(
473                                            remaining_msg,
474                                            &mut client,
475                                            &addr,
476                                            &mut opt_process_info,
477                                            &queue_size,
478                                            max_queue_size,
479                                            metadata_retry.clone(),
480                                            blocks_retry.clone(),
481                                            decorator,
482                                        )
483                                        .await;
484                                    }
485                                }
486                            }
487                            debug!(
488                                "telemetry thread shutdown complete, processed {} remaining messages",
489                                count
490                            );
491                            // Signal that shutdown is complete
492                            let (lock, cvar) = &*shutdown_complete;
493                            let mut completed = lock.lock().unwrap();
494                            *completed = true;
495                            cvar.notify_all();
496                            return;
497                        }
498                        other_message => {
499                            Self::handle_sink_event(
500                                other_message,
501                                &mut client,
502                                &addr,
503                                &mut opt_process_info,
504                                &queue_size,
505                                max_queue_size,
506                                metadata_retry.clone(),
507                                blocks_retry.clone(),
508                                decorator,
509                            )
510                            .await;
511                        }
512                    }
513                }
514                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
515                    flusher.tick();
516                }
517                Err(_e) => {
518                    // can only fail when the sending half is disconnected
519                    // println!("Error in telemetry thread: {}", e);
520                    return;
521                }
522            }
523        }
524    }
525
526    #[allow(clippy::needless_pass_by_value,// we don't want to leave the receiver in the calling thread
527	    clippy::too_many_arguments
528    )]
529    fn thread_proc(
530        addr: String,
531        receiver: std::sync::mpsc::Receiver<SinkEvent>,
532        queue_size: Arc<AtomicIsize>,
533        max_queue_size: isize,
534        metadata_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
535        blocks_retry: core::iter::Take<tokio_retry2::strategy::ExponentialBackoff>,
536        make_decorator: Box<dyn FnOnce() -> Arc<dyn RequestDecorator> + Send>,
537        shutdown_complete: Arc<(Mutex<bool>, std::sync::Condvar)>,
538    ) {
539        // TODO: add runtime as configuration option (or create one only if global don't exist)
540        let tokio_runtime = match tokio::runtime::Runtime::new() {
541            Ok(rt) => rt,
542            Err(e) => {
543                error!("Failed to create tokio runtime for telemetry: {e}");
544                return;
545            }
546        };
547        let decorator = make_decorator();
548        tokio_runtime.block_on(Self::thread_proc_impl(
549            addr,
550            receiver,
551            queue_size,
552            max_queue_size,
553            metadata_retry,
554            blocks_retry,
555            decorator.as_ref(),
556            shutdown_complete,
557        ));
558    }
559}
560
561impl EventSink for HttpEventSink {
562    fn on_startup(&self, process_info: Arc<ProcessInfo>) {
563        self.send(SinkEvent::Startup(process_info));
564    }
565
566    fn on_shutdown(&self) {
567        // Send shutdown event to trigger flushing of remaining data
568        self.send(SinkEvent::Shutdown);
569
570        // Wait for the background thread to signal that shutdown is complete
571        let (lock, cvar) = &*self.shutdown_complete;
572        let completed = lock.lock().unwrap();
573        let timeout = std::time::Duration::from_secs(5);
574        let _result = cvar.wait_timeout_while(completed, timeout, |&mut c| !c);
575    }
576
577    fn on_log_enabled(&self, _metadata: &LogMetadata) -> bool {
578        // If all previous filter succeeds this sink always agrees
579        true
580    }
581
582    fn on_log(
583        &self,
584        _metadata: &LogMetadata,
585        _properties: &[Property],
586        _time: i64,
587        _args: fmt::Arguments<'_>,
588    ) {
589    }
590
591    fn on_init_log_stream(&self, log_stream: &LogStream) {
592        self.send(SinkEvent::InitStream(Arc::new(make_stream_info(
593            log_stream,
594        ))));
595    }
596
597    fn on_process_log_block(&self, log_block: Arc<LogBlock>) {
598        self.send(SinkEvent::ProcessLogBlock(log_block));
599    }
600
601    fn on_init_metrics_stream(&self, metrics_stream: &MetricsStream) {
602        self.send(SinkEvent::InitStream(Arc::new(make_stream_info(
603            metrics_stream,
604        ))));
605    }
606
607    fn on_process_metrics_block(&self, metrics_block: Arc<MetricsBlock>) {
608        self.send(SinkEvent::ProcessMetricsBlock(metrics_block));
609    }
610
611    fn on_init_thread_stream(&self, thread_stream: &ThreadStream) {
612        self.send(SinkEvent::InitStream(Arc::new(make_stream_info(
613            thread_stream,
614        ))));
615    }
616
617    fn on_process_thread_block(&self, thread_block: Arc<ThreadBlock>) {
618        self.send(SinkEvent::ProcessThreadBlock(thread_block));
619    }
620
621    fn is_busy(&self) -> bool {
622        let size = self.queue_size.load(Ordering::Relaxed);
623        debug_assert!(size >= 0, "queue_size went negative: {size}");
624        size > 0
625    }
626}