micromegas_analytics/dfext/
async_log_stream.rs1use datafusion::{
2 arrow::{
3 array::{PrimitiveBuilder, RecordBatch, StringBuilder},
4 datatypes::{SchemaRef, TimestampNanosecondType},
5 },
6 common::Result,
7 error::DataFusionError,
8 execution::RecordBatchStream,
9};
10use futures::Stream;
11use std::{
12 sync::Arc,
13 task::{Context, Poll},
14};
15use tokio::sync::mpsc;
16
17pub struct AsyncLogStream {
19 schema: SchemaRef,
20 rx: mpsc::Receiver<(chrono::DateTime<chrono::Utc>, String)>,
21}
22
23impl AsyncLogStream {
24 pub fn new(
25 schema: SchemaRef,
26 rx: mpsc::Receiver<(chrono::DateTime<chrono::Utc>, String)>,
27 ) -> Self {
28 Self { schema, rx }
29 }
30}
31
32impl Stream for AsyncLogStream {
33 type Item = Result<RecordBatch>;
34
35 fn poll_next(
36 mut self: std::pin::Pin<&mut Self>,
37 cx: &mut Context<'_>,
38 ) -> Poll<Option<Self::Item>> {
39 let mut messages = vec![];
40 let limit = self.rx.max_capacity();
41 if self
42 .rx
43 .poll_recv_many(cx, &mut messages, limit)
44 .is_pending()
45 {
46 cx.waker().wake_by_ref();
47 return Poll::Pending;
48 }
49 if messages.is_empty() {
50 if self.rx.is_closed() {
51 return Poll::Ready(None);
53 }
54 cx.waker().wake_by_ref();
56 return Poll::Pending;
57 }
58
59 let mut times = PrimitiveBuilder::<TimestampNanosecondType>::with_capacity(messages.len());
60 let mut msgs = StringBuilder::new();
61 for msg in messages {
62 times.append_value(msg.0.timestamp_nanos_opt().unwrap_or_default());
63 msgs.append_value(msg.1);
64 }
65
66 let rb_res = RecordBatch::try_new(
67 self.schema.clone(),
68 vec![
69 Arc::new(times.finish().with_timezone_utc()),
70 Arc::new(msgs.finish()),
71 ],
72 )
73 .map_err(|e| DataFusionError::ArrowError(e.into(), None));
74 Poll::Ready(Some(rb_res))
75 }
76
77 fn size_hint(&self) -> (usize, Option<usize>) {
78 (self.rx.len(), Some(self.rx.len()))
79 }
80}
81
82impl RecordBatchStream for AsyncLogStream {
83 fn schema(&self) -> SchemaRef {
84 Arc::clone(&self.schema)
85 }
86}