micromegas_analytics/lakehouse/
metrics_block_processor.rs

1use crate::{
2    measure::for_each_measure_in_block, metrics_table::MetricsRecordBuilder,
3    time::make_time_converter_from_block_meta,
4};
5
6use super::{
7    block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
8    write_partition::PartitionRowSet,
9};
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use micromegas_telemetry::blob_storage::BlobStorage;
13use micromegas_tracing::prelude::*;
14use std::sync::Arc;
15
/// Stateless block processor for metrics data.
///
/// The struct carries no fields; all of its behavior comes from its
/// `BlockProcessor` implementation, which turns the measures stored in a
/// source block into a row set.
#[derive(Debug)]
pub struct MetricsBlockProcessor {}
18
19#[async_trait]
20impl BlockProcessor for MetricsBlockProcessor {
21    #[span_fn]
22    async fn process(
23        &self,
24        blob_storage: Arc<BlobStorage>,
25        src_block: Arc<PartitionSourceBlock>,
26    ) -> Result<Option<PartitionRowSet>> {
27        let convert_ticks =
28            make_time_converter_from_block_meta(&src_block.process, &src_block.block)?;
29        let nb_measures = src_block.block.nb_objects;
30        let mut record_builder = MetricsRecordBuilder::with_capacity(nb_measures as usize);
31        let mut entry_count = 0;
32
33        for_each_measure_in_block(
34            blob_storage,
35            &convert_ticks,
36            src_block.process.clone(),
37            &src_block.stream,
38            &src_block.block,
39            |measure| {
40                record_builder.append_entry_only(&measure)?;
41                entry_count += 1;
42                Ok(true) // continue
43            },
44        )
45        .await
46        .with_context(|| "for_each_measure_in_block")?;
47
48        if entry_count > 0 {
49            let stream_id_str = format!("{}", src_block.stream.stream_id);
50            let block_id_str = format!("{}", src_block.block.block_id);
51            let insert_time_nanos = src_block
52                .block
53                .insert_time
54                .timestamp_nanos_opt()
55                .with_context(|| "converting insert_time to nanoseconds")?;
56
57            record_builder.fill_constant_columns(
58                &src_block.process,
59                &stream_id_str,
60                &block_id_str,
61                insert_time_nanos,
62                entry_count,
63            )?;
64        }
65
66        if let Some(time_range) = record_builder.get_time_range() {
67            let record_batch = record_builder.finish()?;
68            Ok(Some(PartitionRowSet::new(time_range, record_batch)))
69        } else {
70            Ok(None)
71        }
72    }
73}