micromegas_analytics/
payload.rs

1use anyhow::{Context, Result};
2use micromegas_telemetry::{blob_storage::BlobStorage, compression::decompress};
3use micromegas_tracing::{parsing::make_custom_readers, prelude::*};
4use micromegas_transit::{parse_object_buffer, read_dependencies, value::Value};
5use std::sync::Arc;
6
7use crate::metadata::StreamMetadata;
8
9/// Fetches the payload of a block from blob storage.
10#[span_fn]
11pub async fn fetch_block_payload(
12    blob_storage: Arc<BlobStorage>,
13    process_id: sqlx::types::Uuid,
14    stream_id: sqlx::types::Uuid,
15    block_id: sqlx::types::Uuid,
16) -> Result<micromegas_telemetry::block_wire_format::BlockPayload> {
17    let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
18    let buffer: Vec<u8> = blob_storage
19        .read_blob(&obj_path)
20        .await
21        .with_context(|| "reading block payload from blob storage")?
22        .into();
23    {
24        span_scope!("decode");
25        let payload: micromegas_telemetry::block_wire_format::BlockPayload =
26            ciborium::from_reader(&buffer[..])
27                .with_context(|| format!("reading payload {}", &block_id))?;
28        Ok(payload)
29    }
30}
31
32/// Parses a block of telemetry data, calling a function for each object in the block.
33// parse_block calls fun for each object in the block until fun returns `false`
34#[span_fn]
35pub fn parse_block<F>(
36    stream: &StreamMetadata,
37    payload: &micromegas_telemetry::block_wire_format::BlockPayload,
38    fun: F,
39) -> Result<bool>
40where
41    F: FnMut(Value) -> Result<bool>,
42{
43    let dep_udts = &stream.dependencies_metadata;
44    let custom_readers = make_custom_readers();
45    let dependencies = read_dependencies(
46        &custom_readers,
47        dep_udts,
48        &decompress(&payload.dependencies).with_context(|| "decompressing dependencies payload")?,
49    )
50    .with_context(|| "reading dependencies")?;
51    let obj_udts = &stream.objects_metadata;
52    let continue_iterating = parse_object_buffer(
53        &custom_readers,
54        &dependencies,
55        obj_udts,
56        &decompress(&payload.objects).with_context(|| "decompressing objects payload")?,
57        fun,
58    )
59    .with_context(|| "parsing object buffer")?;
60    Ok(continue_iterating)
61}