micromegas_analytics/lakehouse/
async_parquet_writer.rs

1use bytes::Bytes;
2use datafusion::parquet::{self, errors::ParquetError};
3use futures::future::BoxFuture;
4use object_store::buffered::BufWriter;
5use parquet::arrow::async_writer::AsyncFileWriter;
6use std::sync::{
7    Arc,
8    atomic::{AtomicI64, Ordering},
9};
10use tokio::io::AsyncWriteExt;
11
12/// A Parquet writer that writes to an `object_store::buffered::BufWriter` and counts bytes written.
13#[derive(Debug)]
14pub struct AsyncParquetWriter {
15    w: BufWriter,
16    counter: Arc<AtomicI64>,
17}
18
19impl AsyncParquetWriter {
20    pub fn new(w: BufWriter, counter: Arc<AtomicI64>) -> Self {
21        Self { w, counter }
22    }
23}
24
25impl AsyncFileWriter for AsyncParquetWriter {
26    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
27        self.counter.fetch_add(bs.len() as i64, Ordering::Relaxed);
28        Box::pin(async {
29            self.w
30                .put(bs)
31                .await
32                .map_err(|err| ParquetError::External(Box::new(err)))
33        })
34    }
35
36    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
37        Box::pin(async {
38            self.w
39                .shutdown()
40                .await
41                .map_err(|err| ParquetError::External(Box::new(err)))
42        })
43    }
44}