micromegas_perfetto/
chunk_sender.rs1use datafusion::arrow::array::{BinaryArray, Int32Array};
2use datafusion::arrow::record_batch::RecordBatch;
3use tokio::sync::mpsc;
4
5use crate::async_writer::AsyncWriter;
6
7pub struct ChunkSender {
10 chunk_sender: mpsc::Sender<anyhow::Result<RecordBatch>>,
11 chunk_id: i32,
12 current_chunk: Vec<u8>,
13 chunk_threshold: usize,
14}
15
16impl ChunkSender {
17 pub fn new(
19 chunk_sender: mpsc::Sender<anyhow::Result<RecordBatch>>,
20 chunk_threshold: usize,
21 ) -> Self {
22 Self {
23 chunk_sender,
24 chunk_id: 0,
25 current_chunk: Vec::new(),
26 chunk_threshold,
27 }
28 }
29
30 pub async fn write(&mut self, buf: &[u8]) -> anyhow::Result<()> {
32 self.current_chunk.extend_from_slice(buf);
33
34 if self.current_chunk.len() >= self.chunk_threshold {
36 self.flush().await?;
37 }
38 Ok(())
39 }
40
41 pub async fn flush(&mut self) -> anyhow::Result<()> {
43 if self.current_chunk.is_empty() {
44 return Ok(());
45 }
46
47 let chunk_id_array = Int32Array::from(vec![self.chunk_id]);
48 let chunk_data_array = BinaryArray::from(vec![self.current_chunk.as_slice()]);
49
50 let batch = RecordBatch::try_from_iter(vec![
51 (
52 "chunk_id",
53 std::sync::Arc::new(chunk_id_array)
54 as std::sync::Arc<dyn datafusion::arrow::array::Array>,
55 ),
56 (
57 "chunk_data",
58 std::sync::Arc::new(chunk_data_array)
59 as std::sync::Arc<dyn datafusion::arrow::array::Array>,
60 ),
61 ])?;
62
63 self.chunk_sender
65 .send(Ok(batch))
66 .await
67 .map_err(|_| anyhow::anyhow!("Channel receiver dropped"))?;
68
69 self.chunk_id += 1;
70 self.current_chunk.clear();
71 Ok(())
72 }
73}
74
75#[async_trait::async_trait]
77impl AsyncWriter for ChunkSender {
78 async fn write(&mut self, buf: &[u8]) -> anyhow::Result<()> {
79 self.write(buf).await
80 }
81
82 async fn flush(&mut self) -> anyhow::Result<()> {
83 self.flush().await
84 }
85}