micromegas/servers/
flight_sql_server.rs

1use anyhow::Result;
2use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
3use micromegas_analytics::lakehouse::partition_cache::LivePartitionProvider;
4use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
5use micromegas_analytics::lakehouse::static_tables_configurator::StaticTablesConfigurator;
6use micromegas_analytics::lakehouse::view_factory::{ViewFactory, default_view_factory};
7use micromegas_auth::tower::AuthService;
8use micromegas_auth::types::AuthProvider;
9use micromegas_ingestion::data_lake_connection::DataLakeConnection;
10use micromegas_tracing::prelude::*;
11use std::future::Future;
12use std::net::SocketAddr;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16
17use arrow_flight::flight_service_server::FlightServiceServer;
18use datafusion::execution::runtime_env::RuntimeEnv;
19use tonic::transport::Server;
20use tower::ServiceBuilder;
21use tower::layer::layer_fn;
22
23use super::connect_info_layer::ConnectedIncoming;
24use super::flight_sql_service_impl::FlightSqlServiceImpl;
25use super::grpc_health_service::GrpcHealthService;
26use super::log_uri_service::LogUriService;
27
28type ViewFactoryFn = Box<
29    dyn FnOnce(
30            Arc<RuntimeEnv>,
31            Arc<DataLakeConnection>,
32        ) -> Pin<Box<dyn Future<Output = Result<ViewFactory>> + Send>>
33        + Send,
34>;
35
36/// Builder for assembling and running a FlightSQL server.
37///
38/// Encapsulates the full setup sequence: data lake connection, lakehouse migration,
39/// runtime env, view factory, partition provider, session configurator, auth, and
40/// the gRPC tower layer stack.
41///
42/// # Example
43///
44/// ```rust,no_run
45/// use micromegas::servers::flight_sql_server::FlightSqlServer;
46///
47/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
48/// FlightSqlServer::builder()
49///     .with_default_auth()
50///     .build_and_serve()
51///     .await?;
52/// # Ok(())
53/// # }
54/// ```
55pub struct FlightSqlServer;
56
57impl FlightSqlServer {
58    pub fn builder() -> FlightSqlServerBuilder {
59        FlightSqlServerBuilder::default()
60    }
61}
62
63pub struct FlightSqlServerBuilder {
64    view_factory_fn: Option<ViewFactoryFn>,
65    session_configurator: Option<Arc<dyn SessionConfigurator>>,
66    auth_provider: Option<Arc<dyn AuthProvider>>,
67    use_default_auth: bool,
68    max_decoding_message_size: usize,
69    listen_addr: SocketAddr,
70    shutdown_grace: Duration,
71    injected_lakehouse: Option<Arc<LakehouseContext>>,
72    injected_shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
73}
74
75impl Default for FlightSqlServerBuilder {
76    fn default() -> Self {
77        Self {
78            view_factory_fn: None,
79            session_configurator: None,
80            auth_provider: None,
81            use_default_auth: false,
82            max_decoding_message_size: 100 * 1024 * 1024,
83            listen_addr: "0.0.0.0:50051"
84                .parse()
85                .expect("valid default listen address"),
86            shutdown_grace: Duration::from_secs(25),
87            injected_lakehouse: None,
88            injected_shutdown: None,
89        }
90    }
91}
92
93impl FlightSqlServerBuilder {
94    /// Override the default view factory with a custom closure.
95    ///
96    /// The closure receives the runtime and data lake created by the builder.
97    pub fn with_view_factory_fn<F, Fut>(mut self, f: F) -> Self
98    where
99        F: FnOnce(Arc<RuntimeEnv>, Arc<DataLakeConnection>) -> Fut + Send + 'static,
100        Fut: Future<Output = Result<ViewFactory>> + Send + 'static,
101    {
102        self.view_factory_fn = Some(Box::new(move |runtime, lake| Box::pin(f(runtime, lake))));
103        self
104    }
105
106    /// Override the default session configurator.
107    ///
108    /// By default the builder loads static tables from `MICROMEGAS_STATIC_TABLES_URL`.
109    /// Use this to replace that behavior entirely.
110    pub fn with_session_configurator(mut self, cfg: Arc<dyn SessionConfigurator>) -> Self {
111        self.session_configurator = Some(cfg);
112        self
113    }
114
115    /// Set an explicit auth provider.
116    pub fn with_auth_provider(mut self, provider: Arc<dyn AuthProvider>) -> Self {
117        self.auth_provider = Some(provider);
118        self.use_default_auth = false;
119        self
120    }
121
122    /// Use the default auth provider from env vars during build.
123    ///
124    /// Errors if no auth providers are configured (fail-fast).
125    pub fn with_default_auth(mut self) -> Self {
126        self.use_default_auth = true;
127        self.auth_provider = None;
128        self
129    }
130
131    /// Set the max decoding message size (default: 100 MB).
132    pub fn with_max_decoding_message_size(mut self, bytes: usize) -> Self {
133        self.max_decoding_message_size = bytes;
134        self
135    }
136
137    /// Set the listen address (default: `0.0.0.0:50051`).
138    pub fn with_listen_addr(mut self, addr: SocketAddr) -> Self {
139        self.listen_addr = addr;
140        self
141    }
142
143    /// Set the grace period for graceful shutdown on SIGTERM (default: 25s).
144    pub fn with_shutdown_grace(mut self, grace: Duration) -> Self {
145        self.shutdown_grace = grace;
146        self
147    }
148
149    /// Inject a pre-built `LakehouseContext` instead of calling `LakehouseContext::from_env`.
150    ///
151    /// Useful for the monolith, which constructs one shared context for all lake-backed roles.
152    pub fn with_lakehouse(mut self, lakehouse: Arc<LakehouseContext>) -> Self {
153        self.injected_lakehouse = Some(lakehouse);
154        self
155    }
156
157    /// Inject a custom shutdown future instead of the default `wait_for_sigterm()`.
158    ///
159    /// The monolith passes `fanout.subscribe()` here so all roles shut down from one signal.
160    pub fn with_shutdown(mut self, shutdown: impl Future<Output = ()> + Send + 'static) -> Self {
161        self.injected_shutdown = Some(Box::pin(shutdown));
162        self
163    }
164
165    /// Build and run the FlightSQL server.
166    ///
167    /// Runs the full setup sequence and blocks until the server shuts down.
168    pub async fn build_and_serve(self) -> Result<()> {
169        // Use injected lakehouse or build one from environment
170        let lakehouse = if let Some(lh) = self.injected_lakehouse {
171            lh
172        } else {
173            LakehouseContext::from_env().await?
174        };
175        let data_lake = lakehouse.lake().clone();
176        info!(
177            "created lakehouse context with metadata cache: {:?}",
178            lakehouse.metadata_cache()
179        );
180
181        let view_factory = if let Some(factory_fn) = self.view_factory_fn {
182            Arc::new(factory_fn(lakehouse.runtime().clone(), data_lake).await?)
183        } else {
184            Arc::new(default_view_factory(lakehouse.runtime().clone(), data_lake).await?)
185        };
186
187        let partition_provider =
188            Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
189
190        let session_configurator: Arc<dyn SessionConfigurator> =
191            if let Some(cfg) = self.session_configurator {
192                cfg
193            } else {
194                StaticTablesConfigurator::from_env(
195                    "MICROMEGAS_STATIC_TABLES_URL",
196                    lakehouse.runtime().clone(),
197                )
198                .await?
199            };
200
201        let svc = FlightServiceServer::new(FlightSqlServiceImpl::new(
202            lakehouse,
203            partition_provider,
204            view_factory,
205            session_configurator,
206        ))
207        .max_decoding_message_size(self.max_decoding_message_size);
208
209        let auth_provider: Option<Arc<dyn AuthProvider>> = if let Some(provider) =
210            self.auth_provider
211        {
212            Some(provider)
213        } else if self.use_default_auth {
214            match micromegas_auth::default_provider::provider().await? {
215                Some(provider) => Some(provider),
216                None => {
217                    anyhow::bail!(
218                        "Authentication required but no auth providers configured. Set MICROMEGAS_API_KEYS or MICROMEGAS_OIDC_CONFIG"
219                    );
220                }
221            }
222        } else {
223            info!("Authentication disabled");
224            None
225        };
226
227        let layer = ServiceBuilder::new()
228            .layer(layer_fn(GrpcHealthService::new))
229            .layer(layer_fn(|service| LogUriService { service }))
230            .layer(layer_fn(move |inner| AuthService {
231                inner,
232                auth_provider: auth_provider.clone(),
233            }))
234            .into_inner();
235
236        use super::shutdown::{ShutdownFanout, wait_for_sigterm};
237
238        info!("Listening on {:?}", self.listen_addr);
239        let listener = std::net::TcpListener::bind(self.listen_addr)?;
240        let incoming = ConnectedIncoming::from_std_listener(listener)?;
241
242        // Use injected shutdown future or default to SIGTERM
243        let shutdown_future: Pin<Box<dyn Future<Output = ()> + Send + 'static>> = self
244            .injected_shutdown
245            .unwrap_or_else(|| Box::pin(wait_for_sigterm()));
246        let fanout = ShutdownFanout::new(shutdown_future);
247        let grace_secs = self.shutdown_grace.as_secs();
248        let grace = self.shutdown_grace;
249
250        let serve = Server::builder()
251            .layer(layer)
252            .add_service(svc)
253            .serve_with_incoming_shutdown(incoming, fanout.subscribe());
254
255        let deadline = {
256            let d = fanout.subscribe();
257            async move {
258                d.await;
259                tokio::time::sleep(grace).await;
260            }
261        };
262
263        tokio::select! {
264            res = serve => {
265                info!("drain completed");
266                res?;
267            }
268            _ = deadline => {
269                warn!("grace period of {grace_secs}s elapsed with work still in flight");
270            }
271        }
272
273        info!("bye");
274        Ok(())
275    }
276}