micromegas/servers/
flight_sql_server.rs1use anyhow::Result;
2use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
3use micromegas_analytics::lakehouse::partition_cache::LivePartitionProvider;
4use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
5use micromegas_analytics::lakehouse::static_tables_configurator::StaticTablesConfigurator;
6use micromegas_analytics::lakehouse::view_factory::{ViewFactory, default_view_factory};
7use micromegas_auth::tower::AuthService;
8use micromegas_auth::types::AuthProvider;
9use micromegas_ingestion::data_lake_connection::DataLakeConnection;
10use micromegas_tracing::prelude::*;
11use std::future::Future;
12use std::net::SocketAddr;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use arrow_flight::flight_service_server::FlightServiceServer;
17use datafusion::execution::runtime_env::RuntimeEnv;
18use tonic::transport::Server;
19use tower::ServiceBuilder;
20use tower::layer::layer_fn;
21
22use super::connect_info_layer::ConnectedIncoming;
23use super::flight_sql_service_impl::FlightSqlServiceImpl;
24use super::grpc_health_service::GrpcHealthService;
25use super::log_uri_service::LogUriService;
26
27type ViewFactoryFn = Box<
28 dyn FnOnce(
29 Arc<RuntimeEnv>,
30 Arc<DataLakeConnection>,
31 ) -> Pin<Box<dyn Future<Output = Result<ViewFactory>> + Send>>
32 + Send,
33>;
34
35pub struct FlightSqlServer;
55
56impl FlightSqlServer {
57 pub fn builder() -> FlightSqlServerBuilder {
58 FlightSqlServerBuilder::default()
59 }
60}
61
62pub struct FlightSqlServerBuilder {
63 view_factory_fn: Option<ViewFactoryFn>,
64 session_configurator: Option<Arc<dyn SessionConfigurator>>,
65 auth_provider: Option<Arc<dyn AuthProvider>>,
66 use_default_auth: bool,
67 max_decoding_message_size: usize,
68 listen_addr: SocketAddr,
69}
70
71impl Default for FlightSqlServerBuilder {
72 fn default() -> Self {
73 Self {
74 view_factory_fn: None,
75 session_configurator: None,
76 auth_provider: None,
77 use_default_auth: false,
78 max_decoding_message_size: 100 * 1024 * 1024,
79 listen_addr: "0.0.0.0:50051"
80 .parse()
81 .expect("valid default listen address"),
82 }
83 }
84}
85
86impl FlightSqlServerBuilder {
87 pub fn with_view_factory_fn<F, Fut>(mut self, f: F) -> Self
91 where
92 F: FnOnce(Arc<RuntimeEnv>, Arc<DataLakeConnection>) -> Fut + Send + 'static,
93 Fut: Future<Output = Result<ViewFactory>> + Send + 'static,
94 {
95 self.view_factory_fn = Some(Box::new(move |runtime, lake| Box::pin(f(runtime, lake))));
96 self
97 }
98
99 pub fn with_session_configurator(mut self, cfg: Arc<dyn SessionConfigurator>) -> Self {
104 self.session_configurator = Some(cfg);
105 self
106 }
107
108 pub fn with_auth_provider(mut self, provider: Arc<dyn AuthProvider>) -> Self {
110 self.auth_provider = Some(provider);
111 self.use_default_auth = false;
112 self
113 }
114
115 pub fn with_default_auth(mut self) -> Self {
119 self.use_default_auth = true;
120 self.auth_provider = None;
121 self
122 }
123
124 pub fn with_max_decoding_message_size(mut self, bytes: usize) -> Self {
126 self.max_decoding_message_size = bytes;
127 self
128 }
129
130 pub fn with_listen_addr(mut self, addr: SocketAddr) -> Self {
132 self.listen_addr = addr;
133 self
134 }
135
136 pub async fn build_and_serve(self) -> Result<()> {
140 let lakehouse = LakehouseContext::from_env().await?;
141 let data_lake = lakehouse.lake().clone();
142 info!(
143 "created lakehouse context with metadata cache: {:?}",
144 lakehouse.metadata_cache()
145 );
146
147 let view_factory = if let Some(factory_fn) = self.view_factory_fn {
148 Arc::new(factory_fn(lakehouse.runtime().clone(), data_lake).await?)
149 } else {
150 Arc::new(default_view_factory(lakehouse.runtime().clone(), data_lake).await?)
151 };
152
153 let partition_provider =
154 Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
155
156 let session_configurator: Arc<dyn SessionConfigurator> =
157 if let Some(cfg) = self.session_configurator {
158 cfg
159 } else {
160 StaticTablesConfigurator::from_env(
161 "MICROMEGAS_STATIC_TABLES_URL",
162 lakehouse.runtime().clone(),
163 )
164 .await?
165 };
166
167 let svc = FlightServiceServer::new(FlightSqlServiceImpl::new(
168 lakehouse,
169 partition_provider,
170 view_factory,
171 session_configurator,
172 ))
173 .max_decoding_message_size(self.max_decoding_message_size);
174
175 let auth_provider: Option<Arc<dyn AuthProvider>> = if let Some(provider) =
176 self.auth_provider
177 {
178 Some(provider)
179 } else if self.use_default_auth {
180 match micromegas_auth::default_provider::provider().await? {
181 Some(provider) => Some(provider),
182 None => {
183 anyhow::bail!(
184 "Authentication required but no auth providers configured. Set MICROMEGAS_API_KEYS or MICROMEGAS_OIDC_CONFIG"
185 );
186 }
187 }
188 } else {
189 info!("Authentication disabled");
190 None
191 };
192
193 let layer = ServiceBuilder::new()
194 .layer(layer_fn(GrpcHealthService::new))
195 .layer(layer_fn(|service| LogUriService { service }))
196 .layer(layer_fn(move |inner| AuthService {
197 inner,
198 auth_provider: auth_provider.clone(),
199 }))
200 .into_inner();
201
202 info!("Listening on {:?}", self.listen_addr);
203 let listener = std::net::TcpListener::bind(self.listen_addr)?;
204 let incoming = ConnectedIncoming::from_std_listener(listener)?;
205
206 Server::builder()
207 .layer(layer)
208 .add_service(svc)
209 .serve_with_incoming(incoming)
210 .await?;
211
212 info!("bye");
213 Ok(())
214 }
215}