//! micromegas_analytics/lakehouse/block_partition_spec.rs

1use super::{
2    partition_source_data::{PartitionBlocksSource, PartitionSourceBlock},
3    view::{PartitionSpec, ViewMetadata},
4    write_partition::{PartitionRowSet, write_partition_from_rows},
5};
6use crate::{response_writer::Logger, time::TimeRange};
7use anyhow::{Context, Result};
8use async_trait::async_trait;
9use datafusion::arrow::datatypes::Schema;
10use futures::StreamExt;
11use micromegas_ingestion::data_lake_connection::DataLakeConnection;
12use micromegas_telemetry::blob_storage::BlobStorage;
13use micromegas_tracing::prelude::*;
14use std::fmt::Debug;
15use std::sync::Arc;
16
/// BlockProcessor transforms a single block of telemetry into a set of rows
#[async_trait]
pub trait BlockProcessor: Send + Sync + Debug {
    /// Processes a single block of telemetry.
    ///
    /// Returns `Ok(Some(rows))` when the block yields rows, `Ok(None)` when the
    /// block produces no rows (callers treat this as an empty block), and `Err`
    /// when processing fails.
    async fn process(
        &self,
        blob_storage: Arc<BlobStorage>,
        src_block: Arc<PartitionSourceBlock>,
    ) -> Result<Option<PartitionRowSet>>;
}
27
/// BlockPartitionSpec processes blocks individually and out of order
/// which works fine for measures & log entries
#[derive(Debug)]
pub struct BlockPartitionSpec {
    /// Identifies the view (set name + instance id) the partition belongs to.
    pub view_metadata: ViewMetadata,
    /// Arrow schema of the rows produced by `block_processor`.
    pub schema: Arc<Schema>,
    /// Time range of the partition being written (logged and handed to the
    /// partition writer).
    pub insert_range: TimeRange,
    /// Source of the telemetry blocks to process.
    pub source_data: Arc<dyn PartitionBlocksSource>,
    /// Converts each source block into a row set.
    pub block_processor: Arc<dyn BlockProcessor>,
}
38
39#[async_trait]
40impl PartitionSpec for BlockPartitionSpec {
41    fn is_empty(&self) -> bool {
42        self.source_data.is_empty()
43    }
44
45    fn get_source_data_hash(&self) -> Vec<u8> {
46        self.source_data.get_source_data_hash()
47    }
48
49    #[span_fn]
50    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()> {
51        let desc = format!(
52            "[{}, {}] {} {}",
53            self.view_metadata.view_set_name,
54            self.view_metadata.view_instance_id,
55            self.insert_range.begin.to_rfc3339(),
56            self.insert_range.end.to_rfc3339()
57        );
58        logger.write_log_entry(format!("writing {desc}")).await?;
59
60        logger
61            .write_log_entry(format!(
62                "reading {} blocks",
63                self.source_data.get_nb_blocks()
64            ))
65            .await?;
66
67        // Allow empty source data - write_partition_from_rows will create
68        // an empty partition record if no data is sent through the channel
69        let (tx, rx) = tokio::sync::mpsc::channel(1);
70        let join_handle = spawn_with_context(write_partition_from_rows(
71            lake.clone(),
72            self.view_metadata.clone(),
73            self.schema.clone(),
74            self.insert_range,
75            self.source_data.get_source_data_hash(),
76            rx,
77            logger.clone(),
78        ));
79
80        // If source data is empty, just close the channel to create an empty partition
81        if self.source_data.is_empty() {
82            drop(tx);
83            join_handle.await??;
84            return Ok(());
85        }
86
87        let max_size = self.source_data.get_max_payload_size() as usize;
88        let mut nb_tasks = (100 * 1024 * 1024) / max_size; // try to download up to 100 MB of payloads
89        nb_tasks = nb_tasks.clamp(1, 64);
90
91        let mut stream = self
92            .source_data
93            .get_blocks_stream()
94            .await
95            .map(|src_block_res| async {
96                let src_block = src_block_res.with_context(|| "get_blocks_stream")?;
97                let block_processor = self.block_processor.clone();
98                let blob_storage = lake.blob_storage.clone();
99                let handle = spawn_with_context(async move {
100                    block_processor
101                        .process(blob_storage, src_block)
102                        .await
103                        .with_context(|| "processing source block")
104                });
105                handle.await.with_context(|| "handle.await")?
106            })
107            .buffer_unordered(nb_tasks);
108
109        while let Some(res_opt_rows) = stream.next().await {
110            match res_opt_rows {
111                Err(e) => {
112                    error!("{e:?}");
113                    logger.write_log_entry(format!("{e:?}")).await?;
114                }
115                Ok(Some(row_set)) => {
116                    tx.send(row_set).await?;
117                }
118                Ok(None) => {
119                    debug!("empty block");
120                }
121            }
122        }
123        drop(tx);
124        join_handle.await??;
125        Ok(())
126    }
127}