micromegas/servers/
flight_sql_service_impl.rs

1use super::sqlinfo::{
2    SQL_INFO_DATE_TIME_FUNCTIONS, SQL_INFO_NUMERIC_FUNCTIONS, SQL_INFO_SQL_KEYWORDS,
3    SQL_INFO_STRING_FUNCTIONS, SQL_INFO_SYSTEM_FUNCTIONS,
4};
5use anyhow::Result;
6use arrow_flight::decode::FlightRecordBatchStream;
7use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
8use arrow_flight::error::FlightError;
9use arrow_flight::sql::DoPutPreparedStatementResult;
10use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder};
11use arrow_flight::sql::server::PeekableFlightDataStream;
12use arrow_flight::sql::{
13    ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest,
14    ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult,
15    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
16    ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
17    ActionEndSavepointRequest, ActionEndTransactionRequest, Any, CommandGetCatalogs,
18    CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
19    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
20    CommandGetXdbcTypeInfo, CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
21    CommandStatementIngest, CommandStatementQuery, CommandStatementSubstraitPlan,
22    CommandStatementUpdate, ProstMessageExt, SqlInfo, TicketStatementQuery,
23    server::FlightSqlService,
24};
25use arrow_flight::{
26    Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
27    Ticket, flight_service_server::FlightService,
28};
29use core::str;
30use datafusion::arrow::datatypes::Schema;
31use datafusion::arrow::ipc::writer::StreamWriter;
32use futures::StreamExt;
33use futures::{Stream, TryStreamExt};
34use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
35use micromegas_analytics::lakehouse::partition_cache::QueryPartitionProvider;
36use micromegas_analytics::lakehouse::query::make_session_context;
37use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
38use micromegas_analytics::lakehouse::view_factory::ViewFactory;
39use micromegas_analytics::replication::bulk_ingest;
40use micromegas_analytics::time::TimeRange;
41use micromegas_auth::user_attribution::validate_and_resolve_user_attribution_grpc;
42use micromegas_tracing::prelude::*;
43use once_cell::sync::Lazy;
44use prost::Message;
45use std::pin::Pin;
46use std::str::FromStr;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use tonic::metadata::MetadataMap;
50use tonic::{Request, Response, Status, Streaming};
51
52type FlightDataStream =
53    Pin<Box<dyn Stream<Item = Result<arrow_flight::FlightData, Status>> + Send>>;
54
55macro_rules! status {
56    ($desc:expr, $err:expr) => {
57        Status::internal(format!("{}: {} at {}:{}", $desc, $err, file!(), line!()))
58    };
59}
60
61macro_rules! api_entry_not_implemented {
62    () => {{
63        let function_name = micromegas_tracing::__function_name!();
64        error!("not implemented: {function_name}");
65        Err(Status::unimplemented(format!(
66            "{}:{} not implemented: {function_name}",
67            file!(),
68            line!()
69        )))
70    }};
71}
72
73/// Stream wrapper that tracks when the stream is fully consumed
74struct CompletionTrackedStream<S> {
75    inner: S,
76    start_time: i64,
77    completed: bool,
78}
79
80impl<S> CompletionTrackedStream<S> {
81    fn new(inner: S, start_time: i64) -> Self {
82        Self {
83            inner,
84            start_time,
85            completed: false,
86        }
87    }
88}
89
90impl<S> Stream for CompletionTrackedStream<S>
91where
92    S: Stream<Item = Result<arrow_flight::FlightData, Status>> + Unpin + Send,
93{
94    type Item = Result<arrow_flight::FlightData, Status>;
95
96    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97        match Pin::new(&mut self.inner).poll_next(cx) {
98            Poll::Ready(Some(result)) => {
99                // Check if this is an error result and log it
100                if let Err(ref err) = result {
101                    error!("stream error occurred: {err:?}");
102                    if !self.completed {
103                        let total_duration = now() - self.start_time;
104                        imetric!("query_duration_with_error", "ticks", total_duration as u64);
105                        imetric!("query_failed", "count", 1);
106                        self.completed = true;
107                    }
108                }
109                Poll::Ready(Some(result))
110            }
111            Poll::Ready(None) => {
112                // Stream completed successfully
113                if !self.completed {
114                    let total_duration = now() - self.start_time;
115                    imetric!("query_duration_total", "ticks", total_duration as u64);
116                    imetric!("query_completed_successfully", "count", 1);
117                    self.completed = true;
118                }
119                Poll::Ready(None)
120            }
121            Poll::Pending => Poll::Pending,
122        }
123    }
124}
125
126static INSTANCE_SQL_DATA: Lazy<SqlInfoData> = Lazy::new(|| {
127    let mut builder = SqlInfoDataBuilder::new();
128    // Server information
129    builder.append(SqlInfo::FlightSqlServerName, "Micromegas Flight SQL Server");
130    builder.append(SqlInfo::FlightSqlServerVersion, "1");
131    // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
132    builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3");
133    builder.append(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS);
134    builder.append(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS);
135    builder.append(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS);
136    builder.append(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS);
137    builder.append(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS);
138    builder.build().unwrap()
139});
140
141/// Implementation of the Flight SQL service.
142#[derive(Clone)]
143pub struct FlightSqlServiceImpl {
144    lakehouse: Arc<LakehouseContext>,
145    part_provider: Arc<dyn QueryPartitionProvider>,
146    view_factory: Arc<ViewFactory>,
147    session_configurator: Arc<dyn SessionConfigurator>,
148}
149
150impl FlightSqlServiceImpl {
151    pub fn new(
152        lakehouse: Arc<LakehouseContext>,
153        part_provider: Arc<dyn QueryPartitionProvider>,
154        view_factory: Arc<ViewFactory>,
155        session_configurator: Arc<dyn SessionConfigurator>,
156    ) -> Self {
157        Self {
158            lakehouse,
159            part_provider,
160            view_factory,
161            session_configurator,
162        }
163    }
164
165    fn should_preserve_dictionary(metadata: &MetadataMap) -> bool {
166        metadata
167            .get("preserve_dictionary")
168            .and_then(|v| v.to_str().ok())
169            .map(|s| s.eq_ignore_ascii_case("true"))
170            .unwrap_or(false)
171    }
172
173    #[span_fn]
174    async fn execute_query(
175        &self,
176        ticket_stmt: TicketStatementQuery,
177        metadata: &MetadataMap,
178    ) -> Result<Response<FlightDataStream>, Status> {
179        let begin_request = now();
180        let sql = std::str::from_utf8(&ticket_stmt.statement_handle)
181            .map_err(|e| status!("Unable to parse query", e))?;
182
183        let mut begin = metadata.get("query_range_begin");
184        if let Some(s) = &begin
185            && s.is_empty()
186        {
187            begin = None;
188        }
189        let mut end = metadata.get("query_range_end");
190        if let Some(s) = &end
191            && s.is_empty()
192        {
193            end = None;
194        }
195        let query_range = if begin.is_some() && end.is_some() {
196            let begin_datetime = chrono::DateTime::parse_from_rfc3339(
197                begin
198                    .unwrap()
199                    .to_str()
200                    .map_err(|e| status!("Unable to convert query_range_begin to string", e))?,
201            )
202            .map_err(|e| status!("Unable to parse query_range_begin as a rfc3339 datetime", e))?;
203            let end_datetime = chrono::DateTime::parse_from_rfc3339(
204                end.unwrap()
205                    .to_str()
206                    .map_err(|e| status!("Unable to convert query_range_end to string", e))?,
207            )
208            .map_err(|e| status!("Unable to parse query_range_end as a rfc3339 datetime", e))?;
209            Some(TimeRange::new(begin_datetime.into(), end_datetime.into()))
210        } else {
211            None
212        };
213
214        // Validate and resolve user attribution
215        let attr = validate_and_resolve_user_attribution_grpc(metadata).map_err(|e| *e)?;
216
217        let client_type = metadata
218            .get("x-client-type")
219            .and_then(|v| v.to_str().ok())
220            .unwrap_or("unknown");
221
222        let user_name_display = attr.user_name.as_deref().unwrap_or("");
223
224        // Log query with full attribution
225        if let Some(service_account_name) = &attr.service_account {
226            info!(
227                "execute_query range={query_range:?} sql={sql:?} limit={:?} user={} email={} name={user_name_display:?} service_account={service_account_name} client={client_type}",
228                metadata.get("limit"),
229                attr.user_id,
230                attr.user_email
231            );
232        } else {
233            info!(
234                "execute_query range={query_range:?} sql={sql:?} limit={:?} user={} email={} name={user_name_display:?} client={client_type}",
235                metadata.get("limit"),
236                attr.user_id,
237                attr.user_email
238            );
239        }
240
241        // Session context creation phase
242        let session_begin = now();
243        let ctx = make_session_context(
244            self.lakehouse.clone(),
245            self.part_provider.clone(),
246            query_range,
247            self.view_factory.clone(),
248            self.session_configurator.clone(),
249        )
250        .await
251        .map_err(|e| status!("error in make_session_context", e))?;
252        let context_init_duration = now() - session_begin;
253
254        // Query planning phase
255        let planning_begin = now();
256        let mut df = ctx
257            .sql(sql)
258            .await
259            .map_err(|e| status!("error building dataframe", e))?;
260        let planning_duration = now() - planning_begin;
261
262        if let Some(limit_str) = metadata.get("limit") {
263            let limit: usize = usize::from_str(
264                limit_str
265                    .to_str()
266                    .map_err(|e| status!("error converting limit to str", e))?,
267            )
268            .map_err(|e| status!("error parsing limit", e))?;
269            df = df
270                .limit(0, Some(limit))
271                .map_err(|e| status!("error building dataframe with limit", e))?;
272        }
273
274        // Query execution phase
275        let execution_begin = now();
276        let schema = Arc::new(df.schema().as_arrow().clone());
277        let stream = df
278            .execute_stream()
279            .await
280            .map_err(|e| Status::internal(format!("Error executing plan: {e:?}")))?
281            .map_err(|e| FlightError::ExternalError(Box::new(e)));
282        let builder = if Self::should_preserve_dictionary(metadata) {
283            FlightDataEncoderBuilder::new()
284                .with_schema(schema.clone())
285                .with_dictionary_handling(DictionaryHandling::Resend)
286        } else {
287            FlightDataEncoderBuilder::new().with_schema(schema.clone())
288        };
289        let flight_data_stream = builder.build(stream);
290        let execution_duration = now() - execution_begin;
291
292        // Calculate total setup time and record detailed metrics
293        let total_setup_duration = now() - begin_request;
294
295        // Record detailed timing metrics
296        imetric!(
297            "context_init_duration",
298            "ticks",
299            context_init_duration as u64
300        );
301        imetric!("query_planning_duration", "ticks", planning_duration as u64);
302        imetric!(
303            "query_execution_duration",
304            "ticks",
305            execution_duration as u64
306        );
307        imetric!("query_setup_duration", "ticks", total_setup_duration as u64);
308
309        // Create instrumented stream that tracks completion
310        let instrumented_stream = flight_data_stream
311            .map_err(|e| status!("error building data stream", e))
312            .map({
313                let start_time = begin_request;
314                move |result| {
315                    // On stream completion or error, record total duration
316                    if result.is_err() {
317                        let total_duration = now() - start_time;
318                        imetric!("query_duration_with_error", "ticks", total_duration as u64);
319                    }
320                    result
321                }
322            });
323        let completion_tracked_stream =
324            CompletionTrackedStream::new(instrumented_stream.boxed(), begin_request);
325        Ok(Response::new(
326            Box::pin(completion_tracked_stream) as FlightDataStream
327        ))
328    }
329}
330
331#[tonic::async_trait]
332impl FlightSqlService for FlightSqlServiceImpl {
333    type FlightService = FlightSqlServiceImpl;
334
335    async fn do_handshake(
336        &self,
337        _request: Request<Streaming<HandshakeRequest>>,
338    ) -> Result<
339        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
340        Status,
341    > {
342        api_entry_not_implemented!()
343    }
344
345    #[span_fn]
346    async fn do_get_fallback(
347        &self,
348        request: Request<Ticket>,
349        _message: Any,
350    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
351        let ticket_stmt = TicketStatementQuery::decode(request.get_ref().ticket.clone())
352            .map_err(|e| status!("Could not read ticket", e))?;
353        self.execute_query(ticket_stmt, request.metadata()).await
354    }
355
356    #[span_fn]
357    async fn get_flight_info_statement(
358        &self,
359        query: CommandStatementQuery,
360        _request: Request<FlightDescriptor>,
361    ) -> Result<Response<FlightInfo>, Status> {
362        let begin_request = now();
363        info!("get_flight_info_statement {query:?} ");
364        let CommandStatementQuery { query, .. } = query;
365        let schema = Schema::empty();
366        let ticket = TicketStatementQuery {
367            statement_handle: query.into(),
368        };
369        let mut bytes: Vec<u8> = Vec::new();
370        if ticket.encode(&mut bytes).is_ok() {
371            let info = FlightInfo::new()
372                .try_with_schema(&schema)
373                .unwrap()
374                .with_endpoint(FlightEndpoint::new().with_ticket(Ticket::new(bytes)));
375            let duration = now() - begin_request;
376            imetric!("request_duration", "ticks", duration as u64);
377            Ok(Response::new(info))
378        } else {
379            error!("Error encoding ticket");
380            Err(Status::internal("Error encoding ticket"))
381        }
382    }
383
384    async fn get_flight_info_substrait_plan(
385        &self,
386        _query: CommandStatementSubstraitPlan,
387        _request: Request<FlightDescriptor>,
388    ) -> Result<Response<FlightInfo>, Status> {
389        api_entry_not_implemented!()
390    }
391
392    async fn get_flight_info_prepared_statement(
393        &self,
394        _cmd: CommandPreparedStatementQuery,
395        _request: Request<FlightDescriptor>,
396    ) -> Result<Response<FlightInfo>, Status> {
397        api_entry_not_implemented!()
398    }
399
400    async fn get_flight_info_catalogs(
401        &self,
402        _query: CommandGetCatalogs,
403        _request: Request<FlightDescriptor>,
404    ) -> Result<Response<FlightInfo>, Status> {
405        api_entry_not_implemented!()
406    }
407
408    async fn get_flight_info_schemas(
409        &self,
410        _query: CommandGetDbSchemas,
411        _request: Request<FlightDescriptor>,
412    ) -> Result<Response<FlightInfo>, Status> {
413        api_entry_not_implemented!()
414    }
415
416    #[span_fn]
417    async fn get_flight_info_tables(
418        &self,
419        query: CommandGetTables,
420        request: Request<FlightDescriptor>,
421    ) -> Result<Response<FlightInfo>, Status> {
422        let begin_request = now();
423        info!("get_flight_info_tables");
424        let flight_descriptor = request.into_inner();
425        let ticket = Ticket {
426            ticket: query.as_any().encode_to_vec().into(),
427        };
428        let endpoint = FlightEndpoint::new().with_ticket(ticket);
429        let flight_info = FlightInfo::new()
430            .try_with_schema(&query.into_builder().schema())
431            .map_err(|e| status!("Unable to encode schema", e))?
432            .with_endpoint(endpoint)
433            .with_descriptor(flight_descriptor);
434        let duration = now() - begin_request;
435        imetric!("request_duration", "ticks", duration as u64);
436        Ok(tonic::Response::new(flight_info))
437    }
438
439    async fn get_flight_info_table_types(
440        &self,
441        _query: CommandGetTableTypes,
442        _request: Request<FlightDescriptor>,
443    ) -> Result<Response<FlightInfo>, Status> {
444        api_entry_not_implemented!()
445    }
446
447    #[span_fn]
448    async fn get_flight_info_sql_info(
449        &self,
450        query: CommandGetSqlInfo,
451        request: Request<FlightDescriptor>,
452    ) -> Result<Response<FlightInfo>, Status> {
453        let begin_request = now();
454        info!("get_flight_info_sql_info");
455        let flight_descriptor = request.into_inner();
456        let ticket = Ticket::new(query.as_any().encode_to_vec());
457        let endpoint = FlightEndpoint::new().with_ticket(ticket);
458        let flight_info = FlightInfo::new()
459            .try_with_schema(query.into_builder(&INSTANCE_SQL_DATA).schema().as_ref())
460            .map_err(|e| status!("Unable to encode schema", e))?
461            .with_endpoint(endpoint)
462            .with_descriptor(flight_descriptor);
463        let duration = now() - begin_request;
464        imetric!("request_duration", "ticks", duration as u64);
465        Ok(tonic::Response::new(flight_info))
466    }
467
468    async fn get_flight_info_primary_keys(
469        &self,
470        _query: CommandGetPrimaryKeys,
471        _request: Request<FlightDescriptor>,
472    ) -> Result<Response<FlightInfo>, Status> {
473        api_entry_not_implemented!()
474    }
475
476    async fn get_flight_info_exported_keys(
477        &self,
478        _query: CommandGetExportedKeys,
479        _request: Request<FlightDescriptor>,
480    ) -> Result<Response<FlightInfo>, Status> {
481        api_entry_not_implemented!()
482    }
483
484    async fn get_flight_info_imported_keys(
485        &self,
486        _query: CommandGetImportedKeys,
487        _request: Request<FlightDescriptor>,
488    ) -> Result<Response<FlightInfo>, Status> {
489        api_entry_not_implemented!()
490    }
491
492    async fn get_flight_info_cross_reference(
493        &self,
494        _query: CommandGetCrossReference,
495        _request: Request<FlightDescriptor>,
496    ) -> Result<Response<FlightInfo>, Status> {
497        api_entry_not_implemented!()
498    }
499
500    async fn get_flight_info_xdbc_type_info(
501        &self,
502        _query: CommandGetXdbcTypeInfo,
503        _request: Request<FlightDescriptor>,
504    ) -> Result<Response<FlightInfo>, Status> {
505        api_entry_not_implemented!()
506    }
507
508    #[span_fn]
509    async fn do_get_statement(
510        &self,
511        ticket: TicketStatementQuery,
512        request: Request<Ticket>,
513    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
514        self.execute_query(ticket, request.metadata()).await
515    }
516
517    async fn do_get_prepared_statement(
518        &self,
519        _query: CommandPreparedStatementQuery,
520        _request: Request<Ticket>,
521    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
522        api_entry_not_implemented!()
523    }
524
525    async fn do_get_catalogs(
526        &self,
527        _query: CommandGetCatalogs,
528        _request: Request<Ticket>,
529    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
530        api_entry_not_implemented!()
531    }
532
533    async fn do_get_schemas(
534        &self,
535        _query: CommandGetDbSchemas,
536        _request: Request<Ticket>,
537    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
538        api_entry_not_implemented!()
539    }
540
541    #[span_fn]
542    async fn do_get_tables(
543        &self,
544        query: CommandGetTables,
545        _request: Request<Ticket>,
546    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
547        let begin_request = now();
548        info!("do_get_tables {query:?}");
549        let mut builder = query.into_builder();
550        for view in self.view_factory.get_global_views() {
551            let catalog_name = "";
552            let schema_name = "";
553            builder
554                .append(
555                    catalog_name,
556                    schema_name,
557                    &*view.get_view_set_name(),
558                    "table",
559                    &view.get_file_schema(),
560                )
561                .map_err(Status::from)?;
562        }
563        let schema = builder.schema();
564        let batch = builder.build();
565        let stream = FlightDataEncoderBuilder::new()
566            .with_schema(schema)
567            .build(futures::stream::once(async { batch }))
568            .map_err(Status::from);
569        let duration = now() - begin_request;
570        imetric!("request_duration", "ticks", duration as u64);
571        Ok(Response::new(Box::pin(stream)))
572    }
573
574    async fn do_get_table_types(
575        &self,
576        _query: CommandGetTableTypes,
577        _request: Request<Ticket>,
578    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
579        api_entry_not_implemented!()
580    }
581
582    #[span_fn]
583    async fn do_get_sql_info(
584        &self,
585        query: CommandGetSqlInfo,
586        _request: Request<Ticket>,
587    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
588        info!("do_get_sql_info");
589        let builder = query.into_builder(&INSTANCE_SQL_DATA);
590        let schema = builder.schema();
591        let batch = builder.build();
592        let stream = FlightDataEncoderBuilder::new()
593            .with_schema(schema)
594            .build(futures::stream::once(async { batch }))
595            .map_err(Status::from);
596        Ok(Response::new(Box::pin(stream)))
597    }
598
599    async fn do_get_primary_keys(
600        &self,
601        _query: CommandGetPrimaryKeys,
602        _request: Request<Ticket>,
603    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
604        api_entry_not_implemented!()
605    }
606
607    async fn do_get_exported_keys(
608        &self,
609        _query: CommandGetExportedKeys,
610        _request: Request<Ticket>,
611    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
612        api_entry_not_implemented!()
613    }
614
615    async fn do_get_imported_keys(
616        &self,
617        _query: CommandGetImportedKeys,
618        _request: Request<Ticket>,
619    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
620        api_entry_not_implemented!()
621    }
622
623    async fn do_get_cross_reference(
624        &self,
625        _query: CommandGetCrossReference,
626        _request: Request<Ticket>,
627    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
628        api_entry_not_implemented!()
629    }
630
631    async fn do_get_xdbc_type_info(
632        &self,
633        _query: CommandGetXdbcTypeInfo,
634        _request: Request<Ticket>,
635    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
636        api_entry_not_implemented!()
637    }
638
639    async fn do_put_statement_update(
640        &self,
641        _ticket: CommandStatementUpdate,
642        _request: Request<PeekableFlightDataStream>,
643    ) -> Result<i64, Status> {
644        api_entry_not_implemented!()
645    }
646
647    #[span_fn]
648    async fn do_put_statement_ingest(
649        &self,
650        command: CommandStatementIngest,
651        request: Request<PeekableFlightDataStream>,
652    ) -> Result<i64, Status> {
653        let table_name = command.table;
654        info!("do_put_statement_ingest table_name={table_name}");
655        let stream = FlightRecordBatchStream::new_from_flight_data(
656            request.into_inner().map_err(|e| e.into()),
657        );
658        bulk_ingest(self.lakehouse.lake().clone(), &table_name, stream)
659            .await
660            .map_err(|e| {
661                let msg = format!("error ingesting into {table_name}: {e:?}");
662                error!("{msg}");
663                status!(msg, e)
664            })
665    }
666
667    async fn do_put_substrait_plan(
668        &self,
669        _ticket: CommandStatementSubstraitPlan,
670        _request: Request<PeekableFlightDataStream>,
671    ) -> Result<i64, Status> {
672        api_entry_not_implemented!()
673    }
674
675    async fn do_put_prepared_statement_query(
676        &self,
677        _query: CommandPreparedStatementQuery,
678        _request: Request<PeekableFlightDataStream>,
679    ) -> Result<DoPutPreparedStatementResult, Status> {
680        api_entry_not_implemented!()
681    }
682
683    async fn do_put_prepared_statement_update(
684        &self,
685        _query: CommandPreparedStatementUpdate,
686        _request: Request<PeekableFlightDataStream>,
687    ) -> Result<i64, Status> {
688        api_entry_not_implemented!()
689    }
690
691    #[span_fn]
692    async fn do_action_create_prepared_statement(
693        &self,
694        query: ActionCreatePreparedStatementRequest,
695        _request: Request<Action>,
696    ) -> Result<ActionCreatePreparedStatementResult, Status> {
697        info!("do_action_create_prepared_statement query={}", &query.query);
698
699        let ctx = make_session_context(
700            self.lakehouse.clone(),
701            self.part_provider.clone(),
702            None,
703            self.view_factory.clone(),
704            self.session_configurator.clone(),
705        )
706        .await
707        .map_err(|e| status!("error in make_session_context", e))?;
708
709        let df = ctx
710            .sql(&query.query)
711            .await
712            .map_err(|e| status!("error building dataframe", e))?;
713        let schema = df.schema().as_arrow();
714        let mut schema_buffer = Vec::new();
715        let mut writer = StreamWriter::try_new(&mut schema_buffer, schema)
716            .map_err(|e| status!("error writing schema to in-memory buffer", e))?;
717        writer
718            .finish()
719            .map_err(|e| status!("error closing arrow ipc stream writer", e))?;
720        // here we could serialize the logical plan and return that as the prepared statement, but we would
721        // need to register LogicalExtensionCodec for user-defined functions
722        // instead, we are sending back the sql as we received it
723        let result = ActionCreatePreparedStatementResult {
724            prepared_statement_handle: query.query.into(),
725            dataset_schema: schema_buffer.into(),
726            parameter_schema: "".into(),
727        };
728        Ok(result)
729    }
730
731    async fn do_action_close_prepared_statement(
732        &self,
733        _query: ActionClosePreparedStatementRequest,
734        _request: Request<Action>,
735    ) -> Result<(), Status> {
736        info!("do_action_close_prepared_statement");
737        Ok(())
738    }
739
740    async fn do_action_create_prepared_substrait_plan(
741        &self,
742        _query: ActionCreatePreparedSubstraitPlanRequest,
743        _request: Request<Action>,
744    ) -> Result<ActionCreatePreparedStatementResult, Status> {
745        api_entry_not_implemented!()
746    }
747
748    async fn do_action_begin_transaction(
749        &self,
750        _query: ActionBeginTransactionRequest,
751        _request: Request<Action>,
752    ) -> Result<ActionBeginTransactionResult, Status> {
753        api_entry_not_implemented!()
754    }
755
756    async fn do_action_end_transaction(
757        &self,
758        _query: ActionEndTransactionRequest,
759        _request: Request<Action>,
760    ) -> Result<(), Status> {
761        api_entry_not_implemented!()
762    }
763
764    async fn do_action_begin_savepoint(
765        &self,
766        _query: ActionBeginSavepointRequest,
767        _request: Request<Action>,
768    ) -> Result<ActionBeginSavepointResult, Status> {
769        api_entry_not_implemented!()
770    }
771
772    async fn do_action_end_savepoint(
773        &self,
774        _query: ActionEndSavepointRequest,
775        _request: Request<Action>,
776    ) -> Result<(), Status> {
777        api_entry_not_implemented!()
778    }
779
780    async fn do_action_cancel_query(
781        &self,
782        _query: ActionCancelQueryRequest,
783        _request: Request<Action>,
784    ) -> Result<ActionCancelQueryResult, Status> {
785        api_entry_not_implemented!()
786    }
787
788    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {
789        info!("register_sql_info");
790    }
791}