micromegas/servers/
flight_sql_server.rs

1use anyhow::Result;
2use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
3use micromegas_analytics::lakehouse::partition_cache::LivePartitionProvider;
4use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
5use micromegas_analytics::lakehouse::static_tables_configurator::StaticTablesConfigurator;
6use micromegas_analytics::lakehouse::view_factory::{ViewFactory, default_view_factory};
7use micromegas_auth::tower::AuthService;
8use micromegas_auth::types::AuthProvider;
9use micromegas_ingestion::data_lake_connection::DataLakeConnection;
10use micromegas_tracing::prelude::*;
11use std::future::Future;
12use std::net::SocketAddr;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use arrow_flight::flight_service_server::FlightServiceServer;
17use datafusion::execution::runtime_env::RuntimeEnv;
18use tonic::transport::Server;
19use tower::ServiceBuilder;
20use tower::layer::layer_fn;
21
22use super::connect_info_layer::ConnectedIncoming;
23use super::flight_sql_service_impl::FlightSqlServiceImpl;
24use super::grpc_health_service::GrpcHealthService;
25use super::log_uri_service::LogUriService;
26
27type ViewFactoryFn = Box<
28    dyn FnOnce(
29            Arc<RuntimeEnv>,
30            Arc<DataLakeConnection>,
31        ) -> Pin<Box<dyn Future<Output = Result<ViewFactory>> + Send>>
32        + Send,
33>;
34
35/// Builder for assembling and running a FlightSQL server.
36///
37/// Encapsulates the full setup sequence: data lake connection, lakehouse migration,
38/// runtime env, view factory, partition provider, session configurator, auth, and
39/// the gRPC tower layer stack.
40///
41/// # Example
42///
43/// ```rust,no_run
44/// use micromegas::servers::flight_sql_server::FlightSqlServer;
45///
46/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
47/// FlightSqlServer::builder()
48///     .with_default_auth()
49///     .build_and_serve()
50///     .await?;
51/// # Ok(())
52/// # }
53/// ```
54pub struct FlightSqlServer;
55
56impl FlightSqlServer {
57    pub fn builder() -> FlightSqlServerBuilder {
58        FlightSqlServerBuilder::default()
59    }
60}
61
62pub struct FlightSqlServerBuilder {
63    view_factory_fn: Option<ViewFactoryFn>,
64    session_configurator: Option<Arc<dyn SessionConfigurator>>,
65    auth_provider: Option<Arc<dyn AuthProvider>>,
66    use_default_auth: bool,
67    max_decoding_message_size: usize,
68    listen_addr: SocketAddr,
69}
70
71impl Default for FlightSqlServerBuilder {
72    fn default() -> Self {
73        Self {
74            view_factory_fn: None,
75            session_configurator: None,
76            auth_provider: None,
77            use_default_auth: false,
78            max_decoding_message_size: 100 * 1024 * 1024,
79            listen_addr: "0.0.0.0:50051"
80                .parse()
81                .expect("valid default listen address"),
82        }
83    }
84}
85
86impl FlightSqlServerBuilder {
87    /// Override the default view factory with a custom closure.
88    ///
89    /// The closure receives the runtime and data lake created by the builder.
90    pub fn with_view_factory_fn<F, Fut>(mut self, f: F) -> Self
91    where
92        F: FnOnce(Arc<RuntimeEnv>, Arc<DataLakeConnection>) -> Fut + Send + 'static,
93        Fut: Future<Output = Result<ViewFactory>> + Send + 'static,
94    {
95        self.view_factory_fn = Some(Box::new(move |runtime, lake| Box::pin(f(runtime, lake))));
96        self
97    }
98
99    /// Override the default session configurator.
100    ///
101    /// By default the builder loads static tables from `MICROMEGAS_STATIC_TABLES_URL`.
102    /// Use this to replace that behavior entirely.
103    pub fn with_session_configurator(mut self, cfg: Arc<dyn SessionConfigurator>) -> Self {
104        self.session_configurator = Some(cfg);
105        self
106    }
107
108    /// Set an explicit auth provider.
109    pub fn with_auth_provider(mut self, provider: Arc<dyn AuthProvider>) -> Self {
110        self.auth_provider = Some(provider);
111        self.use_default_auth = false;
112        self
113    }
114
115    /// Use the default auth provider from env vars during build.
116    ///
117    /// Errors if no auth providers are configured (fail-fast).
118    pub fn with_default_auth(mut self) -> Self {
119        self.use_default_auth = true;
120        self.auth_provider = None;
121        self
122    }
123
124    /// Set the max decoding message size (default: 100 MB).
125    pub fn with_max_decoding_message_size(mut self, bytes: usize) -> Self {
126        self.max_decoding_message_size = bytes;
127        self
128    }
129
130    /// Set the listen address (default: `0.0.0.0:50051`).
131    pub fn with_listen_addr(mut self, addr: SocketAddr) -> Self {
132        self.listen_addr = addr;
133        self
134    }
135
136    /// Build and run the FlightSQL server.
137    ///
138    /// Runs the full setup sequence and blocks until the server shuts down.
139    pub async fn build_and_serve(self) -> Result<()> {
140        let lakehouse = LakehouseContext::from_env().await?;
141        let data_lake = lakehouse.lake().clone();
142        info!(
143            "created lakehouse context with metadata cache: {:?}",
144            lakehouse.metadata_cache()
145        );
146
147        let view_factory = if let Some(factory_fn) = self.view_factory_fn {
148            Arc::new(factory_fn(lakehouse.runtime().clone(), data_lake).await?)
149        } else {
150            Arc::new(default_view_factory(lakehouse.runtime().clone(), data_lake).await?)
151        };
152
153        let partition_provider =
154            Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
155
156        let session_configurator: Arc<dyn SessionConfigurator> =
157            if let Some(cfg) = self.session_configurator {
158                cfg
159            } else {
160                StaticTablesConfigurator::from_env(
161                    "MICROMEGAS_STATIC_TABLES_URL",
162                    lakehouse.runtime().clone(),
163                )
164                .await?
165            };
166
167        let svc = FlightServiceServer::new(FlightSqlServiceImpl::new(
168            lakehouse,
169            partition_provider,
170            view_factory,
171            session_configurator,
172        ))
173        .max_decoding_message_size(self.max_decoding_message_size);
174
175        let auth_provider: Option<Arc<dyn AuthProvider>> = if let Some(provider) =
176            self.auth_provider
177        {
178            Some(provider)
179        } else if self.use_default_auth {
180            match micromegas_auth::default_provider::provider().await? {
181                Some(provider) => Some(provider),
182                None => {
183                    anyhow::bail!(
184                        "Authentication required but no auth providers configured. Set MICROMEGAS_API_KEYS or MICROMEGAS_OIDC_CONFIG"
185                    );
186                }
187            }
188        } else {
189            info!("Authentication disabled");
190            None
191        };
192
193        let layer = ServiceBuilder::new()
194            .layer(layer_fn(GrpcHealthService::new))
195            .layer(layer_fn(|service| LogUriService { service }))
196            .layer(layer_fn(move |inner| AuthService {
197                inner,
198                auth_provider: auth_provider.clone(),
199            }))
200            .into_inner();
201
202        info!("Listening on {:?}", self.listen_addr);
203        let listener = std::net::TcpListener::bind(self.listen_addr)?;
204        let incoming = ConnectedIncoming::from_std_listener(listener)?;
205
206        Server::builder()
207            .layer(layer)
208            .add_service(svc)
209            .serve_with_incoming(incoming)
210            .await?;
211
212        info!("bye");
213        Ok(())
214    }
215}