micromegas_analytics/lakehouse/block_partition_spec.rs

use super::{
    partition_source_data::{PartitionBlocksSource, PartitionSourceBlock},
    view::{PartitionSpec, ViewMetadata},
    write_partition::{PartitionRowSet, write_partition_from_rows},
};
use crate::{response_writer::Logger, time::TimeRange};
use anyhow::{Context, Result};
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use futures::StreamExt;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_telemetry::blob_storage::BlobStorage;
use micromegas_tracing::prelude::*;
use std::fmt::Debug;
use std::sync::Arc;

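/// Processes one source block, producing rows for the partition under
/// construction; `Ok(None)` means the block contributed no rows.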
#[async_trait]
pub trait BlockProcessor: Send + Sync + Debug {
    async fn process(
        &self,
        blob_storage: Arc<BlobStorage>,
        src_block: Arc<PartitionSourceBlock>,
    ) -> Result<Option<PartitionRowSet>>;
}
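
// --- Illustrative sketch, not part of the original file ---
// A minimal, hypothetical BlockProcessor that skips every block. A real
// implementation would fetch the block payload from `blob_storage`, decode
// it, and return Ok(Some(PartitionRowSet)) when the block yields rows.
#[derive(Debug)]
struct NoopBlockProcessor;

#[async_trait]
impl BlockProcessor for NoopBlockProcessor {
    async fn process(
        &self,
        _blob_storage: Arc<BlobStorage>,
        _src_block: Arc<PartitionSourceBlock>,
    ) -> Result<Option<PartitionRowSet>> {
        // Ok(None) tells BlockPartitionSpec::write to treat this block as empty.
        Ok(None)
    }
}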

/// A `PartitionSpec` that builds a partition by running each source block
/// through a `BlockProcessor`.
#[derive(Debug)]
pub struct BlockPartitionSpec {
    pub view_metadata: ViewMetadata,
    pub schema: Arc<Schema>,
    pub insert_range: TimeRange,
    pub source_data: Arc<dyn PartitionBlocksSource>,
    pub block_processor: Arc<dyn BlockProcessor>,
}

#[async_trait]
impl PartitionSpec for BlockPartitionSpec {
    fn is_empty(&self) -> bool {
        self.source_data.is_empty()
    }

    fn get_source_data_hash(&self) -> Vec<u8> {
        self.source_data.get_source_data_hash()
    }

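    // write() fans the source blocks out to a bounded pool of processing
    // tasks and funnels the resulting row sets into a single writer task.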
    #[span_fn]
    async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()> {
        let desc = format!(
            "[{}, {}] {} {}",
            self.view_metadata.view_set_name,
            self.view_metadata.view_instance_id,
            self.insert_range.begin.to_rfc3339(),
            self.insert_range.end.to_rfc3339()
        );
        logger.write_log_entry(format!("writing {desc}")).await?;

        logger
            .write_log_entry(format!(
                "reading {} blocks",
                self.source_data.get_nb_blocks()
            ))
            .await?;

        let (tx, rx) = tokio::sync::mpsc::channel(1);
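        // The writer task drains `rx` concurrently with block processing; the
        // channel's capacity of 1 keeps backpressure on the producers.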
        let join_handle = spawn_with_context(write_partition_from_rows(
            lake.clone(),
            self.view_metadata.clone(),
            self.schema.clone(),
            self.insert_range,
            self.source_data.get_source_data_hash(),
            rx,
            logger.clone(),
        ));

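        // No source blocks: close the channel and let the writer task finish.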
        if self.source_data.is_empty() {
            drop(tx);
            join_handle.await??;
            return Ok(());
        }

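        // Size the task pool so that roughly 100 MiB of block payload can be
        // in flight at once, clamped to [1, 64] concurrent tasks.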
        let max_size = self.source_data.get_max_payload_size() as usize;
        let mut nb_tasks = (100 * 1024 * 1024) / max_size;
        nb_tasks = nb_tasks.clamp(1, 64);

        let mut stream = self
            .source_data
            .get_blocks_stream()
            .await
            .map(|src_block_res| async {
                let src_block = src_block_res.with_context(|| "get_blocks_stream")?;
                let block_processor = self.block_processor.clone();
                let blob_storage = lake.blob_storage.clone();
                let handle = spawn_with_context(async move {
                    block_processor
                        .process(blob_storage, src_block)
                        .await
                        .with_context(|| "processing source block")
                });
                handle.await.with_context(|| "handle.await")?
            })
            .buffer_unordered(nb_tasks);

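        // Row sets arrive in completion order; a block that fails to process
        // is logged and skipped rather than aborting the whole partition.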
        while let Some(res_opt_rows) = stream.next().await {
            match res_opt_rows {
                Err(e) => {
                    error!("{e:?}");
                    logger.write_log_entry(format!("{e:?}")).await?;
                }
                Ok(Some(row_set)) => {
                    tx.send(row_set).await?;
                }
                Ok(None) => {
                    debug!("empty block");
                }
            }
        }
        drop(tx);
        join_handle.await??;
        Ok(())
    }
}
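
// --- Illustrative usage, not part of the original file ---
// Hypothetical call site: given a populated `spec: BlockPartitionSpec`, a
// `lake: Arc<DataLakeConnection>`, and a `logger: Arc<dyn Logger>`:
//
//     if !spec.is_empty() {
//         spec.write(lake, logger).await?;
//     }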