micromegas_telemetry/
blob_storage.rs

1use anyhow::Result;
2use futures::StreamExt;
3use futures::stream;
4use object_store::prefix::PrefixStore;
5use object_store::{ObjectStore, path::Path};
6use std::sync::Arc;
7
8/// A client for interacting with blob storage.
9///
10/// This struct wraps an `ObjectStore` and prefixes all paths with a root path,
11/// providing a convenient way to interact with a specific "folder" within the blob storage.
12#[derive(Debug)]
13pub struct BlobStorage {
14    blob_store: Arc<dyn ObjectStore>,
15}
16
17impl BlobStorage {
18    /// Creates a new `BlobStorage` instance.
19    pub fn new(blob_store: Arc<dyn ObjectStore>, blob_store_root: Path) -> Self {
20        Self {
21            blob_store: Arc::new(PrefixStore::new(blob_store, blob_store_root)),
22        }
23    }
24
25    /// Connects to a blob storage service using the provided URL.
26    pub fn connect(object_store_url: &str) -> Result<Self> {
27        let (blob_store, blob_store_root) = object_store::parse_url_opts(
28            &url::Url::parse(object_store_url)?,
29            std::env::vars().map(|(k, v)| (k.to_lowercase(), v)),
30        )?;
31        Ok(Self {
32            blob_store: Arc::new(PrefixStore::new(blob_store, blob_store_root)),
33        })
34    }
35
36    /// Returns a shared reference to the inner `ObjectStore`.
37    pub fn inner(&self) -> Arc<dyn ObjectStore> {
38        self.blob_store.clone()
39    }
40
41    /// Puts a blob into storage at the specified path.
42    pub async fn put(&self, obj_path: &str, buffer: bytes::Bytes) -> Result<()> {
43        self.blob_store
44            .put(&Path::from(obj_path), buffer.into())
45            .await?;
46        Ok(())
47    }
48
49    /// Reads a blob from storage at the specified path.
50    pub async fn read_blob(&self, obj_path: &str) -> Result<bytes::Bytes> {
51        let get_result = self.blob_store.get(&Path::from(obj_path)).await?;
52        Ok(get_result.bytes().await?)
53    }
54
55    /// Deletes a blob from storage at the specified path.
56    pub async fn delete(&self, obj_path: &str) -> Result<()> {
57        self.blob_store.delete(&Path::from(obj_path)).await?;
58        Ok(())
59    }
60
61    /// Deletes a batch of blobs from storage.
62    pub async fn delete_batch(&self, objects: &[String]) -> Result<()> {
63        let path_stream = stream::iter(
64            objects
65                .iter()
66                .map(|obj_path| Path::from(obj_path.as_str()))
67                .map(Ok),
68        );
69        let mut stream = self.blob_store.delete_stream(Box::pin(path_stream));
70        while let Some(res) = stream.next().await {
71            if let Err(e) = res {
72                match e {
73                    object_store::Error::NotFound { path: _, source: _ } => Ok(()),
74                    ref _other_error => Err(e),
75                }?
76            }
77        }
78        Ok(())
79    }
80}