micromegas_ingestion/
data_lake_connection.rs1use anyhow::{Context, Result};
2use micromegas_telemetry::blob_storage::BlobStorage;
3use micromegas_tracing::info;
4use sqlx::PgPool;
5use std::sync::Arc;
6
7#[derive(Debug, Clone)]
9pub struct DataLakeConnection {
10 pub db_pool: PgPool,
11 pub blob_storage: Arc<BlobStorage>,
12}
13
14impl DataLakeConnection {
15 pub fn new(db_pool: PgPool, blob_storage: Arc<BlobStorage>) -> Self {
16 Self {
17 db_pool,
18 blob_storage,
19 }
20 }
21}
22
23pub async fn connect_to_data_lake(
25 db_uri: &str,
26 object_store_url: &str,
27) -> Result<DataLakeConnection> {
28 info!("connecting to blob storage");
29 let blob_storage = Arc::new(
30 BlobStorage::connect(object_store_url).with_context(|| "connecting to blob storage")?,
31 );
32 let pool = sqlx::postgres::PgPoolOptions::new()
33 .connect(db_uri)
34 .await
35 .with_context(|| String::from("Connecting to telemetry database"))?;
36 Ok(DataLakeConnection::new(pool, blob_storage))
37}