micromegas_analytics/lakehouse/
lakehouse_context.rs1use super::file_cache::FileCache;
2use super::metadata_cache::MetadataCache;
3use super::migration::migrate_lakehouse;
4use super::reader_factory::ReaderFactory;
5use super::runtime::make_runtime_env;
6use anyhow::Context;
7use anyhow::Result;
8use datafusion::execution::runtime_env::RuntimeEnv;
9use micromegas_ingestion::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
10use micromegas_tracing::prelude::*;
11use std::sync::Arc;
12
/// Fallback metadata cache budget, in megabytes.
///
/// Used by `LakehouseContext::new` when `MICROMEGAS_METADATA_CACHE_MB` is unset or does not
/// parse as a `u64`; the value is converted to bytes before being handed to `MetadataCache::new`.
13const DEFAULT_METADATA_CACHE_SIZE_MB: u64 = 50;
15
/// Fallback file cache budget, in megabytes.
///
/// Used by `LakehouseContext::new` when `MICROMEGAS_FILE_CACHE_MB` is unset or does not parse
/// as a `u64`; the value is converted to bytes and passed as the first argument of
/// `FileCache::new`.
16const DEFAULT_FILE_CACHE_SIZE_MB: u64 = 200;
18
/// Fallback for the second `FileCache::new` argument, in megabytes.
///
/// Used by `LakehouseContext::new` when `MICROMEGAS_FILE_CACHE_MAX_FILE_MB` is unset or does
/// not parse as a `u64`.
// NOTE(review): judging by the name this is the largest single file the cache will admit;
// confirm against `FileCache::new`.
19const DEFAULT_FILE_CACHE_MAX_FILE_MB: u64 = 10;
21
/// Bundle of the shared handles used to work with the lakehouse: the data lake connection,
/// the metadata and file caches, the DataFusion runtime environment and the reader factory.
///
/// Every field is an `Arc`, so cloning a context only bumps reference counts and the clone
/// shares the same underlying caches, runtime and reader factory as the original.
22#[derive(Clone)]
27pub struct LakehouseContext {
    /// Data lake connection; its `blob_storage` and `db_pool` are used to build the
    /// reader factory.
28 lake: Arc<DataLakeConnection>,
    /// Metadata cache; the same `Arc` is also handed to the reader factory.
29 metadata_cache: Arc<MetadataCache>,
    /// File cache; the same `Arc` is also handed to the reader factory.
30 file_cache: Arc<FileCache>,
    /// DataFusion runtime environment supplied by the caller or created by
    /// `make_runtime_env`.
31 runtime: Arc<RuntimeEnv>,
    /// Reader factory built from the lake's blob storage, its database pool and both caches.
32 reader_factory: Arc<ReaderFactory>,
33}
34
35impl LakehouseContext {
36 pub async fn from_connection(lake: Arc<DataLakeConnection>) -> Result<Arc<Self>> {
43 migrate_lakehouse(lake.db_pool.clone())
44 .await
45 .with_context(|| "migrate_lakehouse")?;
46 let runtime = Arc::new(make_runtime_env()?);
47 Ok(Arc::new(Self::new(lake, runtime)))
48 }
49
50 pub async fn from_env() -> Result<Arc<Self>> {
54 let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
55 .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
56 let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
57 .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
58 let data_lake =
59 Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?);
60 migrate_lakehouse(data_lake.db_pool.clone())
61 .await
62 .with_context(|| "migrate_lakehouse")?;
63 let runtime = Arc::new(make_runtime_env()?);
64 Ok(Arc::new(Self::new(data_lake, runtime)))
65 }
66
67 pub fn new(lake: Arc<DataLakeConnection>, runtime: Arc<RuntimeEnv>) -> Self {
69 let metadata_cache_mb = match std::env::var("MICROMEGAS_METADATA_CACHE_MB") {
70 Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
71 warn!(
72 "Invalid MICROMEGAS_METADATA_CACHE_MB value '{s}', using default {DEFAULT_METADATA_CACHE_SIZE_MB} MB"
73 );
74 DEFAULT_METADATA_CACHE_SIZE_MB
75 }),
76 Err(_) => DEFAULT_METADATA_CACHE_SIZE_MB,
77 };
78
79 let file_cache_mb = match std::env::var("MICROMEGAS_FILE_CACHE_MB") {
80 Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
81 warn!(
82 "Invalid MICROMEGAS_FILE_CACHE_MB value '{s}', using default {DEFAULT_FILE_CACHE_SIZE_MB} MB"
83 );
84 DEFAULT_FILE_CACHE_SIZE_MB
85 }),
86 Err(_) => DEFAULT_FILE_CACHE_SIZE_MB,
87 };
88
89 let file_cache_max_file_mb = match std::env::var("MICROMEGAS_FILE_CACHE_MAX_FILE_MB") {
90 Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
91 warn!(
92 "Invalid MICROMEGAS_FILE_CACHE_MAX_FILE_MB value '{s}', using default {DEFAULT_FILE_CACHE_MAX_FILE_MB} MB"
93 );
94 DEFAULT_FILE_CACHE_MAX_FILE_MB
95 }),
96 Err(_) => DEFAULT_FILE_CACHE_MAX_FILE_MB,
97 };
98
99 let metadata_cache = Arc::new(MetadataCache::new(metadata_cache_mb * 1024 * 1024));
100 let file_cache = Arc::new(FileCache::new(
101 file_cache_mb * 1024 * 1024,
102 file_cache_max_file_mb * 1024 * 1024,
103 ));
104
105 let reader_factory = Arc::new(ReaderFactory::new(
106 lake.blob_storage.inner(),
107 lake.db_pool.clone(),
108 metadata_cache.clone(),
109 file_cache.clone(),
110 ));
111 Self {
112 lake,
113 metadata_cache,
114 file_cache,
115 runtime,
116 reader_factory,
117 }
118 }
119
120 pub fn with_caches(
122 lake: Arc<DataLakeConnection>,
123 runtime: Arc<RuntimeEnv>,
124 metadata_cache: Arc<MetadataCache>,
125 file_cache: Arc<FileCache>,
126 ) -> Self {
127 let reader_factory = Arc::new(ReaderFactory::new(
128 lake.blob_storage.inner(),
129 lake.db_pool.clone(),
130 metadata_cache.clone(),
131 file_cache.clone(),
132 ));
133 Self {
134 lake,
135 metadata_cache,
136 file_cache,
137 runtime,
138 reader_factory,
139 }
140 }
141
142 pub fn lake(&self) -> &Arc<DataLakeConnection> {
144 &self.lake
145 }
146
147 pub fn metadata_cache(&self) -> &Arc<MetadataCache> {
149 &self.metadata_cache
150 }
151
152 pub fn file_cache(&self) -> &Arc<FileCache> {
154 &self.file_cache
155 }
156
157 pub fn runtime(&self) -> &Arc<RuntimeEnv> {
159 &self.runtime
160 }
161
162 pub fn reader_factory(&self) -> &Arc<ReaderFactory> {
164 &self.reader_factory
165 }
166}
167
168impl std::fmt::Debug for LakehouseContext {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("LakehouseContext")
171 .field("metadata_cache", &self.metadata_cache)
172 .field("file_cache", &self.file_cache)
173 .finish()
174 }
175}