micromegas_perfetto/
chunk_sender.rs

1use datafusion::arrow::array::{BinaryArray, Int32Array};
2use datafusion::arrow::record_batch::RecordBatch;
3use tokio::sync::mpsc;
4
5use crate::async_writer::AsyncWriter;
6
7/// ChunkSender sends data as RecordBatch chunks through a channel.
8/// It accumulates data until reaching a threshold size, then sends it as a chunk.
9pub struct ChunkSender {
10    chunk_sender: mpsc::Sender<anyhow::Result<RecordBatch>>,
11    chunk_id: i32,
12    current_chunk: Vec<u8>,
13    chunk_threshold: usize,
14}
15
16impl ChunkSender {
17    /// Creates a new ChunkSender with specified chunk size threshold
18    pub fn new(
19        chunk_sender: mpsc::Sender<anyhow::Result<RecordBatch>>,
20        chunk_threshold: usize,
21    ) -> Self {
22        Self {
23            chunk_sender,
24            chunk_id: 0,
25            current_chunk: Vec::new(),
26            chunk_threshold,
27        }
28    }
29
30    /// Writes data to the chunk buffer, automatically flushing when threshold is reached
31    pub async fn write(&mut self, buf: &[u8]) -> anyhow::Result<()> {
32        self.current_chunk.extend_from_slice(buf);
33
34        // If chunk exceeds threshold, flush it
35        if self.current_chunk.len() >= self.chunk_threshold {
36            self.flush().await?;
37        }
38        Ok(())
39    }
40
41    /// Flushes the current chunk as a RecordBatch to the channel
42    pub async fn flush(&mut self) -> anyhow::Result<()> {
43        if self.current_chunk.is_empty() {
44            return Ok(());
45        }
46
47        let chunk_id_array = Int32Array::from(vec![self.chunk_id]);
48        let chunk_data_array = BinaryArray::from(vec![self.current_chunk.as_slice()]);
49
50        let batch = RecordBatch::try_from_iter(vec![
51            (
52                "chunk_id",
53                std::sync::Arc::new(chunk_id_array)
54                    as std::sync::Arc<dyn datafusion::arrow::array::Array>,
55            ),
56            (
57                "chunk_data",
58                std::sync::Arc::new(chunk_data_array)
59                    as std::sync::Arc<dyn datafusion::arrow::array::Array>,
60            ),
61        ])?;
62
63        // Send the batch through the channel
64        self.chunk_sender
65            .send(Ok(batch))
66            .await
67            .map_err(|_| anyhow::anyhow!("Channel receiver dropped"))?;
68
69        self.chunk_id += 1;
70        self.current_chunk.clear();
71        Ok(())
72    }
73}
74
75/// Implementation of AsyncWriter for ChunkSender
76#[async_trait::async_trait]
77impl AsyncWriter for ChunkSender {
78    async fn write(&mut self, buf: &[u8]) -> anyhow::Result<()> {
79        self.write(buf).await
80    }
81
82    async fn flush(&mut self) -> anyhow::Result<()> {
83        self.flush().await
84    }
85}