micromegas_telemetry/
blob_storage.rs1use anyhow::Result;
2use futures::StreamExt;
3use futures::stream;
4use object_store::prefix::PrefixStore;
5use object_store::{ObjectStore, path::Path};
6use std::sync::Arc;
7
8#[derive(Debug)]
13pub struct BlobStorage {
14 blob_store: Arc<dyn ObjectStore>,
15}
16
17impl BlobStorage {
18 pub fn new(blob_store: Arc<dyn ObjectStore>, blob_store_root: Path) -> Self {
20 Self {
21 blob_store: Arc::new(PrefixStore::new(blob_store, blob_store_root)),
22 }
23 }
24
25 pub fn connect(object_store_url: &str) -> Result<Self> {
27 let (blob_store, blob_store_root) = object_store::parse_url_opts(
28 &url::Url::parse(object_store_url)?,
29 std::env::vars().map(|(k, v)| (k.to_lowercase(), v)),
30 )?;
31 Ok(Self {
32 blob_store: Arc::new(PrefixStore::new(blob_store, blob_store_root)),
33 })
34 }
35
36 pub fn inner(&self) -> Arc<dyn ObjectStore> {
38 self.blob_store.clone()
39 }
40
41 pub async fn put(&self, obj_path: &str, buffer: bytes::Bytes) -> Result<()> {
43 self.blob_store
44 .put(&Path::from(obj_path), buffer.into())
45 .await?;
46 Ok(())
47 }
48
49 pub async fn read_blob(&self, obj_path: &str) -> Result<bytes::Bytes> {
51 let get_result = self.blob_store.get(&Path::from(obj_path)).await?;
52 Ok(get_result.bytes().await?)
53 }
54
55 pub async fn delete(&self, obj_path: &str) -> Result<()> {
57 self.blob_store.delete(&Path::from(obj_path)).await?;
58 Ok(())
59 }
60
61 pub async fn delete_batch(&self, objects: &[String]) -> Result<()> {
63 let path_stream = stream::iter(
64 objects
65 .iter()
66 .map(|obj_path| Path::from(obj_path.as_str()))
67 .map(Ok),
68 );
69 let mut stream = self.blob_store.delete_stream(Box::pin(path_stream));
70 while let Some(res) = stream.next().await {
71 if let Err(e) = res {
72 match e {
73 object_store::Error::NotFound { path: _, source: _ } => Ok(()),
74 ref _other_error => Err(e),
75 }?
76 }
77 }
78 Ok(())
79 }
80}