micromegas_analytics/lakehouse/
block_partition_spec.rs1use super::{
2 partition_source_data::{PartitionBlocksSource, PartitionSourceBlock},
3 view::{PartitionSpec, ViewMetadata},
4 write_partition::{PartitionRowSet, write_partition_from_rows},
5};
6use crate::{response_writer::Logger, time::TimeRange};
7use anyhow::{Context, Result};
8use async_trait::async_trait;
9use datafusion::arrow::datatypes::Schema;
10use futures::StreamExt;
11use micromegas_ingestion::data_lake_connection::DataLakeConnection;
12use micromegas_telemetry::blob_storage::BlobStorage;
13use micromegas_tracing::prelude::*;
14use std::collections::HashMap;
15use std::fmt::Debug;
16use std::sync::Arc;
17
18#[async_trait]
20pub trait BlockProcessor: Send + Sync + Debug {
21 async fn process(
23 &self,
24 blob_storage: Arc<BlobStorage>,
25 src_block: Arc<PartitionSourceBlock>,
26 ) -> Result<Option<PartitionRowSet>>;
27}
28
29pub type BlockProcessorMap = HashMap<&'static str, Arc<dyn BlockProcessor>>;
33
34#[derive(Debug)]
41pub struct BlockPartitionSpec {
42 pub view_metadata: ViewMetadata,
43 pub schema: Arc<Schema>,
44 pub insert_range: TimeRange,
45 pub source_data: Arc<dyn PartitionBlocksSource>,
46 pub block_processors: Arc<BlockProcessorMap>,
47}
48
49#[async_trait]
50impl PartitionSpec for BlockPartitionSpec {
51 fn is_empty(&self) -> bool {
52 self.source_data.is_empty()
53 }
54
55 fn get_source_data_hash(&self) -> Vec<u8> {
56 self.source_data.get_source_data_hash()
57 }
58
59 #[span_fn]
60 async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()> {
61 let desc = format!(
62 "[{}, {}] {} {}",
63 self.view_metadata.view_set_name,
64 self.view_metadata.view_instance_id,
65 self.insert_range.begin.to_rfc3339(),
66 self.insert_range.end.to_rfc3339()
67 );
68 logger.write_log_entry(format!("writing {desc}")).await?;
69
70 logger
71 .write_log_entry(format!(
72 "reading {} blocks",
73 self.source_data.get_nb_blocks()
74 ))
75 .await?;
76
77 let (tx, rx) = tokio::sync::mpsc::channel(1);
80 let join_handle = spawn_with_context(write_partition_from_rows(
81 lake.clone(),
82 self.view_metadata.clone(),
83 self.schema.clone(),
84 self.insert_range,
85 self.source_data.get_source_data_hash(),
86 rx,
87 logger.clone(),
88 ));
89
90 if self.source_data.is_empty() {
92 drop(tx);
93 join_handle.await??;
94 return Ok(());
95 }
96
97 let max_size = self.source_data.get_max_payload_size() as usize;
98 let mut nb_tasks = (100 * 1024 * 1024) / max_size; nb_tasks = nb_tasks.clamp(1, 64);
100
101 let mut stream = self
102 .source_data
103 .get_blocks_stream()
104 .await
105 .map(|src_block_res| async {
106 let src_block = src_block_res.with_context(|| "get_blocks_stream")?;
107 let Some(block_processor) = self
112 .block_processors
113 .get(src_block.format.as_str())
114 .cloned()
115 else {
116 warn!(
117 "no block processor for format={} (view={}/{}); skipping block_id={}",
118 src_block.format,
119 self.view_metadata.view_set_name,
120 self.view_metadata.view_instance_id,
121 src_block.block.block_id
122 );
123 return Ok::<Option<PartitionRowSet>, anyhow::Error>(None);
124 };
125 let blob_storage = lake.blob_storage.clone();
126 let handle = spawn_with_context(async move {
127 block_processor
128 .process(blob_storage, src_block)
129 .await
130 .with_context(|| "processing source block")
131 });
132 handle.await.with_context(|| "handle.await")?
133 })
134 .buffer_unordered(nb_tasks);
135
136 while let Some(res_opt_rows) = stream.next().await {
137 match res_opt_rows {
138 Err(e) => {
139 error!("{e:?}");
140 logger.write_log_entry(format!("{e:?}")).await?;
141 }
142 Ok(Some(row_set)) => {
143 tx.send(Ok(row_set)).await?;
144 }
145 Ok(None) => {
146 debug!("empty block");
147 }
148 }
149 }
150 drop(tx);
151 join_handle.await??;
152 Ok(())
153 }
154}