micromegas/client/
flightsql_client.rs1use anyhow::{Context, Result};
2use arrow_flight::{
3 decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient,
4 sql::client::FlightSqlServiceClient,
5};
6use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
7use futures::stream::StreamExt;
8use micromegas_analytics::time::TimeRange;
9use std::sync::Arc;
10use tonic::transport::Channel;
11
12#[derive(Debug)]
14pub struct PreparedStatement {
15 pub schema: SchemaRef,
16 pub query: String,
17}
18
19pub struct Client {
21 inner: FlightSqlServiceClient<Channel>,
22}
23
24impl Client {
25 pub fn new(channel: Channel) -> Self {
31 let flight_client =
32 FlightServiceClient::new(channel).max_decoding_message_size(100 * 1024 * 1024);
33 let inner = FlightSqlServiceClient::new_from_inner(flight_client);
34 Self { inner }
35 }
36
37 pub fn inner_mut(&mut self) -> &mut FlightSqlServiceClient<Channel> {
41 &mut self.inner
42 }
43
44 fn set_query_range(&mut self, query_range: Option<TimeRange>) {
45 self.inner.set_header(
46 "query_range_begin",
47 query_range.map_or(String::from(""), |r| r.begin.to_rfc3339()),
48 );
49 self.inner.set_header(
50 "query_range_end",
51 query_range.map_or(String::from(""), |r| r.end.to_rfc3339()),
52 );
53 }
54
55 pub async fn query(
62 &mut self,
63 sql: String,
64 query_range: Option<TimeRange>,
65 ) -> Result<Vec<RecordBatch>> {
66 let mut record_batch_stream = self.query_stream(sql, query_range).await?;
67 let mut batches = vec![];
68 while let Some(batch_res) = record_batch_stream.next().await {
69 batches.push(batch_res?);
70 }
71 Ok(batches)
72 }
73
74 pub async fn query_stream(
83 &mut self,
84 sql: String,
85 query_range: Option<TimeRange>,
86 ) -> Result<FlightRecordBatchStream> {
87 self.set_query_range(query_range);
88 let info = self.inner.execute(sql, None).await?;
89 let ticket = info.endpoint[0]
90 .ticket
91 .clone()
92 .with_context(|| "reading ticket from endpoint")?;
93 let flight_data_stream = self.inner.do_get(ticket).await?.into_inner();
94 Ok(FlightRecordBatchStream::new(flight_data_stream))
95 }
96
97 pub async fn prepare_statement(&mut self, sql: String) -> Result<PreparedStatement> {
105 self.set_query_range(None);
106 let prepared = self.inner.prepare(sql.clone(), None).await?;
107 Ok(PreparedStatement {
108 schema: Arc::new(prepared.dataset_schema()?.clone()),
109 query: sql,
110 })
111 }
112}