micromegas_analytics/lakehouse/
async_parquet_writer.rs1use bytes::Bytes;
2use datafusion::parquet::{self, errors::ParquetError};
3use futures::future::BoxFuture;
4use object_store::buffered::BufWriter;
5use parquet::arrow::async_writer::AsyncFileWriter;
6use std::sync::{
7 Arc,
8 atomic::{AtomicI64, Ordering},
9};
10use tokio::io::AsyncWriteExt;
11
12#[derive(Debug)]
14pub struct AsyncParquetWriter {
15 w: BufWriter,
16 counter: Arc<AtomicI64>,
17}
18
19impl AsyncParquetWriter {
20 pub fn new(w: BufWriter, counter: Arc<AtomicI64>) -> Self {
21 Self { w, counter }
22 }
23}
24
25impl AsyncFileWriter for AsyncParquetWriter {
26 fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
27 self.counter.fetch_add(bs.len() as i64, Ordering::Relaxed);
28 Box::pin(async {
29 self.w
30 .put(bs)
31 .await
32 .map_err(|err| ParquetError::External(Box::new(err)))
33 })
34 }
35
36 fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
37 Box::pin(async {
38 self.w
39 .shutdown()
40 .await
41 .map_err(|err| ParquetError::External(Box::new(err)))
42 })
43 }
44}