micromegas_analytics/lakehouse/
metadata_partition_spec.rs

1use super::{
2    dataframe_time_bounds::DataFrameTimeBounds,
3    view::{PartitionSpec, ViewMetadata},
4};
5use crate::{
6    lakehouse::write_partition::{PartitionRowSet, write_partition_from_rows},
7    response_writer::Logger,
8    sql_arrow_bridge::rows_to_record_batch,
9    time::TimeRange,
10};
11use anyhow::{Context, Result};
12use async_trait::async_trait;
13use datafusion::{arrow::datatypes::Schema, prelude::*};
14use micromegas_ingestion::data_lake_connection::DataLakeConnection;
15use micromegas_tracing::prelude::*;
16use sqlx::Row;
17use std::sync::Arc;
18
/// Partition spec for views whose rows come from a SQL query against the
/// lake's Postgres metadata database (as opposed to telemetry payload data).
#[derive(Debug)]
pub struct MetadataPartitionSpec {
    // Identity of the view this partition belongs to.
    pub view_metadata: ViewMetadata,
    // Arrow schema of the record batches written to the partition.
    pub schema: Arc<Schema>,
    // Insertion time range covered by the partition; its begin/end are bound
    // as $1/$2 of both the count query and `data_sql`.
    pub insert_range: TimeRange,
    // Source row count measured when the spec was fetched; `< 1` means empty,
    // and the count doubles as the source-data hash for change detection.
    pub record_count: i64,
    // Query fetching the partition's rows over `insert_range`.
    pub data_sql: Arc<String>,
    // Strategy used to compute the event-time bounds of the fetched rows.
    pub compute_time_bounds: Arc<dyn DataFrameTimeBounds>,
}
28
29pub async fn fetch_metadata_partition_spec(
30    pool: &sqlx::PgPool,
31    source_count_query: &str,
32    data_sql: Arc<String>,
33    view_metadata: ViewMetadata,
34    schema: Arc<Schema>,
35    insert_range: TimeRange,
36    compute_time_bounds: Arc<dyn DataFrameTimeBounds>,
37) -> Result<MetadataPartitionSpec> {
38    //todo: extract this query to allow join (instead of source_table)
39    let row = sqlx::query(source_count_query)
40        .bind(insert_range.begin)
41        .bind(insert_range.end)
42        .fetch_one(pool)
43        .await
44        .with_context(|| "select count source metadata")?;
45    Ok(MetadataPartitionSpec {
46        view_metadata,
47        schema,
48        insert_range,
49        record_count: row.try_get("count").with_context(|| "reading count")?,
50        data_sql,
51        compute_time_bounds,
52    })
53}
54
55#[async_trait]
56impl PartitionSpec for MetadataPartitionSpec {
57    fn is_empty(&self) -> bool {
58        self.record_count < 1
59    }
60
61    fn get_source_data_hash(&self) -> Vec<u8> {
62        self.record_count.to_le_bytes().to_vec()
63    }
64
65    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()> {
66        // Allow empty record_count - write_partition_from_rows will create
67        // an empty partition record if no data is sent through the channel
68        let desc = format!(
69            "[{}, {}] {} {}",
70            self.view_metadata.view_set_name,
71            self.view_metadata.view_instance_id,
72            self.insert_range.begin.to_rfc3339(),
73            self.insert_range.end.to_rfc3339()
74        );
75        logger.write_log_entry(format!("writing {desc}")).await?;
76
77        let rows = sqlx::query(&self.data_sql)
78            .bind(self.insert_range.begin)
79            .bind(self.insert_range.end)
80            .fetch_all(&lake.db_pool)
81            .await?;
82        let row_count = rows.len() as i64;
83
84        let (tx, rx) = tokio::sync::mpsc::channel(1);
85        let join_handle = spawn_with_context(write_partition_from_rows(
86            lake.clone(),
87            self.view_metadata.clone(),
88            self.schema.clone(),
89            self.insert_range,
90            row_count.to_le_bytes().to_vec(),
91            rx,
92            logger.clone(),
93        ));
94
95        // Only send data if we have rows
96        if row_count > 0 {
97            let record_batch =
98                rows_to_record_batch(&rows).with_context(|| "converting rows to record batch")?;
99            drop(rows);
100            let ctx = SessionContext::new();
101            let event_time_range = self
102                .compute_time_bounds
103                .get_time_bounds(ctx.read_batch(record_batch.clone())?)
104                .await?;
105            tx.send(PartitionRowSet::new(event_time_range, record_batch))
106                .await?;
107        }
108
109        drop(tx);
110        join_handle.await??;
111        Ok(())
112    }
113}