micromegas_analytics/lakehouse/
metrics_block_processor.rs1use crate::{
2 measure::for_each_measure_in_block, metrics_table::MetricsRecordBuilder,
3 time::make_time_converter_from_block_meta,
4};
5
6use super::{
7 block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
8 write_partition::PartitionRowSet,
9};
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use micromegas_telemetry::blob_storage::BlobStorage;
13use micromegas_tracing::prelude::*;
14use std::sync::Arc;
15
/// Stateless block processor that turns the measures of a source block into metrics rows.
///
/// The struct carries no fields; all of its behavior lives in its `BlockProcessor`
/// implementation.
#[derive(Debug)]
pub struct MetricsBlockProcessor {}
18
19#[async_trait]
20impl BlockProcessor for MetricsBlockProcessor {
21 #[span_fn]
22 async fn process(
23 &self,
24 blob_storage: Arc<BlobStorage>,
25 src_block: Arc<PartitionSourceBlock>,
26 ) -> Result<Option<PartitionRowSet>> {
27 let convert_ticks =
28 make_time_converter_from_block_meta(&src_block.process, &src_block.block)?;
29 let nb_measures = src_block.block.nb_objects;
30 let mut record_builder = MetricsRecordBuilder::with_capacity(nb_measures as usize);
31 let mut entry_count = 0;
32
33 for_each_measure_in_block(
34 blob_storage,
35 &convert_ticks,
36 src_block.process.clone(),
37 &src_block.stream,
38 &src_block.block,
39 |measure| {
40 record_builder.append_entry_only(&measure)?;
41 entry_count += 1;
42 Ok(true) },
44 )
45 .await
46 .with_context(|| "for_each_measure_in_block")?;
47
48 if entry_count > 0 {
49 let stream_id_str = format!("{}", src_block.stream.stream_id);
50 let block_id_str = format!("{}", src_block.block.block_id);
51 let insert_time_nanos = src_block
52 .block
53 .insert_time
54 .timestamp_nanos_opt()
55 .with_context(|| "converting insert_time to nanoseconds")?;
56
57 record_builder.fill_constant_columns(
58 &src_block.process,
59 &stream_id_str,
60 &block_id_str,
61 insert_time_nanos,
62 entry_count,
63 )?;
64 }
65
66 if let Some(time_range) = record_builder.get_time_range() {
67 let record_batch = record_builder.finish()?;
68 Ok(Some(PartitionRowSet::new(time_range, record_batch)))
69 } else {
70 Ok(None)
71 }
72 }
73}