micromegas_analytics/lakehouse/
lakehouse_context.rs1use super::file_cache::FileCache;
2use super::metadata_cache::MetadataCache;
3use super::migration::migrate_lakehouse;
4use super::reader_factory::ReaderFactory;
5use super::runtime::make_runtime_env;
6use anyhow::Context;
7use anyhow::Result;
8use datafusion::execution::runtime_env::RuntimeEnv;
9use micromegas_ingestion::data_lake_connection::{DataLakeConnection, connect_to_data_lake};
10use micromegas_tracing::prelude::*;
11use std::sync::Arc;
12
/// Metadata cache capacity, in megabytes, used by `LakehouseContext::new` when
/// `MICROMEGAS_METADATA_CACHE_MB` is unset or does not parse as a `u64`.
const DEFAULT_METADATA_CACHE_SIZE_MB: u64 = 50;

/// File cache capacity, in megabytes, used by `LakehouseContext::new` when
/// `MICROMEGAS_FILE_CACHE_MB` is unset or does not parse as a `u64`.
const DEFAULT_FILE_CACHE_SIZE_MB: u64 = 200;

/// Second size argument of `FileCache::new`, in megabytes, used by
/// `LakehouseContext::new` when `MICROMEGAS_FILE_CACHE_MAX_FILE_MB` is unset or
/// does not parse as a `u64`.
// NOTE(review): presumably the largest single file the cache will accept —
// confirm against `FileCache::new`.
const DEFAULT_FILE_CACHE_MAX_FILE_MB: u64 = 10;
21
22#[derive(Clone)]
27pub struct LakehouseContext {
28 lake: Arc<DataLakeConnection>,
29 metadata_cache: Arc<MetadataCache>,
30 file_cache: Arc<FileCache>,
31 runtime: Arc<RuntimeEnv>,
32 reader_factory: Arc<ReaderFactory>,
33}
34
35impl LakehouseContext {
36 pub async fn from_env() -> Result<Arc<Self>> {
40 let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
41 .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
42 let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
43 .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
44 let data_lake =
45 Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?);
46 migrate_lakehouse(data_lake.db_pool.clone())
47 .await
48 .with_context(|| "migrate_lakehouse")?;
49 let runtime = Arc::new(make_runtime_env()?);
50 Ok(Arc::new(Self::new(data_lake, runtime)))
51 }
52
53 pub fn new(lake: Arc<DataLakeConnection>, runtime: Arc<RuntimeEnv>) -> Self {
55 let metadata_cache_mb = match std::env::var("MICROMEGAS_METADATA_CACHE_MB") {
56 Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
57 warn!(
58 "Invalid MICROMEGAS_METADATA_CACHE_MB value '{s}', using default {DEFAULT_METADATA_CACHE_SIZE_MB} MB"
59 );
60 DEFAULT_METADATA_CACHE_SIZE_MB
61 }),
62 Err(_) => DEFAULT_METADATA_CACHE_SIZE_MB,
63 };
64
65 let file_cache_mb = match std::env::var("MICROMEGAS_FILE_CACHE_MB") {
66 Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
67 warn!(
68 "Invalid MICROMEGAS_FILE_CACHE_MB value '{s}', using default {DEFAULT_FILE_CACHE_SIZE_MB} MB"
69 );
70 DEFAULT_FILE_CACHE_SIZE_MB
71 }),
72 Err(_) => DEFAULT_FILE_CACHE_SIZE_MB,
73 };
74
75 let file_cache_max_file_mb = match std::env::var("MICROMEGAS_FILE_CACHE_MAX_FILE_MB") {
76 Ok(s) => s.parse::<u64>().unwrap_or_else(|_| {
77 warn!(
78 "Invalid MICROMEGAS_FILE_CACHE_MAX_FILE_MB value '{s}', using default {DEFAULT_FILE_CACHE_MAX_FILE_MB} MB"
79 );
80 DEFAULT_FILE_CACHE_MAX_FILE_MB
81 }),
82 Err(_) => DEFAULT_FILE_CACHE_MAX_FILE_MB,
83 };
84
85 let metadata_cache = Arc::new(MetadataCache::new(metadata_cache_mb * 1024 * 1024));
86 let file_cache = Arc::new(FileCache::new(
87 file_cache_mb * 1024 * 1024,
88 file_cache_max_file_mb * 1024 * 1024,
89 ));
90
91 let reader_factory = Arc::new(ReaderFactory::new(
92 lake.blob_storage.inner(),
93 lake.db_pool.clone(),
94 metadata_cache.clone(),
95 file_cache.clone(),
96 ));
97 Self {
98 lake,
99 metadata_cache,
100 file_cache,
101 runtime,
102 reader_factory,
103 }
104 }
105
106 pub fn with_caches(
108 lake: Arc<DataLakeConnection>,
109 runtime: Arc<RuntimeEnv>,
110 metadata_cache: Arc<MetadataCache>,
111 file_cache: Arc<FileCache>,
112 ) -> Self {
113 let reader_factory = Arc::new(ReaderFactory::new(
114 lake.blob_storage.inner(),
115 lake.db_pool.clone(),
116 metadata_cache.clone(),
117 file_cache.clone(),
118 ));
119 Self {
120 lake,
121 metadata_cache,
122 file_cache,
123 runtime,
124 reader_factory,
125 }
126 }
127
128 pub fn lake(&self) -> &Arc<DataLakeConnection> {
130 &self.lake
131 }
132
133 pub fn metadata_cache(&self) -> &Arc<MetadataCache> {
135 &self.metadata_cache
136 }
137
138 pub fn file_cache(&self) -> &Arc<FileCache> {
140 &self.file_cache
141 }
142
143 pub fn runtime(&self) -> &Arc<RuntimeEnv> {
145 &self.runtime
146 }
147
148 pub fn reader_factory(&self) -> &Arc<ReaderFactory> {
150 &self.reader_factory
151 }
152}
153
154impl std::fmt::Debug for LakehouseContext {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct("LakehouseContext")
157 .field("metadata_cache", &self.metadata_cache)
158 .field("file_cache", &self.file_cache)
159 .finish()
160 }
161}