micromegas/servers/
flight_sql_server.rs1use anyhow::Result;
2use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
3use micromegas_analytics::lakehouse::partition_cache::LivePartitionProvider;
4use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
5use micromegas_analytics::lakehouse::static_tables_configurator::StaticTablesConfigurator;
6use micromegas_analytics::lakehouse::view_factory::{ViewFactory, default_view_factory};
7use micromegas_auth::tower::AuthService;
8use micromegas_auth::types::AuthProvider;
9use micromegas_ingestion::data_lake_connection::DataLakeConnection;
10use micromegas_tracing::prelude::*;
11use std::future::Future;
12use std::net::SocketAddr;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16
17use arrow_flight::flight_service_server::FlightServiceServer;
18use datafusion::execution::runtime_env::RuntimeEnv;
19use tonic::transport::Server;
20use tower::ServiceBuilder;
21use tower::layer::layer_fn;
22
23use super::connect_info_layer::ConnectedIncoming;
24use super::flight_sql_service_impl::FlightSqlServiceImpl;
25use super::grpc_health_service::GrpcHealthService;
26use super::log_uri_service::LogUriService;
27
28type ViewFactoryFn = Box<
29 dyn FnOnce(
30 Arc<RuntimeEnv>,
31 Arc<DataLakeConnection>,
32 ) -> Pin<Box<dyn Future<Output = Result<ViewFactory>> + Send>>
33 + Send,
34>;
35
36pub struct FlightSqlServer;
56
57impl FlightSqlServer {
58 pub fn builder() -> FlightSqlServerBuilder {
59 FlightSqlServerBuilder::default()
60 }
61}
62
63pub struct FlightSqlServerBuilder {
64 view_factory_fn: Option<ViewFactoryFn>,
65 session_configurator: Option<Arc<dyn SessionConfigurator>>,
66 auth_provider: Option<Arc<dyn AuthProvider>>,
67 use_default_auth: bool,
68 max_decoding_message_size: usize,
69 listen_addr: SocketAddr,
70 shutdown_grace: Duration,
71 injected_lakehouse: Option<Arc<LakehouseContext>>,
72 injected_shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
73}
74
75impl Default for FlightSqlServerBuilder {
76 fn default() -> Self {
77 Self {
78 view_factory_fn: None,
79 session_configurator: None,
80 auth_provider: None,
81 use_default_auth: false,
82 max_decoding_message_size: 100 * 1024 * 1024,
83 listen_addr: "0.0.0.0:50051"
84 .parse()
85 .expect("valid default listen address"),
86 shutdown_grace: Duration::from_secs(25),
87 injected_lakehouse: None,
88 injected_shutdown: None,
89 }
90 }
91}
92
93impl FlightSqlServerBuilder {
94 pub fn with_view_factory_fn<F, Fut>(mut self, f: F) -> Self
98 where
99 F: FnOnce(Arc<RuntimeEnv>, Arc<DataLakeConnection>) -> Fut + Send + 'static,
100 Fut: Future<Output = Result<ViewFactory>> + Send + 'static,
101 {
102 self.view_factory_fn = Some(Box::new(move |runtime, lake| Box::pin(f(runtime, lake))));
103 self
104 }
105
106 pub fn with_session_configurator(mut self, cfg: Arc<dyn SessionConfigurator>) -> Self {
111 self.session_configurator = Some(cfg);
112 self
113 }
114
115 pub fn with_auth_provider(mut self, provider: Arc<dyn AuthProvider>) -> Self {
117 self.auth_provider = Some(provider);
118 self.use_default_auth = false;
119 self
120 }
121
122 pub fn with_default_auth(mut self) -> Self {
126 self.use_default_auth = true;
127 self.auth_provider = None;
128 self
129 }
130
131 pub fn with_max_decoding_message_size(mut self, bytes: usize) -> Self {
133 self.max_decoding_message_size = bytes;
134 self
135 }
136
137 pub fn with_listen_addr(mut self, addr: SocketAddr) -> Self {
139 self.listen_addr = addr;
140 self
141 }
142
143 pub fn with_shutdown_grace(mut self, grace: Duration) -> Self {
145 self.shutdown_grace = grace;
146 self
147 }
148
149 pub fn with_lakehouse(mut self, lakehouse: Arc<LakehouseContext>) -> Self {
153 self.injected_lakehouse = Some(lakehouse);
154 self
155 }
156
157 pub fn with_shutdown(mut self, shutdown: impl Future<Output = ()> + Send + 'static) -> Self {
161 self.injected_shutdown = Some(Box::pin(shutdown));
162 self
163 }
164
165 pub async fn build_and_serve(self) -> Result<()> {
169 let lakehouse = if let Some(lh) = self.injected_lakehouse {
171 lh
172 } else {
173 LakehouseContext::from_env().await?
174 };
175 let data_lake = lakehouse.lake().clone();
176 info!(
177 "created lakehouse context with metadata cache: {:?}",
178 lakehouse.metadata_cache()
179 );
180
181 let view_factory = if let Some(factory_fn) = self.view_factory_fn {
182 Arc::new(factory_fn(lakehouse.runtime().clone(), data_lake).await?)
183 } else {
184 Arc::new(default_view_factory(lakehouse.runtime().clone(), data_lake).await?)
185 };
186
187 let partition_provider =
188 Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
189
190 let session_configurator: Arc<dyn SessionConfigurator> =
191 if let Some(cfg) = self.session_configurator {
192 cfg
193 } else {
194 StaticTablesConfigurator::from_env(
195 "MICROMEGAS_STATIC_TABLES_URL",
196 lakehouse.runtime().clone(),
197 )
198 .await?
199 };
200
201 let svc = FlightServiceServer::new(FlightSqlServiceImpl::new(
202 lakehouse,
203 partition_provider,
204 view_factory,
205 session_configurator,
206 ))
207 .max_decoding_message_size(self.max_decoding_message_size);
208
209 let auth_provider: Option<Arc<dyn AuthProvider>> = if let Some(provider) =
210 self.auth_provider
211 {
212 Some(provider)
213 } else if self.use_default_auth {
214 match micromegas_auth::default_provider::provider().await? {
215 Some(provider) => Some(provider),
216 None => {
217 anyhow::bail!(
218 "Authentication required but no auth providers configured. Set MICROMEGAS_API_KEYS or MICROMEGAS_OIDC_CONFIG"
219 );
220 }
221 }
222 } else {
223 info!("Authentication disabled");
224 None
225 };
226
227 let layer = ServiceBuilder::new()
228 .layer(layer_fn(GrpcHealthService::new))
229 .layer(layer_fn(|service| LogUriService { service }))
230 .layer(layer_fn(move |inner| AuthService {
231 inner,
232 auth_provider: auth_provider.clone(),
233 }))
234 .into_inner();
235
236 use super::shutdown::{ShutdownFanout, wait_for_sigterm};
237
238 info!("Listening on {:?}", self.listen_addr);
239 let listener = std::net::TcpListener::bind(self.listen_addr)?;
240 let incoming = ConnectedIncoming::from_std_listener(listener)?;
241
242 let shutdown_future: Pin<Box<dyn Future<Output = ()> + Send + 'static>> = self
244 .injected_shutdown
245 .unwrap_or_else(|| Box::pin(wait_for_sigterm()));
246 let fanout = ShutdownFanout::new(shutdown_future);
247 let grace_secs = self.shutdown_grace.as_secs();
248 let grace = self.shutdown_grace;
249
250 let serve = Server::builder()
251 .layer(layer)
252 .add_service(svc)
253 .serve_with_incoming_shutdown(incoming, fanout.subscribe());
254
255 let deadline = {
256 let d = fanout.subscribe();
257 async move {
258 d.await;
259 tokio::time::sleep(grace).await;
260 }
261 };
262
263 tokio::select! {
264 res = serve => {
265 info!("drain completed");
266 res?;
267 }
268 _ = deadline => {
269 warn!("grace period of {grace_secs}s elapsed with work still in flight");
270 }
271 }
272
273 info!("bye");
274 Ok(())
275 }
276}