// micromegas_analytics/lakehouse/async_events_block_processor.rs

use super::{
    block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
    write_partition::PartitionRowSet,
};
use crate::{
    async_block_processing::{AsyncBlockProcessor, parse_async_block_payload},
    async_events_table::{AsyncEventRecord, AsyncEventRecordBuilder},
    payload::fetch_block_payload,
    scope::ScopeDesc,
    time::ConvertTicks,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use micromegas_telemetry::blob_storage::BlobStorage;
use micromegas_tracing::prelude::*;
use std::sync::{Arc, LazyLock};
17
/// Shared event-type tags, allocated once and cloned (an `Arc` refcount
/// bump) into every emitted record instead of allocating a fresh string
/// per event. `LazyLock` (stable std) replaces the legacy `lazy_static!`
/// macro; call sites deref identically.
static BEGIN_EVENT_TYPE: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new("begin".to_string()));
static END_EVENT_TYPE: LazyLock<Arc<String>> = LazyLock::new(|| Arc::new("end".to_string()));
22
23/// A `BlockProcessor` implementation for processing async event blocks.
24#[derive(Debug)]
25pub struct AsyncEventsBlockProcessor {
26    convert_ticks: Arc<ConvertTicks>,
27}
28
29impl AsyncEventsBlockProcessor {
30    pub fn new(convert_ticks: Arc<ConvertTicks>) -> Self {
31        Self { convert_ticks }
32    }
33}
34
35/// Helper struct to collect async events during processing.
36struct AsyncEventCollector {
37    record_builder: AsyncEventRecordBuilder,
38    stream_id: Arc<String>,
39    block_id: Arc<String>,
40    convert_ticks: Arc<ConvertTicks>,
41}
42
43impl AsyncEventCollector {
44    fn new(
45        capacity: usize,
46        stream_id: Arc<String>,
47        block_id: Arc<String>,
48        convert_ticks: Arc<ConvertTicks>,
49    ) -> Self {
50        Self {
51            record_builder: AsyncEventRecordBuilder::with_capacity(capacity),
52            stream_id,
53            block_id,
54            convert_ticks,
55        }
56    }
57}
58
59impl AsyncBlockProcessor for AsyncEventCollector {
60    fn on_begin_async_scope(
61        &mut self,
62        _block_id: &str,
63        scope: ScopeDesc,
64        ts: i64,
65        span_id: i64,
66        parent_span_id: i64,
67        depth: u32,
68    ) -> Result<bool> {
69        let time_ns = self.convert_ticks.ticks_to_nanoseconds(ts);
70        let record = AsyncEventRecord {
71            stream_id: self.stream_id.clone(),
72            block_id: self.block_id.clone(),
73            time: time_ns,
74            event_type: BEGIN_EVENT_TYPE.clone(),
75            span_id,
76            parent_span_id,
77            depth,
78            hash: scope.hash,
79            name: scope.name,
80            filename: scope.filename,
81            target: scope.target,
82            line: scope.line,
83        };
84        self.record_builder.append(&record)?;
85        Ok(true)
86    }
87
88    fn on_end_async_scope(
89        &mut self,
90        _block_id: &str,
91        scope: ScopeDesc,
92        ts: i64,
93        span_id: i64,
94        parent_span_id: i64,
95        depth: u32,
96    ) -> Result<bool> {
97        let time_ns = self.convert_ticks.ticks_to_nanoseconds(ts);
98        let record = AsyncEventRecord {
99            stream_id: self.stream_id.clone(),
100            block_id: self.block_id.clone(),
101            time: time_ns,
102            event_type: END_EVENT_TYPE.clone(),
103            span_id,
104            parent_span_id,
105            depth,
106            hash: scope.hash,
107            name: scope.name,
108            filename: scope.filename,
109            target: scope.target,
110            line: scope.line,
111        };
112        self.record_builder.append(&record)?;
113        Ok(true)
114    }
115}
116
117#[async_trait]
118impl BlockProcessor for AsyncEventsBlockProcessor {
119    #[span_fn]
120    async fn process(
121        &self,
122        blob_storage: Arc<BlobStorage>,
123        src_block: Arc<PartitionSourceBlock>,
124    ) -> Result<Option<PartitionRowSet>> {
125        // Use the shared ConvertTicks instance instead of creating a new one per block
126        let convert_ticks = self.convert_ticks.clone();
127        // Use nb_objects as initial capacity estimate (may contain non-async events)
128        let estimated_capacity = src_block.block.nb_objects;
129        let mut collector = AsyncEventCollector::new(
130            estimated_capacity as usize,
131            Arc::new(format!("{}", src_block.stream.stream_id)),
132            Arc::new(format!("{}", src_block.block.block_id)),
133            convert_ticks,
134        );
135        let payload = fetch_block_payload(
136            blob_storage,
137            src_block.process.process_id,
138            src_block.stream.stream_id,
139            src_block.block.block_id,
140        )
141        .await
142        .with_context(|| "fetch_block_payload")?;
143        let block_id_str = src_block
144            .block
145            .block_id
146            .hyphenated()
147            .encode_lower(&mut sqlx::types::uuid::Uuid::encode_buffer())
148            .to_owned();
149        parse_async_block_payload(
150            &block_id_str,
151            0,
152            &payload,
153            &src_block.stream,
154            &mut collector,
155        )
156        .with_context(|| "parse_async_block_payload")?;
157        if let Some(time_range) = collector.record_builder.get_time_range() {
158            let record_batch = collector.record_builder.finish()?;
159            Ok(Some(PartitionRowSet {
160                rows_time_range: time_range,
161                rows: record_batch,
162            }))
163        } else {
164            Ok(None)
165        }
166    }
167}