1use super::sqlinfo::{
2 SQL_INFO_DATE_TIME_FUNCTIONS, SQL_INFO_NUMERIC_FUNCTIONS, SQL_INFO_SQL_KEYWORDS,
3 SQL_INFO_STRING_FUNCTIONS, SQL_INFO_SYSTEM_FUNCTIONS,
4};
5use anyhow::Result;
6use arrow_flight::decode::FlightRecordBatchStream;
7use arrow_flight::encode::{DictionaryHandling, FlightDataEncoderBuilder};
8use arrow_flight::error::FlightError;
9use arrow_flight::sql::DoPutPreparedStatementResult;
10use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder};
11use arrow_flight::sql::server::PeekableFlightDataStream;
12use arrow_flight::sql::{
13 ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest,
14 ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult,
15 ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
16 ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest,
17 ActionEndSavepointRequest, ActionEndTransactionRequest, Any, CommandGetCatalogs,
18 CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
19 CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
20 CommandGetXdbcTypeInfo, CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
21 CommandStatementIngest, CommandStatementQuery, CommandStatementSubstraitPlan,
22 CommandStatementUpdate, ProstMessageExt, SqlInfo, TicketStatementQuery,
23 server::FlightSqlService,
24};
25use arrow_flight::{
26 Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
27 Ticket, flight_service_server::FlightService,
28};
29use core::str;
30use datafusion::arrow::datatypes::Schema;
31use datafusion::arrow::ipc::writer::StreamWriter;
32use futures::StreamExt;
33use futures::{Stream, TryStreamExt};
34use micromegas_analytics::lakehouse::lakehouse_context::LakehouseContext;
35use micromegas_analytics::lakehouse::partition_cache::QueryPartitionProvider;
36use micromegas_analytics::lakehouse::query::make_session_context;
37use micromegas_analytics::lakehouse::session_configurator::SessionConfigurator;
38use micromegas_analytics::lakehouse::view_factory::ViewFactory;
39use micromegas_analytics::replication::bulk_ingest;
40use micromegas_analytics::time::TimeRange;
41use micromegas_auth::user_attribution::validate_and_resolve_user_attribution_grpc;
42use micromegas_tracing::prelude::*;
43use once_cell::sync::Lazy;
44use prost::Message;
45use std::pin::Pin;
46use std::str::FromStr;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use tonic::metadata::MetadataMap;
50use tonic::{Request, Response, Status, Streaming};
51
/// Boxed, pinned stream of Flight data frames returned to gRPC clients.
type FlightDataStream =
    Pin<Box<dyn Stream<Item = Result<arrow_flight::FlightData, Status>> + Send>>;
54
/// Builds a `Status::internal` from a description and a source error,
/// appending the file/line where the macro was expanded.
macro_rules! status {
    ($desc:expr, $err:expr) => {
        Status::internal(format!("{}: {} at {}:{}", $desc, $err, file!(), line!()))
    };
}
60
/// Logs an error naming the current function and evaluates to an
/// `Err(Status::unimplemented(...))` carrying the expansion file/line.
/// Used as the body of every Flight SQL entry point this server does not support.
macro_rules! api_entry_not_implemented {
    () => {{
        let function_name = micromegas_tracing::__function_name!();
        error!("not implemented: {function_name}");
        Err(Status::unimplemented(format!(
            "{}:{} not implemented: {function_name}",
            file!(),
            line!()
        )))
    }};
}
72
/// Wraps a Flight data stream so that duration/outcome metrics are emitted
/// at most once, when the stream first errors or finishes.
struct CompletionTrackedStream<S> {
    // The underlying stream whose items are forwarded unchanged.
    inner: S,
    // Tick count captured when the request started; used to compute total duration.
    start_time: i64,
    // Guard so completion/error metrics are recorded only once per stream.
    completed: bool,
}
79
80impl<S> CompletionTrackedStream<S> {
81 fn new(inner: S, start_time: i64) -> Self {
82 Self {
83 inner,
84 start_time,
85 completed: false,
86 }
87 }
88}
89
impl<S> Stream for CompletionTrackedStream<S>
where
    S: Stream<Item = Result<arrow_flight::FlightData, Status>> + Unpin + Send,
{
    type Item = Result<arrow_flight::FlightData, Status>;

    /// Forwards `poll_next` to the inner stream, recording metrics on the
    /// first observed error and on normal completion. The `completed` flag
    /// guarantees each metric pair is emitted at most once per stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(Some(result)) => {
                // An item arrived; if it is an error, log it and record the
                // failure metrics before forwarding it downstream unchanged.
                if let Err(ref err) = result {
                    error!("stream error occurred: {err:?}");
                    if !self.completed {
                        let total_duration = now() - self.start_time;
                        imetric!("query_duration_with_error", "ticks", total_duration as u64);
                        imetric!("query_failed", "count", 1);
                        self.completed = true;
                    }
                }
                Poll::Ready(Some(result))
            }
            Poll::Ready(None) => {
                // Stream exhausted; if no error was seen first, record success metrics.
                if !self.completed {
                    let total_duration = now() - self.start_time;
                    imetric!("query_duration_total", "ticks", total_duration as u64);
                    imetric!("query_completed_successfully", "count", 1);
                    self.completed = true;
                }
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
125
/// Lazily-built static server metadata served for `CommandGetSqlInfo`
/// requests: server identification plus supported keyword/function lists.
static INSTANCE_SQL_DATA: Lazy<SqlInfoData> = Lazy::new(|| {
    let mut builder = SqlInfoDataBuilder::new();
    // Server identification.
    builder.append(SqlInfo::FlightSqlServerName, "Micromegas Flight SQL Server");
    builder.append(SqlInfo::FlightSqlServerVersion, "1");
    builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3");
    // Supported SQL surface, from the sibling `sqlinfo` module's constants.
    builder.append(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS);
    builder.append(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS);
    builder.append(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS);
    builder.append(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS);
    builder.append(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS);
    builder.build().unwrap()
});
140
/// Flight SQL service implementation backed by the Micromegas lakehouse.
#[derive(Clone)]
pub struct FlightSqlServiceImpl {
    // Shared analytics lakehouse context (exposes the data lake; see bulk ingest).
    lakehouse: Arc<LakehouseContext>,
    // Resolves which partitions a query must read.
    part_provider: Arc<dyn QueryPartitionProvider>,
    // Registry of views exposed as queryable tables.
    view_factory: Arc<ViewFactory>,
    // Hook to customize the DataFusion session per request.
    session_configurator: Arc<dyn SessionConfigurator>,
}
149
impl FlightSqlServiceImpl {
    /// Creates a new service instance from its shared dependencies.
    pub fn new(
        lakehouse: Arc<LakehouseContext>,
        part_provider: Arc<dyn QueryPartitionProvider>,
        view_factory: Arc<ViewFactory>,
        session_configurator: Arc<dyn SessionConfigurator>,
    ) -> Self {
        Self {
            lakehouse,
            part_provider,
            view_factory,
            session_configurator,
        }
    }

    /// Returns true when the client sent a `preserve_dictionary` metadata
    /// header whose value is "true" (case-insensitive). Missing or
    /// non-UTF-8 values count as false.
    fn should_preserve_dictionary(metadata: &MetadataMap) -> bool {
        metadata
            .get("preserve_dictionary")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.eq_ignore_ascii_case("true"))
            .unwrap_or(false)
    }

    /// Executes the SQL statement carried by `ticket_stmt` against a fresh
    /// DataFusion session and returns the result as a Flight data stream.
    ///
    /// Request metadata drives several options:
    /// - `query_range_begin` / `query_range_end`: optional RFC 3339 bounds;
    ///   both must be present (and non-empty) to restrict the query range.
    /// - `limit`: optional row limit applied to the dataframe.
    /// - `preserve_dictionary`: keep dictionary encoding in the response.
    /// - `x-client-type`: logged for attribution only.
    ///
    /// Emits setup/planning/execution duration metrics, and wraps the stream
    /// in `CompletionTrackedStream` so total duration and outcome are
    /// recorded when the stream terminates.
    #[span_fn]
    async fn execute_query(
        &self,
        ticket_stmt: TicketStatementQuery,
        metadata: &MetadataMap,
    ) -> Result<Response<FlightDataStream>, Status> {
        let begin_request = now();
        // The statement handle holds the raw SQL bytes (stored verbatim by
        // get_flight_info_statement).
        let sql = std::str::from_utf8(&ticket_stmt.statement_handle)
            .map_err(|e| status!("Unable to parse query", e))?;

        // Treat empty header values as absent.
        let mut begin = metadata.get("query_range_begin");
        if let Some(s) = &begin
            && s.is_empty()
        {
            begin = None;
        }
        let mut end = metadata.get("query_range_end");
        if let Some(s) = &end
            && s.is_empty()
        {
            end = None;
        }
        // Only restrict the query range when both bounds parse as RFC 3339.
        let query_range = if begin.is_some() && end.is_some() {
            let begin_datetime = chrono::DateTime::parse_from_rfc3339(
                begin
                    .unwrap()
                    .to_str()
                    .map_err(|e| status!("Unable to convert query_range_begin to string", e))?,
            )
            .map_err(|e| status!("Unable to parse query_range_begin as a rfc3339 datetime", e))?;
            let end_datetime = chrono::DateTime::parse_from_rfc3339(
                end.unwrap()
                    .to_str()
                    .map_err(|e| status!("Unable to convert query_range_end to string", e))?,
            )
            .map_err(|e| status!("Unable to parse query_range_end as a rfc3339 datetime", e))?;
            Some(TimeRange::new(begin_datetime.into(), end_datetime.into()))
        } else {
            None
        };

        // Authenticate the caller and resolve user attribution for logging.
        let attr = validate_and_resolve_user_attribution_grpc(metadata).map_err(|e| *e)?;

        let client_type = metadata
            .get("x-client-type")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("unknown");

        let user_name_display = attr.user_name.as_deref().unwrap_or("");

        // Two log shapes: with or without a service account name.
        if let Some(service_account_name) = &attr.service_account {
            info!(
                "execute_query range={query_range:?} sql={sql:?} limit={:?} user={} email={} name={user_name_display:?} service_account={service_account_name} client={client_type}",
                metadata.get("limit"),
                attr.user_id,
                attr.user_email
            );
        } else {
            info!(
                "execute_query range={query_range:?} sql={sql:?} limit={:?} user={} email={} name={user_name_display:?} client={client_type}",
                metadata.get("limit"),
                attr.user_id,
                attr.user_email
            );
        }

        // Build a per-request session context.
        let session_begin = now();
        let ctx = make_session_context(
            self.lakehouse.clone(),
            self.part_provider.clone(),
            query_range,
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await
        .map_err(|e| status!("error in make_session_context", e))?;
        let context_init_duration = now() - session_begin;

        // Plan the query.
        let planning_begin = now();
        let mut df = ctx
            .sql(sql)
            .await
            .map_err(|e| status!("error building dataframe", e))?;
        let planning_duration = now() - planning_begin;

        // Apply the optional row limit from request metadata.
        if let Some(limit_str) = metadata.get("limit") {
            let limit: usize = usize::from_str(
                limit_str
                    .to_str()
                    .map_err(|e| status!("error converting limit to str", e))?,
            )
            .map_err(|e| status!("error parsing limit", e))?;
            df = df
                .limit(0, Some(limit))
                .map_err(|e| status!("error building dataframe with limit", e))?;
        }

        // Start execution and encode batches as Flight data.
        let execution_begin = now();
        let schema = Arc::new(df.schema().as_arrow().clone());
        let stream = df
            .execute_stream()
            .await
            .map_err(|e| Status::internal(format!("Error executing plan: {e:?}")))?
            .map_err(|e| FlightError::ExternalError(Box::new(e)));
        // Resend dictionaries instead of hydrating when the client opted in.
        let builder = if Self::should_preserve_dictionary(metadata) {
            FlightDataEncoderBuilder::new()
                .with_schema(schema.clone())
                .with_dictionary_handling(DictionaryHandling::Resend)
        } else {
            FlightDataEncoderBuilder::new().with_schema(schema.clone())
        };
        let flight_data_stream = builder.build(stream);
        let execution_duration = now() - execution_begin;

        let total_setup_duration = now() - begin_request;

        // Setup-phase metrics; stream-lifetime metrics are emitted by
        // CompletionTrackedStream below.
        imetric!(
            "context_init_duration",
            "ticks",
            context_init_duration as u64
        );
        imetric!("query_planning_duration", "ticks", planning_duration as u64);
        imetric!(
            "query_execution_duration",
            "ticks",
            execution_duration as u64
        );
        imetric!("query_setup_duration", "ticks", total_setup_duration as u64);

        let instrumented_stream = flight_data_stream
            .map_err(|e| status!("error building data stream", e))
            .map({
                let start_time = begin_request;
                move |result| {
                    if result.is_err() {
                        // NOTE(review): CompletionTrackedStream also records
                        // "query_duration_with_error" on the first error, so an
                        // erroring stream may emit this metric twice — confirm
                        // whether the double emission is intentional.
                        let total_duration = now() - start_time;
                        imetric!("query_duration_with_error", "ticks", total_duration as u64);
                    }
                    result
                }
            });
        let completion_tracked_stream =
            CompletionTrackedStream::new(instrumented_stream.boxed(), begin_request);
        Ok(Response::new(
            Box::pin(completion_tracked_stream) as FlightDataStream
        ))
    }
}
330
#[tonic::async_trait]
impl FlightSqlService for FlightSqlServiceImpl {
    type FlightService = FlightSqlServiceImpl;

    /// Handshake-based authentication is not supported by this server.
    async fn do_handshake(
        &self,
        _request: Request<Streaming<HandshakeRequest>>,
    ) -> Result<
        Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
        Status,
    > {
        api_entry_not_implemented!()
    }

    /// Decodes the ticket as a `TicketStatementQuery` and executes it. This is
    /// the catch-all DoGet handler for tickets not routed to a more specific
    /// method.
    #[span_fn]
    async fn do_get_fallback(
        &self,
        request: Request<Ticket>,
        _message: Any,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        let ticket_stmt = TicketStatementQuery::decode(request.get_ref().ticket.clone())
            .map_err(|e| status!("Could not read ticket", e))?;
        self.execute_query(ticket_stmt, request.metadata()).await
    }

    /// Returns a `FlightInfo` whose ticket embeds the raw SQL text. The schema
    /// is left empty here; the real schema is produced at DoGet time when the
    /// query is actually planned.
    #[span_fn]
    async fn get_flight_info_statement(
        &self,
        query: CommandStatementQuery,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        let begin_request = now();
        info!("get_flight_info_statement {query:?} ");
        let CommandStatementQuery { query, .. } = query;
        let schema = Schema::empty();
        let ticket = TicketStatementQuery {
            statement_handle: query.into(),
        };
        let mut bytes: Vec<u8> = Vec::new();
        if ticket.encode(&mut bytes).is_ok() {
            let info = FlightInfo::new()
                .try_with_schema(&schema)
                .unwrap()
                .with_endpoint(FlightEndpoint::new().with_ticket(Ticket::new(bytes)));
            let duration = now() - begin_request;
            imetric!("request_duration", "ticks", duration as u64);
            Ok(Response::new(info))
        } else {
            error!("Error encoding ticket");
            Err(Status::internal("Error encoding ticket"))
        }
    }

    /// Not implemented.
    async fn get_flight_info_substrait_plan(
        &self,
        _query: CommandStatementSubstraitPlan,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_prepared_statement(
        &self,
        _cmd: CommandPreparedStatementQuery,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_catalogs(
        &self,
        _query: CommandGetCatalogs,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_schemas(
        &self,
        _query: CommandGetDbSchemas,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Returns a `FlightInfo` whose ticket round-trips the `CommandGetTables`
    /// message; the actual table listing is produced by `do_get_tables`.
    #[span_fn]
    async fn get_flight_info_tables(
        &self,
        query: CommandGetTables,
        request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        let begin_request = now();
        info!("get_flight_info_tables");
        let flight_descriptor = request.into_inner();
        let ticket = Ticket {
            ticket: query.as_any().encode_to_vec().into(),
        };
        let endpoint = FlightEndpoint::new().with_ticket(ticket);
        let flight_info = FlightInfo::new()
            .try_with_schema(&query.into_builder().schema())
            .map_err(|e| status!("Unable to encode schema", e))?
            .with_endpoint(endpoint)
            .with_descriptor(flight_descriptor);
        let duration = now() - begin_request;
        imetric!("request_duration", "ticks", duration as u64);
        Ok(tonic::Response::new(flight_info))
    }

    /// Not implemented.
    async fn get_flight_info_table_types(
        &self,
        _query: CommandGetTableTypes,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Returns a `FlightInfo` whose ticket round-trips the `CommandGetSqlInfo`
    /// message; the data itself comes from the static `INSTANCE_SQL_DATA`.
    #[span_fn]
    async fn get_flight_info_sql_info(
        &self,
        query: CommandGetSqlInfo,
        request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        let begin_request = now();
        info!("get_flight_info_sql_info");
        let flight_descriptor = request.into_inner();
        let ticket = Ticket::new(query.as_any().encode_to_vec());
        let endpoint = FlightEndpoint::new().with_ticket(ticket);
        let flight_info = FlightInfo::new()
            .try_with_schema(query.into_builder(&INSTANCE_SQL_DATA).schema().as_ref())
            .map_err(|e| status!("Unable to encode schema", e))?
            .with_endpoint(endpoint)
            .with_descriptor(flight_descriptor);
        let duration = now() - begin_request;
        imetric!("request_duration", "ticks", duration as u64);
        Ok(tonic::Response::new(flight_info))
    }

    /// Not implemented.
    async fn get_flight_info_primary_keys(
        &self,
        _query: CommandGetPrimaryKeys,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_exported_keys(
        &self,
        _query: CommandGetExportedKeys,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_imported_keys(
        &self,
        _query: CommandGetImportedKeys,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_cross_reference(
        &self,
        _query: CommandGetCrossReference,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn get_flight_info_xdbc_type_info(
        &self,
        _query: CommandGetXdbcTypeInfo,
        _request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        api_entry_not_implemented!()
    }

    /// Executes the statement carried by the ticket and streams its results.
    #[span_fn]
    async fn do_get_statement(
        &self,
        ticket: TicketStatementQuery,
        request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        self.execute_query(ticket, request.metadata()).await
    }

    /// Not implemented.
    async fn do_get_prepared_statement(
        &self,
        _query: CommandPreparedStatementQuery,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_get_catalogs(
        &self,
        _query: CommandGetCatalogs,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_get_schemas(
        &self,
        _query: CommandGetDbSchemas,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Streams the table catalog: every global view from the view factory is
    /// reported as a "table" with empty catalog and schema names.
    #[span_fn]
    async fn do_get_tables(
        &self,
        query: CommandGetTables,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        let begin_request = now();
        info!("do_get_tables {query:?}");
        let mut builder = query.into_builder();
        for view in self.view_factory.get_global_views() {
            // Views are not namespaced: catalog and schema are left empty.
            let catalog_name = "";
            let schema_name = "";
            builder
                .append(
                    catalog_name,
                    schema_name,
                    &*view.get_view_set_name(),
                    "table",
                    &view.get_file_schema(),
                )
                .map_err(Status::from)?;
        }
        let schema = builder.schema();
        let batch = builder.build();
        // The full listing fits in a single record batch.
        let stream = FlightDataEncoderBuilder::new()
            .with_schema(schema)
            .build(futures::stream::once(async { batch }))
            .map_err(Status::from);
        let duration = now() - begin_request;
        imetric!("request_duration", "ticks", duration as u64);
        Ok(Response::new(Box::pin(stream)))
    }

    /// Not implemented.
    async fn do_get_table_types(
        &self,
        _query: CommandGetTableTypes,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Streams the static server metadata (`INSTANCE_SQL_DATA`) filtered by
    /// the request, as a single record batch.
    #[span_fn]
    async fn do_get_sql_info(
        &self,
        query: CommandGetSqlInfo,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        info!("do_get_sql_info");
        let builder = query.into_builder(&INSTANCE_SQL_DATA);
        let schema = builder.schema();
        let batch = builder.build();
        let stream = FlightDataEncoderBuilder::new()
            .with_schema(schema)
            .build(futures::stream::once(async { batch }))
            .map_err(Status::from);
        Ok(Response::new(Box::pin(stream)))
    }

    /// Not implemented.
    async fn do_get_primary_keys(
        &self,
        _query: CommandGetPrimaryKeys,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_get_exported_keys(
        &self,
        _query: CommandGetExportedKeys,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_get_imported_keys(
        &self,
        _query: CommandGetImportedKeys,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_get_cross_reference(
        &self,
        _query: CommandGetCrossReference,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_get_xdbc_type_info(
        &self,
        _query: CommandGetXdbcTypeInfo,
        _request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_put_statement_update(
        &self,
        _ticket: CommandStatementUpdate,
        _request: Request<PeekableFlightDataStream>,
    ) -> Result<i64, Status> {
        api_entry_not_implemented!()
    }

    /// Bulk-ingests the uploaded record batches into the named lakehouse
    /// table. Returns the value produced by `bulk_ingest` on success.
    #[span_fn]
    async fn do_put_statement_ingest(
        &self,
        command: CommandStatementIngest,
        request: Request<PeekableFlightDataStream>,
    ) -> Result<i64, Status> {
        let table_name = command.table;
        info!("do_put_statement_ingest table_name={table_name}");
        // Decode the incoming Flight data frames back into record batches.
        let stream = FlightRecordBatchStream::new_from_flight_data(
            request.into_inner().map_err(|e| e.into()),
        );
        bulk_ingest(self.lakehouse.lake().clone(), &table_name, stream)
            .await
            .map_err(|e| {
                let msg = format!("error ingesting into {table_name}: {e:?}");
                error!("{msg}");
                status!(msg, e)
            })
    }

    /// Not implemented.
    async fn do_put_substrait_plan(
        &self,
        _ticket: CommandStatementSubstraitPlan,
        _request: Request<PeekableFlightDataStream>,
    ) -> Result<i64, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_put_prepared_statement_query(
        &self,
        _query: CommandPreparedStatementQuery,
        _request: Request<PeekableFlightDataStream>,
    ) -> Result<DoPutPreparedStatementResult, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_put_prepared_statement_update(
        &self,
        _query: CommandPreparedStatementUpdate,
        _request: Request<PeekableFlightDataStream>,
    ) -> Result<i64, Status> {
        api_entry_not_implemented!()
    }

    /// "Prepares" a statement by planning it once to obtain its result
    /// schema. The returned handle is simply the SQL text itself, the dataset
    /// schema is serialized as an Arrow IPC stream containing only the schema
    /// message, and the parameter schema is empty (parameters unsupported).
    #[span_fn]
    async fn do_action_create_prepared_statement(
        &self,
        query: ActionCreatePreparedStatementRequest,
        _request: Request<Action>,
    ) -> Result<ActionCreatePreparedStatementResult, Status> {
        info!("do_action_create_prepared_statement query={}", &query.query);

        let ctx = make_session_context(
            self.lakehouse.clone(),
            self.part_provider.clone(),
            None,
            self.view_factory.clone(),
            self.session_configurator.clone(),
        )
        .await
        .map_err(|e| status!("error in make_session_context", e))?;

        // Plan (but do not execute) the query to learn its output schema.
        let df = ctx
            .sql(&query.query)
            .await
            .map_err(|e| status!("error building dataframe", e))?;
        let schema = df.schema().as_arrow();
        let mut schema_buffer = Vec::new();
        // Writing the header then finishing yields a schema-only IPC stream.
        let mut writer = StreamWriter::try_new(&mut schema_buffer, schema)
            .map_err(|e| status!("error writing schema to in-memory buffer", e))?;
        writer
            .finish()
            .map_err(|e| status!("error closing arrow ipc stream writer", e))?;
        let result = ActionCreatePreparedStatementResult {
            prepared_statement_handle: query.query.into(),
            dataset_schema: schema_buffer.into(),
            parameter_schema: "".into(),
        };
        Ok(result)
    }

    /// No server-side state is kept for prepared statements, so closing one
    /// is a no-op.
    async fn do_action_close_prepared_statement(
        &self,
        _query: ActionClosePreparedStatementRequest,
        _request: Request<Action>,
    ) -> Result<(), Status> {
        info!("do_action_close_prepared_statement");
        Ok(())
    }

    /// Not implemented.
    async fn do_action_create_prepared_substrait_plan(
        &self,
        _query: ActionCreatePreparedSubstraitPlanRequest,
        _request: Request<Action>,
    ) -> Result<ActionCreatePreparedStatementResult, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_action_begin_transaction(
        &self,
        _query: ActionBeginTransactionRequest,
        _request: Request<Action>,
    ) -> Result<ActionBeginTransactionResult, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_action_end_transaction(
        &self,
        _query: ActionEndTransactionRequest,
        _request: Request<Action>,
    ) -> Result<(), Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_action_begin_savepoint(
        &self,
        _query: ActionBeginSavepointRequest,
        _request: Request<Action>,
    ) -> Result<ActionBeginSavepointResult, Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_action_end_savepoint(
        &self,
        _query: ActionEndSavepointRequest,
        _request: Request<Action>,
    ) -> Result<(), Status> {
        api_entry_not_implemented!()
    }

    /// Not implemented.
    async fn do_action_cancel_query(
        &self,
        _query: ActionCancelQueryRequest,
        _request: Request<Action>,
    ) -> Result<ActionCancelQueryResult, Status> {
        api_entry_not_implemented!()
    }

    /// Hook invoked by the framework when registering SQL info; this server
    /// only logs the call.
    async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {
        info!("register_sql_info");
    }
}