micromegas_analytics/lakehouse/
log_block_processor.rs1use super::{
2 block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
3 write_partition::PartitionRowSet,
4};
5use crate::{
6 log_entries_table::LogEntriesRecordBuilder, log_entry::for_each_log_entry_in_block,
7 time::make_time_converter_from_block_meta,
8};
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use micromegas_telemetry::blob_storage::BlobStorage;
12use micromegas_tracing::prelude::*;
13use std::sync::Arc;
14
15#[derive(Debug)]
17pub struct LogBlockProcessor {}
18
19#[async_trait]
20impl BlockProcessor for LogBlockProcessor {
21 #[span_fn]
22 async fn process(
23 &self,
24 blob_storage: Arc<BlobStorage>,
25 src_block: Arc<PartitionSourceBlock>,
26 ) -> Result<Option<PartitionRowSet>> {
27 let convert_ticks =
28 make_time_converter_from_block_meta(&src_block.process, &src_block.block)?;
29 let nb_log_entries = src_block.block.nb_objects;
30 let mut record_builder = LogEntriesRecordBuilder::with_capacity(nb_log_entries as usize);
31 let mut entry_count = 0;
32
33 for_each_log_entry_in_block(
34 blob_storage,
35 &convert_ticks,
36 src_block.process.clone(),
37 &src_block.stream,
38 &src_block.block,
39 |log_entry| {
40 record_builder.append_entry_only(&log_entry)?;
41 entry_count += 1;
42 Ok(true) },
44 )
45 .await
46 .with_context(|| "for_each_log_entry_in_block")?;
47
48 if entry_count > 0 {
49 let stream_id_str = format!("{}", src_block.stream.stream_id);
50 let block_id_str = format!("{}", src_block.block.block_id);
51 let insert_time_nanos = src_block
52 .block
53 .insert_time
54 .timestamp_nanos_opt()
55 .with_context(|| "converting insert_time to nanoseconds")?;
56
57 record_builder.fill_constant_columns(
58 &src_block.process,
59 &stream_id_str,
60 &block_id_str,
61 insert_time_nanos,
62 entry_count,
63 )?;
64 }
65
66 if let Some(time_range) = record_builder.get_time_range() {
67 let record_batch = record_builder.finish()?;
68 Ok(Some(PartitionRowSet {
69 rows_time_range: time_range,
70 rows: record_batch,
71 }))
72 } else {
73 Ok(None)
74 }
75 }
76}