micromegas_analytics/dfext/
async_log_stream.rs

1use datafusion::{
2    arrow::{
3        array::{PrimitiveBuilder, RecordBatch, StringBuilder},
4        datatypes::{SchemaRef, TimestampNanosecondType},
5    },
6    common::Result,
7    error::DataFusionError,
8    execution::RecordBatchStream,
9};
10use futures::Stream;
11use std::{
12    sync::Arc,
13    task::{Context, Poll},
14};
15use tokio::sync::mpsc;
16
17/// A stream of log messages that can be converted into a `RecordBatchStream`.
18pub struct AsyncLogStream {
19    schema: SchemaRef,
20    rx: mpsc::Receiver<(chrono::DateTime<chrono::Utc>, String)>,
21}
22
23impl AsyncLogStream {
24    pub fn new(
25        schema: SchemaRef,
26        rx: mpsc::Receiver<(chrono::DateTime<chrono::Utc>, String)>,
27    ) -> Self {
28        Self { schema, rx }
29    }
30}
31
32impl Stream for AsyncLogStream {
33    type Item = Result<RecordBatch>;
34
35    fn poll_next(
36        mut self: std::pin::Pin<&mut Self>,
37        cx: &mut Context<'_>,
38    ) -> Poll<Option<Self::Item>> {
39        let mut messages = vec![];
40        let limit = self.rx.max_capacity();
41        if self
42            .rx
43            .poll_recv_many(cx, &mut messages, limit)
44            .is_pending()
45        {
46            cx.waker().wake_by_ref();
47            return Poll::Pending;
48        }
49        if messages.is_empty() {
50            if self.rx.is_closed() {
51                // channel closed, aborting
52                return Poll::Ready(None);
53            }
54            // not sure this can happen
55            cx.waker().wake_by_ref();
56            return Poll::Pending;
57        }
58
59        let mut times = PrimitiveBuilder::<TimestampNanosecondType>::with_capacity(messages.len());
60        let mut msgs = StringBuilder::new();
61        for msg in messages {
62            times.append_value(msg.0.timestamp_nanos_opt().unwrap_or_default());
63            msgs.append_value(msg.1);
64        }
65
66        let rb_res = RecordBatch::try_new(
67            self.schema.clone(),
68            vec![
69                Arc::new(times.finish().with_timezone_utc()),
70                Arc::new(msgs.finish()),
71            ],
72        )
73        .map_err(|e| DataFusionError::ArrowError(e.into(), None));
74        Poll::Ready(Some(rb_res))
75    }
76
77    fn size_hint(&self) -> (usize, Option<usize>) {
78        (self.rx.len(), Some(self.rx.len()))
79    }
80}
81
82impl RecordBatchStream for AsyncLogStream {
83    fn schema(&self) -> SchemaRef {
84        Arc::clone(&self.schema)
85    }
86}