micromegas/client/
flightsql_client.rs

1use anyhow::{Context, Result};
2use arrow_flight::{
3    decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient,
4    sql::client::FlightSqlServiceClient,
5};
6use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
7use futures::stream::StreamExt;
8use micromegas_analytics::time::TimeRange;
9use std::sync::Arc;
10use tonic::transport::Channel;
11
12/// Represents a prepared statement with its schema and query string.
13#[derive(Debug)]
14pub struct PreparedStatement {
15    pub schema: SchemaRef,
16    pub query: String,
17}
18
19/// Micromegas FlightSQL client
20pub struct Client {
21    inner: FlightSqlServiceClient<Channel>,
22}
23
24impl Client {
25    /// Creates a new client from a grpc channel
26    ///
27    /// # Arguments
28    ///
29    /// * `channel` - The gRPC channel to use for communication.
30    pub fn new(channel: Channel) -> Self {
31        let flight_client =
32            FlightServiceClient::new(channel).max_decoding_message_size(100 * 1024 * 1024);
33        let inner = FlightSqlServiceClient::new_from_inner(flight_client);
34        Self { inner }
35    }
36
37    /// Returns a mutable reference to the inner `FlightSqlServiceClient`.
38    ///
39    /// This allows direct access to the underlying FlightSQL client for advanced operations.
40    pub fn inner_mut(&mut self) -> &mut FlightSqlServiceClient<Channel> {
41        &mut self.inner
42    }
43
44    fn set_query_range(&mut self, query_range: Option<TimeRange>) {
45        self.inner.set_header(
46            "query_range_begin",
47            query_range.map_or(String::from(""), |r| r.begin.to_rfc3339()),
48        );
49        self.inner.set_header(
50            "query_range_end",
51            query_range.map_or(String::from(""), |r| r.end.to_rfc3339()),
52        );
53    }
54
55    /// Executes a SQL query and returns the results as a vector of `RecordBatch`.
56    ///
57    /// # Arguments
58    ///
59    /// * `sql` - The SQL query string to execute.
60    /// * `query_range` - An optional `TimeRange` to filter the query results by time.
61    pub async fn query(
62        &mut self,
63        sql: String,
64        query_range: Option<TimeRange>,
65    ) -> Result<Vec<RecordBatch>> {
66        let mut record_batch_stream = self.query_stream(sql, query_range).await?;
67        let mut batches = vec![];
68        while let Some(batch_res) = record_batch_stream.next().await {
69            batches.push(batch_res?);
70        }
71        Ok(batches)
72    }
73
74    /// Executes a SQL query and returns the results as a `FlightRecordBatchStream`.
75    ///
76    /// This function is useful for processing large query results incrementally.
77    ///
78    /// # Arguments
79    ///
80    /// * `sql` - The SQL query string to execute.
81    /// * `query_range` - An optional `TimeRange` to filter the query results by time.
82    pub async fn query_stream(
83        &mut self,
84        sql: String,
85        query_range: Option<TimeRange>,
86    ) -> Result<FlightRecordBatchStream> {
87        self.set_query_range(query_range);
88        let info = self.inner.execute(sql, None).await?;
89        let ticket = info.endpoint[0]
90            .ticket
91            .clone()
92            .with_context(|| "reading ticket from endpoint")?;
93        let flight_data_stream = self.inner.do_get(ticket).await?.into_inner();
94        Ok(FlightRecordBatchStream::new(flight_data_stream))
95    }
96
97    /// Prepares a SQL statement and returns a `PreparedStatement`.
98    ///
99    /// This function allows to compute the schema of a query without actually executing it.
100    ///
101    /// # Arguments
102    ///
103    /// * `sql` - The SQL query string to prepare.
104    pub async fn prepare_statement(&mut self, sql: String) -> Result<PreparedStatement> {
105        self.set_query_range(None);
106        let prepared = self.inner.prepare(sql.clone(), None).await?;
107        Ok(PreparedStatement {
108            schema: Arc::new(prepared.dataset_schema()?.clone()),
109            query: sql,
110        })
111    }
112}