micromegas_analytics/lakehouse/block_partition_spec.rs

1use super::{
2    partition_source_data::{PartitionBlocksSource, PartitionSourceBlock},
3    view::{PartitionSpec, ViewMetadata},
4    write_partition::{PartitionRowSet, write_partition_from_rows},
5};
6use crate::{response_writer::Logger, time::TimeRange};
7use anyhow::{Context, Result};
8use async_trait::async_trait;
9use datafusion::arrow::datatypes::Schema;
10use futures::StreamExt;
11use micromegas_ingestion::data_lake_connection::DataLakeConnection;
12use micromegas_telemetry::blob_storage::BlobStorage;
13use micromegas_tracing::prelude::*;
14use std::collections::HashMap;
15use std::fmt::Debug;
16use std::sync::Arc;
17
18/// BlockProcessor transforms a single block of telemetry into a set of rows
19#[async_trait]
20pub trait BlockProcessor: Send + Sync + Debug {
21    /// Processes a single block of telemetry.
22    async fn process(
23        &self,
24        blob_storage: Arc<BlobStorage>,
25        src_block: Arc<PartitionSourceBlock>,
26    ) -> Result<Option<PartitionRowSet>>;
27}
28
29/// Map from `streams.format` to the processor that handles that wire format.
30/// Views register one entry per format they understand (e.g. log entries register
31/// both `"micromegas-transit"` and `"otlp/v1/logs"`).
32pub type BlockProcessorMap = HashMap<&'static str, Arc<dyn BlockProcessor>>;
33
34/// BlockPartitionSpec processes blocks individually and out of order
35/// which works fine for measures & log entries.
36///
37/// Per-block dispatch keys on `PartitionSourceBlock::format` so a single view can
38/// materialize blocks coming from heterogeneous wire formats (native CBOR + OTLP).
39/// Unknown formats are warned and skipped.
40#[derive(Debug)]
41pub struct BlockPartitionSpec {
42    pub view_metadata: ViewMetadata,
43    pub schema: Arc<Schema>,
44    pub insert_range: TimeRange,
45    pub source_data: Arc<dyn PartitionBlocksSource>,
46    pub block_processors: Arc<BlockProcessorMap>,
47}
48
49#[async_trait]
50impl PartitionSpec for BlockPartitionSpec {
51    fn is_empty(&self) -> bool {
52        self.source_data.is_empty()
53    }
54
55    fn get_source_data_hash(&self) -> Vec<u8> {
56        self.source_data.get_source_data_hash()
57    }
58
59    #[span_fn]
60    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()> {
61        let desc = format!(
62            "[{}, {}] {} {}",
63            self.view_metadata.view_set_name,
64            self.view_metadata.view_instance_id,
65            self.insert_range.begin.to_rfc3339(),
66            self.insert_range.end.to_rfc3339()
67        );
68        logger.write_log_entry(format!("writing {desc}")).await?;
69
70        logger
71            .write_log_entry(format!(
72                "reading {} blocks",
73                self.source_data.get_nb_blocks()
74            ))
75            .await?;
76
77        // Allow empty source data - write_partition_from_rows will create
78        // an empty partition record if no data is sent through the channel
79        let (tx, rx) = tokio::sync::mpsc::channel(1);
80        let join_handle = spawn_with_context(write_partition_from_rows(
81            lake.clone(),
82            self.view_metadata.clone(),
83            self.schema.clone(),
84            self.insert_range,
85            self.source_data.get_source_data_hash(),
86            rx,
87            logger.clone(),
88        ));
89
90        // If source data is empty, just close the channel to create an empty partition
91        if self.source_data.is_empty() {
92            drop(tx);
93            join_handle.await??;
94            return Ok(());
95        }
96
97        let max_size = self.source_data.get_max_payload_size() as usize;
98        let mut nb_tasks = (100 * 1024 * 1024) / max_size; // try to download up to 100 MB of payloads
99        nb_tasks = nb_tasks.clamp(1, 64);
100
101        let mut stream = self
102            .source_data
103            .get_blocks_stream()
104            .await
105            .map(|src_block_res| async {
106                let src_block = src_block_res.with_context(|| "get_blocks_stream")?;
107                // Per-block dispatch on `streams.format`. A view that doesn't register
108                // a processor for some format silently skips matching blocks instead of
109                // erroring — keeps the partition build moving when an unknown format
110                // shows up alongside known ones.
111                let Some(block_processor) = self
112                    .block_processors
113                    .get(src_block.format.as_str())
114                    .cloned()
115                else {
116                    warn!(
117                        "no block processor for format={} (view={}/{}); skipping block_id={}",
118                        src_block.format,
119                        self.view_metadata.view_set_name,
120                        self.view_metadata.view_instance_id,
121                        src_block.block.block_id
122                    );
123                    return Ok::<Option<PartitionRowSet>, anyhow::Error>(None);
124                };
125                let blob_storage = lake.blob_storage.clone();
126                let handle = spawn_with_context(async move {
127                    block_processor
128                        .process(blob_storage, src_block)
129                        .await
130                        .with_context(|| "processing source block")
131                });
132                handle.await.with_context(|| "handle.await")?
133            })
134            .buffer_unordered(nb_tasks);
135
136        while let Some(res_opt_rows) = stream.next().await {
137            match res_opt_rows {
138                Err(e) => {
139                    error!("{e:?}");
140                    logger.write_log_entry(format!("{e:?}")).await?;
141                }
142                Ok(Some(row_set)) => {
143                    tx.send(Ok(row_set)).await?;
144                }
145                Ok(None) => {
146                    debug!("empty block");
147                }
148            }
149        }
150        drop(tx);
151        join_handle.await??;
152        Ok(())
153    }
154}