// micromegas_analytics/lakehouse/async_events_block_processor.rs
use super::{
    block_partition_spec::BlockProcessor, partition_source_data::PartitionSourceBlock,
    write_partition::PartitionRowSet,
};
use crate::{
    async_block_processing::{AsyncBlockProcessor, parse_async_block_payload},
    async_events_table::{AsyncEventRecord, AsyncEventRecordBuilder},
    payload::fetch_block_payload,
    scope::ScopeDesc,
    time::ConvertTicks,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use micromegas_telemetry::blob_storage::BlobStorage;
use micromegas_tracing::prelude::*;
use std::sync::Arc;

18lazy_static::lazy_static! {
19 static ref BEGIN_EVENT_TYPE: Arc<String> = Arc::new("begin".to_string());
20 static ref END_EVENT_TYPE: Arc<String> = Arc::new("end".to_string());
21}
22
23#[derive(Debug)]
25pub struct AsyncEventsBlockProcessor {
26 convert_ticks: Arc<ConvertTicks>,
27}
28
29impl AsyncEventsBlockProcessor {
30 pub fn new(convert_ticks: Arc<ConvertTicks>) -> Self {
31 Self { convert_ticks }
32 }
33}
34
35struct AsyncEventCollector {
37 record_builder: AsyncEventRecordBuilder,
38 stream_id: Arc<String>,
39 block_id: Arc<String>,
40 convert_ticks: Arc<ConvertTicks>,
41}
42
43impl AsyncEventCollector {
44 fn new(
45 capacity: usize,
46 stream_id: Arc<String>,
47 block_id: Arc<String>,
48 convert_ticks: Arc<ConvertTicks>,
49 ) -> Self {
50 Self {
51 record_builder: AsyncEventRecordBuilder::with_capacity(capacity),
52 stream_id,
53 block_id,
54 convert_ticks,
55 }
56 }
57}
58
59impl AsyncBlockProcessor for AsyncEventCollector {
60 fn on_begin_async_scope(
61 &mut self,
62 _block_id: &str,
63 scope: ScopeDesc,
64 ts: i64,
65 span_id: i64,
66 parent_span_id: i64,
67 depth: u32,
68 ) -> Result<bool> {
69 let time_ns = self.convert_ticks.ticks_to_nanoseconds(ts);
70 let record = AsyncEventRecord {
71 stream_id: self.stream_id.clone(),
72 block_id: self.block_id.clone(),
73 time: time_ns,
74 event_type: BEGIN_EVENT_TYPE.clone(),
75 span_id,
76 parent_span_id,
77 depth,
78 hash: scope.hash,
79 name: scope.name,
80 filename: scope.filename,
81 target: scope.target,
82 line: scope.line,
83 };
84 self.record_builder.append(&record)?;
85 Ok(true)
86 }
87
88 fn on_end_async_scope(
89 &mut self,
90 _block_id: &str,
91 scope: ScopeDesc,
92 ts: i64,
93 span_id: i64,
94 parent_span_id: i64,
95 depth: u32,
96 ) -> Result<bool> {
97 let time_ns = self.convert_ticks.ticks_to_nanoseconds(ts);
98 let record = AsyncEventRecord {
99 stream_id: self.stream_id.clone(),
100 block_id: self.block_id.clone(),
101 time: time_ns,
102 event_type: END_EVENT_TYPE.clone(),
103 span_id,
104 parent_span_id,
105 depth,
106 hash: scope.hash,
107 name: scope.name,
108 filename: scope.filename,
109 target: scope.target,
110 line: scope.line,
111 };
112 self.record_builder.append(&record)?;
113 Ok(true)
114 }
115}
116
117#[async_trait]
118impl BlockProcessor for AsyncEventsBlockProcessor {
119 #[span_fn]
120 async fn process(
121 &self,
122 blob_storage: Arc<BlobStorage>,
123 src_block: Arc<PartitionSourceBlock>,
124 ) -> Result<Option<PartitionRowSet>> {
125 let convert_ticks = self.convert_ticks.clone();
127 let estimated_capacity = src_block.block.nb_objects;
129 let mut collector = AsyncEventCollector::new(
130 estimated_capacity as usize,
131 Arc::new(format!("{}", src_block.stream.stream_id)),
132 Arc::new(format!("{}", src_block.block.block_id)),
133 convert_ticks,
134 );
135 let payload = fetch_block_payload(
136 blob_storage,
137 src_block.process.process_id,
138 src_block.stream.stream_id,
139 src_block.block.block_id,
140 )
141 .await
142 .with_context(|| "fetch_block_payload")?;
143 let block_id_str = src_block
144 .block
145 .block_id
146 .hyphenated()
147 .encode_lower(&mut sqlx::types::uuid::Uuid::encode_buffer())
148 .to_owned();
149 parse_async_block_payload(
150 &block_id_str,
151 0,
152 &payload,
153 &src_block.stream,
154 &mut collector,
155 )
156 .with_context(|| "parse_async_block_payload")?;
157 if let Some(time_range) = collector.record_builder.get_time_range() {
158 let record_batch = collector.record_builder.finish()?;
159 Ok(Some(PartitionRowSet {
160 rows_time_range: time_range,
161 rows: record_batch,
162 }))
163 } else {
164 Ok(None)
165 }
166 }
167}