micromegas_analytics/lakehouse/
lakehouse_context.rs

1use super::file_cache::FileCache;
2use super::metadata_cache::MetadataCache;
3use super::migration::migrate_lakehouse;
4use super::reader_factory::ReaderFactory;
5use super::runtime::make_runtime_env;
6use anyhow::Context;
7use anyhow::Result;
8use datafusion::execution::runtime_env::RuntimeEnv;
9use micromegas_ingestion::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
10use micromegas_tracing::prelude::*;
11use std::sync::Arc;
12
/// Default metadata cache size in MB (override with MICROMEGAS_METADATA_CACHE_MB)
const DEFAULT_METADATA_CACHE_SIZE_MB: u64 = 50;

/// Default file cache size in MB (override with MICROMEGAS_FILE_CACHE_MB)
const DEFAULT_FILE_CACHE_SIZE_MB: u64 = 200;

/// Default max file size to cache in MB (override with MICROMEGAS_FILE_CACHE_MAX_FILE_MB)
const DEFAULT_FILE_CACHE_MAX_FILE_MB: u64 = 10;
21
/// Bundles all runtime resources needed for lakehouse query execution.
///
/// This struct holds the data lake connection, metadata cache, file cache, and DataFusion runtime,
/// providing a single context object that can be passed through the query path.
///
/// Cloning is cheap: every field is behind an [`Arc`].
#[derive(Clone)]
pub struct LakehouseContext {
    /// Connection to the data lake (SQL db pool + blob storage).
    lake: Arc<DataLakeConnection>,
    /// Shared metadata cache, sized at construction time.
    metadata_cache: Arc<MetadataCache>,
    /// Shared file cache, sized at construction time.
    file_cache: Arc<FileCache>,
    /// DataFusion runtime environment.
    runtime: Arc<RuntimeEnv>,
    /// Reader factory wired to the lake's storage and both caches above.
    reader_factory: Arc<ReaderFactory>,
}
34
35impl LakehouseContext {
36    /// Reads MICROMEGAS_SQL_CONNECTION_STRING and MICROMEGAS_OBJECT_STORE_URI,
37    /// connects to the data lake, runs lakehouse migrations, and creates the
38    /// runtime environment.
39    pub async fn from_env() -> Result<Arc<Self>> {
40        let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
41            .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
42        let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
43            .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
44        let data_lake =
45            Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?);
46        migrate_lakehouse(data_lake.db_pool.clone())
47            .await
48            .with_context(|| "migrate_lakehouse")?;
49        let runtime = Arc::new(make_runtime_env()?);
50        Ok(Arc::new(Self::new(data_lake, runtime)))
51    }
52
53    /// Creates a new lakehouse context with default-sized metadata and file caches.
54    pub fn new(lake: Arc<DataLakeConnection>, runtime: Arc<RuntimeEnv>) -> Self {
55        let metadata_cache_mb = match std::env::var("MICROMEGAS_METADATA_CACHE_MB") {
56            Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
57                warn!(
58                    "Invalid MICROMEGAS_METADATA_CACHE_MB value '{s}', using default {DEFAULT_METADATA_CACHE_SIZE_MB} MB"
59                );
60                DEFAULT_METADATA_CACHE_SIZE_MB
61            }),
62            Err(_) => DEFAULT_METADATA_CACHE_SIZE_MB,
63        };
64
65        let file_cache_mb = match std::env::var("MICROMEGAS_FILE_CACHE_MB") {
66            Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
67                warn!(
68                    "Invalid MICROMEGAS_FILE_CACHE_MB value '{s}', using default {DEFAULT_FILE_CACHE_SIZE_MB} MB"
69                );
70                DEFAULT_FILE_CACHE_SIZE_MB
71            }),
72            Err(_) => DEFAULT_FILE_CACHE_SIZE_MB,
73        };
74
75        let file_cache_max_file_mb = match std::env::var("MICROMEGAS_FILE_CACHE_MAX_FILE_MB") {
76            Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
77                warn!(
78                    "Invalid MICROMEGAS_FILE_CACHE_MAX_FILE_MB value '{s}', using default {DEFAULT_FILE_CACHE_MAX_FILE_MB} MB"
79                );
80                DEFAULT_FILE_CACHE_MAX_FILE_MB
81            }),
82            Err(_) => DEFAULT_FILE_CACHE_MAX_FILE_MB,
83        };
84
85        let metadata_cache = Arc::new(MetadataCache::new(metadata_cache_mb * 1024 * 1024));
86        let file_cache = Arc::new(FileCache::new(
87            file_cache_mb * 1024 * 1024,
88            file_cache_max_file_mb * 1024 * 1024,
89        ));
90
91        let reader_factory = Arc::new(ReaderFactory::new(
92            lake.blob_storage.inner(),
93            lake.db_pool.clone(),
94            metadata_cache.clone(),
95            file_cache.clone(),
96        ));
97        Self {
98            lake,
99            metadata_cache,
100            file_cache,
101            runtime,
102            reader_factory,
103        }
104    }
105
106    /// Creates a new lakehouse context with custom metadata and file caches.
107    pub fn with_caches(
108        lake: Arc<DataLakeConnection>,
109        runtime: Arc<RuntimeEnv>,
110        metadata_cache: Arc<MetadataCache>,
111        file_cache: Arc<FileCache>,
112    ) -> Self {
113        let reader_factory = Arc::new(ReaderFactory::new(
114            lake.blob_storage.inner(),
115            lake.db_pool.clone(),
116            metadata_cache.clone(),
117            file_cache.clone(),
118        ));
119        Self {
120            lake,
121            metadata_cache,
122            file_cache,
123            runtime,
124            reader_factory,
125        }
126    }
127
128    /// Returns the data lake connection.
129    pub fn lake(&self) -> &Arc<DataLakeConnection> {
130        &self.lake
131    }
132
133    /// Returns the metadata cache.
134    pub fn metadata_cache(&self) -> &Arc<MetadataCache> {
135        &self.metadata_cache
136    }
137
138    /// Returns the file cache.
139    pub fn file_cache(&self) -> &Arc<FileCache> {
140        &self.file_cache
141    }
142
143    /// Returns the DataFusion runtime environment.
144    pub fn runtime(&self) -> &Arc<RuntimeEnv> {
145        &self.runtime
146    }
147
148    /// Returns the shared `ReaderFactory`.
149    pub fn reader_factory(&self) -> &Arc<ReaderFactory> {
150        &self.reader_factory
151    }
152}
153
154impl std::fmt::Debug for LakehouseContext {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct("LakehouseContext")
157            .field("metadata_cache", &self.metadata_cache)
158            .field("file_cache", &self.file_cache)
159            .finish()
160    }
161}