micromegas_analytics/
payload.rs1use anyhow::{Context, Result};
2use micromegas_telemetry::{blob_storage::BlobStorage, compression::decompress};
3use micromegas_tracing::{parsing::make_custom_readers, prelude::*};
4use micromegas_transit::{parse_object_buffer, read_dependencies, value::Value};
5use std::sync::Arc;
6
7use crate::metadata::StreamMetadata;
8
9#[span_fn]
11pub async fn fetch_block_payload(
12 blob_storage: Arc<BlobStorage>,
13 process_id: sqlx::types::Uuid,
14 stream_id: sqlx::types::Uuid,
15 block_id: sqlx::types::Uuid,
16) -> Result<micromegas_telemetry::block_wire_format::BlockPayload> {
17 let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
18 let buffer: Vec<u8> = blob_storage
19 .read_blob(&obj_path)
20 .await
21 .with_context(|| "reading block payload from blob storage")?
22 .into();
23 {
24 span_scope!("decode");
25 let payload: micromegas_telemetry::block_wire_format::BlockPayload =
26 ciborium::from_reader(&buffer[..])
27 .with_context(|| format!("reading payload {}", &block_id))?;
28 Ok(payload)
29 }
30}
31
32#[span_fn]
35pub fn parse_block<F>(
36 stream: &StreamMetadata,
37 payload: µmegas_telemetry::block_wire_format::BlockPayload,
38 fun: F,
39) -> Result<bool>
40where
41 F: FnMut(Value) -> Result<bool>,
42{
43 let dep_udts = &stream.dependencies_metadata;
44 let custom_readers = make_custom_readers();
45 let dependencies = read_dependencies(
46 &custom_readers,
47 dep_udts,
48 &decompress(&payload.dependencies).with_context(|| "decompressing dependencies payload")?,
49 )
50 .with_context(|| "reading dependencies")?;
51 let obj_udts = &stream.objects_metadata;
52 let continue_iterating = parse_object_buffer(
53 &custom_readers,
54 &dependencies,
55 obj_udts,
56 &decompress(&payload.objects).with_context(|| "decompressing objects payload")?,
57 fun,
58 )
59 .with_context(|| "parsing object buffer")?;
60 Ok(continue_iterating)
61}