micromegas_analytics/lakehouse/partition_source_data.rs

1use super::blocks_view::blocks_file_schema_hash;
2use super::lakehouse_context::LakehouseContext;
3use super::partition_cache::PartitionCache;
4use crate::dfext::{
5    string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
6};
7use crate::metadata::{ProcessMetadata, StreamMetadata};
8use crate::properties::properties_column_accessor::properties_column_by_name;
9use crate::time::TimeRange;
10use crate::{
11    dfext::typed_column::typed_column,
12    lakehouse::{blocks_view::blocks_view_schema, query::query_partitions},
13};
14use anyhow::{Context, Result};
15use async_stream::try_stream;
16use async_trait::async_trait;
17use chrono::DateTime;
18use datafusion::functions_aggregate::{count::count_all, expr_fn::sum, min_max::max};
19use datafusion::{
20    arrow::array::{
21        Array, BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray,
22        TimestampNanosecondArray,
23    },
24    prelude::*,
25};
26use futures::{StreamExt, stream::BoxStream};
27use micromegas_telemetry::types::block::BlockMetadata;
28use std::fmt::Debug;
29use std::sync::Arc;
30use uuid::Uuid;
31
/// Represents a single block of source data for a partition.
///
/// Bundles the block's own metadata with the metadata of the stream and process it
/// belongs to, so that downstream processing does not need further lookups.
#[derive(Debug)]
pub struct PartitionSourceBlock {
    /// Metadata of the block itself (ids, time/tick range, object count, payload size).
    pub block: BlockMetadata,
    /// Metadata of the stream this block belongs to (shared through an `Arc`).
    pub stream: Arc<StreamMetadata>,
    /// Metadata of the process that emitted the stream (shared through an `Arc`).
    pub process: Arc<ProcessMetadata>,
    /// Wire-format identifier for the block payload — keys per-block dispatch
    /// in `BlockPartitionSpec` and `write_partition_from_blocks`. Examples:
    /// `"micromegas-transit"`, `"otlp/v1/logs"`, `"otlp/v1/metrics"`, `"otlp/v1/traces"`.
    pub format: String,
}
43
/// A trait for providing blocks of source data for partitions.
///
/// Implementors expose summary statistics about their blocks (count, largest payload,
/// a fingerprint of the source data) as well as a stream over the blocks themselves.
#[async_trait]
pub trait PartitionBlocksSource: Sync + Send + Debug {
    /// Returns true if there are no blocks in the source.
    fn is_empty(&self) -> bool;
    /// Returns the number of blocks in the source.
    fn get_nb_blocks(&self) -> i64;
    /// Returns the maximum payload size of the blocks in the source
    /// (the largest `BlockMetadata::payload_size`).
    fn get_max_payload_size(&self) -> i64;
    /// Returns a hash of the source data.
    ///
    /// How the bytes are derived is implementation-specific.
    fn get_source_data_hash(&self) -> Vec<u8>;
    /// Returns a stream of the source blocks.
    ///
    /// The stream is `'static` so it can outlive `&self`; failures are surfaced as
    /// `Err` items rather than by panicking.
    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>>;
}
58
/// A `PartitionBlocksSource` implementation that stores blocks in memory.
///
/// Useful when the set of blocks has already been materialized.
#[derive(Debug)]
pub struct SourceDataBlocksInMemory {
    /// The blocks served by this source, in the order they are streamed.
    pub blocks: Vec<Arc<PartitionSourceBlock>>,
    /// Precomputed hash identifying this set of blocks; returned verbatim as the
    /// source data hash.
    pub block_ids_hash: Vec<u8>,
}
65
66#[async_trait]
67impl PartitionBlocksSource for SourceDataBlocksInMemory {
68    fn is_empty(&self) -> bool {
69        self.blocks.is_empty()
70    }
71
72    fn get_nb_blocks(&self) -> i64 {
73        self.blocks.len() as i64
74    }
75
76    fn get_max_payload_size(&self) -> i64 {
77        let mut max_size = self.blocks[0].block.payload_size;
78        for block in &self.blocks {
79            max_size = max_size.max(block.block.payload_size);
80        }
81        max_size
82    }
83
84    fn get_source_data_hash(&self) -> Vec<u8> {
85        self.block_ids_hash.clone()
86    }
87
88    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
89        let stream = futures::stream::iter(self.blocks.clone()).map(Ok);
90        stream.boxed()
91    }
92}
93
/// A `PartitionBlocksSource` implementation that fetches blocks from a DataFusion DataFrame.
///
/// The summary statistics are precomputed (see `fetch_partition_source_data`), while the
/// blocks themselves are only fetched when the blocks stream is requested.
#[derive(Debug)]
pub struct SourceDataBlocks {
    /// Query producing one row per block, joined with its stream and process columns.
    pub blocks_dataframe: DataFrame,
    /// Sum of `nb_objects` over the matched blocks; also used as the source data hash.
    pub object_count: i64,
    /// Number of matched blocks.
    pub block_count: i64,
    /// Largest `payload_size` among the matched blocks.
    pub max_payload: i64,
}
102
103#[async_trait]
104impl PartitionBlocksSource for SourceDataBlocks {
105    fn is_empty(&self) -> bool {
106        self.object_count == 0
107    }
108
109    fn get_nb_blocks(&self) -> i64 {
110        self.block_count
111    }
112
113    fn get_max_payload_size(&self) -> i64 {
114        self.max_payload
115    }
116
117    fn get_source_data_hash(&self) -> Vec<u8> {
118        self.object_count.to_le_bytes().to_vec()
119    }
120
121    async fn get_blocks_stream(&self) -> BoxStream<'static, Result<Arc<PartitionSourceBlock>>> {
122        let df = self.blocks_dataframe.clone();
123        Box::pin(try_stream! {
124            let mut stream = df.execute_stream().await?;
125            while let Some(res) = stream.next().await {
126                let b = res.with_context(|| "fetching blocks query results")?;
127                let block_id_column = string_column_by_name(&b, "block_id")?;
128                let stream_id_column = string_column_by_name(&b, "stream_id")?;
129                let process_id_column = string_column_by_name(&b, "process_id")?;
130                let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
131                let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
132                let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
133                let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
134                let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
135                let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
136                let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
137                let block_insert_time_column: &TimestampNanosecondArray =
138                    typed_column_by_name(&b, "insert_time")?;
139                let dependencies_metadata_column: &BinaryArray =
140                    typed_column_by_name(&b, "streams.dependencies_metadata")?;
141                let objects_metadata_column: &BinaryArray =
142                    typed_column_by_name(&b, "streams.objects_metadata")?;
143                let stream_tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "streams.tags")?;
144                let stream_properties_accessor = properties_column_by_name(&b, "streams.properties")?;
145                let stream_format_column = string_column_by_name(&b, "streams.format")?;
146
147                let process_start_time_column: &TimestampNanosecondArray =
148                    typed_column_by_name(&b, "processes.start_time")?;
149                let process_start_ticks_column: &Int64Array =
150                    typed_column_by_name(&b, "processes.start_ticks")?;
151                let process_tsc_freq_column: &Int64Array =
152                    typed_column_by_name(&b, "processes.tsc_frequency")?;
153                let process_exe_column = string_column_by_name(&b, "processes.exe")?;
154                let process_username_column = string_column_by_name(&b, "processes.username")?;
155                let process_realname_column = string_column_by_name(&b, "processes.realname")?;
156                let process_computer_column = string_column_by_name(&b, "processes.computer")?;
157                let process_distro_column = string_column_by_name(&b, "processes.distro")?;
158                let process_cpu_column = string_column_by_name(&b, "processes.cpu_brand")?;
159                let process_parent_column = string_column_by_name(&b, "processes.parent_process_id")?;
160                let process_properties_accessor = properties_column_by_name(&b, "processes.properties")?;
161                for ir in 0..b.num_rows() {
162                    let block_insert_time = block_insert_time_column.value(ir);
163                    let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)?;
164                    let process_id = Uuid::parse_str(process_id_column.value(ir)?)?;
165                    let block = BlockMetadata {
166                        block_id: Uuid::parse_str(block_id_column.value(ir)?)?,
167                        stream_id,
168                        process_id,
169                        begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(ir)),
170                        end_time: DateTime::from_timestamp_nanos(end_time_column.value(ir)),
171                        begin_ticks: begin_ticks_column.value(ir),
172                        end_ticks: end_ticks_column.value(ir),
173                        nb_objects: nb_objects_column.value(ir),
174                        payload_size: payload_size_column.value(ir),
175                        object_offset: object_offset_column.value(ir),
176                        insert_time: DateTime::from_timestamp_nanos(block_insert_time),
177                    };
178
179                    let dependencies_metadata = dependencies_metadata_column.value(ir);
180                    let objects_metadata = objects_metadata_column.value(ir);
181                    let stream_tags = stream_tags_column
182                        .value(ir)
183                        .as_any()
184                        .downcast_ref::<StringArray>()
185                        .with_context(|| "casting stream_tags")?
186                        .iter()
187                        .map(|item| String::from(item.unwrap_or_default()))
188                        .collect();
189                    let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
190                    let stream = StreamMetadata {
191                        process_id,
192                        stream_id,
193                        dependencies_metadata: ciborium::from_reader(dependencies_metadata)
194                            .with_context(|| "decoding dependencies_metadata")?,
195                        objects_metadata: ciborium::from_reader(objects_metadata)
196                            .with_context(|| "decoding objects_metadata")?,
197                        tags: stream_tags,
198                        properties: Arc::new(stream_properties_jsonb),
199                    };
200                    let process_properties_jsonb = process_properties_accessor.jsonb_value(ir)?;
201                    let parent_value = process_parent_column.value(ir)?;
202                    let parent_process_id = if parent_value.is_empty() {
203                        None
204                    } else {
205                        Some(Uuid::parse_str(parent_value).with_context(|| "parsing parent process_id")?)
206                    };
207
208                    let process = ProcessMetadata {
209                        process_id,
210                        exe: process_exe_column.value(ir)?.into(),
211                        username: process_username_column.value(ir)?.into(),
212                        realname: process_realname_column.value(ir)?.into(),
213                        computer: process_computer_column.value(ir)?.into(),
214                        distro: process_distro_column.value(ir)?.into(),
215                        cpu_brand: process_cpu_column.value(ir)?.into(),
216                        tsc_frequency: process_tsc_freq_column.value(ir),
217                        start_time: DateTime::from_timestamp_nanos(process_start_time_column.value(ir)),
218                        start_ticks: process_start_ticks_column.value(ir),
219                        parent_process_id,
220                        properties: Arc::new(process_properties_jsonb),
221                    };
222                    let format = stream_format_column.value(ir)?.to_string();
223                    yield Arc::new(PartitionSourceBlock {
224                        block,
225                        stream: Arc::new(stream),
226                        process: Arc::new(process),
227                        format,
228                    });
229                }
230            }
231
232        })
233    }
234}
235
236/// Converts a hash (expected to be an i64 as bytes) to an object count.
237pub fn hash_to_object_count(hash: &[u8]) -> Result<i64> {
238    Ok(i64::from_le_bytes(
239        hash.try_into().with_context(|| "hash_to_object_count")?,
240    ))
241}
242
243/// Fetches partition source data from the data lake.
244pub async fn fetch_partition_source_data(
245    lakehouse: Arc<LakehouseContext>,
246    existing_partitions: Arc<PartitionCache>,
247    insert_range: TimeRange,
248    source_stream_tag: &str,
249) -> Result<SourceDataBlocks> {
250    let begin_rfc = insert_range.begin.to_rfc3339();
251    let end_rfc = insert_range.end.to_rfc3339();
252    let sql = format!(
253        r#"
254          SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
255              object_offset, payload_size, insert_time,
256              "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties", "streams.format",
257              "processes.start_time", "processes.start_ticks", "processes.tsc_frequency", "processes.exe",
258              "processes.username", "processes.realname", "processes.computer", "processes.distro", "processes.cpu_brand",
259              "processes.parent_process_id", "processes.properties"
260          FROM source
261          WHERE array_has( "streams.tags", '{source_stream_tag}' )
262          AND insert_time >= '{begin_rfc}'
263          AND insert_time < '{end_rfc}'
264          ;"#
265    );
266    let block_partitions = existing_partitions
267        .filter("blocks", "global", &blocks_file_schema_hash(), insert_range)
268        .partitions;
269    let reader_factory = lakehouse.reader_factory().clone();
270    let df = query_partitions(
271        lakehouse.runtime().clone(),
272        reader_factory,
273        lakehouse.lake().blob_storage.inner(),
274        Arc::new(blocks_view_schema()),
275        Arc::new(block_partitions),
276        &sql,
277    )
278    .await
279    .with_context(|| "blocks query")?;
280    let blocks_stats_df = df.clone().aggregate(
281        vec![],
282        vec![
283            sum(col("nb_objects")),
284            count_all(),
285            max(col("payload_size")),
286        ],
287    )?;
288    let blocks_stats_rbs = blocks_stats_df.collect().await?;
289    if blocks_stats_rbs.len() != 1 {
290        anyhow::bail!("nb_objects_rbs has size {}", blocks_stats_rbs.len());
291    }
292    if blocks_stats_rbs[0].num_rows() != 1 {
293        anyhow::bail!(
294            "nb_objects_rbs[0] has size {}",
295            blocks_stats_rbs[0].num_rows()
296        );
297    }
298    let sub_nb_objects_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 0)?;
299    let object_count = sub_nb_objects_column.value(0);
300    let block_count_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 1)?;
301    let block_count = block_count_column.value(0);
302    let max_payload_size_column: &Int64Array = typed_column(&blocks_stats_rbs[0], 2)?;
303    let max_payload = max_payload_size_column.value(0);
304
305    Ok(SourceDataBlocks {
306        blocks_dataframe: df,
307        object_count,
308        block_count,
309        max_payload,
310    })
311}