micromegas_ingestion/
data_lake_connection.rs

1use anyhow::{Context, Result};
2use micromegas_telemetry::blob_storage::BlobStorage;
3use micromegas_tracing::info;
4use sqlx::PgPool;
5use std::sync::Arc;
6
7/// A connection to the data lake, including a database pool and a blob storage client.
8#[derive(Debug, Clone)]
9pub struct DataLakeConnection {
10    pub db_pool: PgPool,
11    pub blob_storage: Arc<BlobStorage>,
12}
13
14impl DataLakeConnection {
15    pub fn new(db_pool: PgPool, blob_storage: Arc<BlobStorage>) -> Self {
16        Self {
17            db_pool,
18            blob_storage,
19        }
20    }
21}
22
23/// Connects to the data lake.
24pub async fn connect_to_data_lake(
25    db_uri: &str,
26    object_store_url: &str,
27) -> Result<DataLakeConnection> {
28    info!("connecting to blob storage");
29    let blob_storage = Arc::new(
30        BlobStorage::connect(object_store_url).with_context(|| "connecting to blob storage")?,
31    );
32    let pool = sqlx::postgres::PgPoolOptions::new()
33        .connect(db_uri)
34        .await
35        .with_context(|| String::from("Connecting to telemetry database"))?;
36    Ok(DataLakeConnection::new(pool, blob_storage))
37}