// micromegas_analytics/metadata.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use datafusion::arrow::array::{Int32Array, Int64Array, RecordBatch, TimestampNanosecondArray};
4use micromegas_telemetry::{
5    property::Property, stream_info::StreamInfo, types::block::BlockMetadata,
6};
7use micromegas_tracing::prelude::*;
8use micromegas_transit::{UserDefinedType, uuid_utils::parse_optional_uuid};
9use sqlx::Row;
10use std::sync::Arc;
11use uuid::Uuid;
12
13use crate::{
14    arrow_properties::serialize_properties_to_jsonb,
15    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
16    lakehouse::{
17        lakehouse_context::LakehouseContext, partition_cache::LivePartitionProvider,
18        query::make_session_context, session_configurator::NoOpSessionConfigurator,
19        view_factory::ViewFactory,
20    },
21    properties::properties_column_accessor::properties_column_by_name,
22    time::TimeRange,
23};
/// Type alias for shared, pre-serialized JSONB data.
/// This represents JSONB properties that have been serialized once and can be reused.
/// The `Arc` makes sharing the serialized buffer across consumers a cheap
/// refcount bump instead of a byte copy.
pub type SharedJsonbSerialized = Arc<Vec<u8>>;
27
28/// Analytics-optimized process metadata.
29///
30/// This struct is designed for analytics use cases where process properties need to be
31/// efficiently serialized to JSONB format multiple times. Unlike `ProcessInfo`, which
32/// uses `HashMap<String, String>` for properties, this struct stores pre-serialized
33/// JSONB data to eliminate redundant serialization overhead.
34#[derive(Debug, Clone)]
35pub struct ProcessMetadata {
36    // Core fields (same as ProcessInfo)
37    pub process_id: uuid::Uuid,
38    pub exe: String,
39    pub username: String,
40    pub realname: String,
41    pub computer: String,
42    pub distro: String,
43    pub cpu_brand: String,
44    pub tsc_frequency: i64,
45    pub start_time: chrono::DateTime<chrono::Utc>,
46    pub start_ticks: i64,
47    pub parent_process_id: Option<uuid::Uuid>,
48    pub properties: SharedJsonbSerialized,
49}
50
/// Analytics-optimized stream metadata.
///
/// This struct is designed for analytics use cases where stream properties need to be
/// efficiently serialized to JSONB format multiple times. Unlike `StreamInfo`, which
/// uses `HashMap<String, String>` for properties, this struct stores pre-serialized
/// JSONB data to eliminate redundant serialization overhead.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    // Core fields (same as StreamInfo)
    // Process that owns this stream.
    pub process_id: Uuid,
    pub stream_id: Uuid,
    // User-defined type descriptions; stored as CBOR blobs in the database
    // (see `stream_metadata_from_row`).
    pub dependencies_metadata: Vec<UserDefinedType>,
    pub objects_metadata: Vec<UserDefinedType>,
    // Free-form tags attached to the stream.
    pub tags: Vec<String>,
    // Pre-serialized JSONB properties, shared cheaply via Arc.
    pub properties: SharedJsonbSerialized,
}
67
68impl StreamMetadata {
69    /// Creates StreamMetadata from StreamInfo by converting properties to JSONB format.
70    pub fn from_stream_info(stream_info: &StreamInfo) -> Result<Self> {
71        let properties = serialize_properties_to_jsonb(&stream_info.properties)
72            .with_context(|| "serializing stream properties to JSONB")?;
73        Ok(Self {
74            process_id: stream_info.process_id,
75            stream_id: stream_info.stream_id,
76            dependencies_metadata: stream_info.dependencies_metadata.clone(),
77            objects_metadata: stream_info.objects_metadata.clone(),
78            tags: stream_info.tags.clone(),
79            properties: Arc::new(properties),
80        })
81    }
82}
83
84/// Returns the thread name associated with the stream, if available.
85/// This function is only meaningful for streams associated with CPU threads.
86/// Returns format: "thread-name-thread-id" (e.g., "main-12345") or just "thread-id" if no name.
87pub fn get_thread_name_from_stream_metadata(stream: &StreamMetadata) -> Result<String> {
88    use jsonb::RawJsonb;
89
90    const THREAD_NAME_KEY: &str = "thread-name";
91    const THREAD_ID_KEY: &str = "thread-id";
92
93    if stream.properties.is_empty() {
94        return Ok(format!("{}", &stream.stream_id));
95    }
96
97    let jsonb = RawJsonb::new(&stream.properties);
98
99    let thread_name = if let Ok(Some(thread_name_value)) = jsonb.get_by_name(THREAD_NAME_KEY, false)
100        && let Ok(Some(name)) = thread_name_value.as_raw().as_str()
101    {
102        Some(name.to_string())
103    } else {
104        None
105    };
106
107    // thread_id falls back to stream_id
108    let thread_id = if let Ok(Some(thread_id_value)) = jsonb.get_by_name(THREAD_ID_KEY, false)
109        && let Ok(Some(id)) = thread_id_value.as_raw().as_str()
110    {
111        id.to_string()
112    } else {
113        stream.stream_id.to_string()
114    };
115
116    // Return "name-id" if name exists, otherwise just "id"
117    match thread_name {
118        Some(name) => Ok(format!("{}-{}", name, thread_id)),
119        None => Ok(thread_id),
120    }
121}
122
123/// Creates a `StreamMetadata` from a database row with pre-serialized JSONB properties.
124#[span_fn]
125pub fn stream_metadata_from_row(row: &sqlx::postgres::PgRow) -> Result<StreamMetadata> {
126    let dependencies_metadata_buffer: Vec<u8> = row.try_get("dependencies_metadata")?;
127    let dependencies_metadata: Vec<UserDefinedType> =
128        ciborium::from_reader(&dependencies_metadata_buffer[..])
129            .with_context(|| "decoding dependencies metadata")?;
130    let objects_metadata_buffer: Vec<u8> = row.try_get("objects_metadata")?;
131    let objects_metadata: Vec<UserDefinedType> =
132        ciborium::from_reader(&objects_metadata_buffer[..])
133            .with_context(|| "decoding objects metadata")?;
134    let tags: Vec<String> = row.try_get("tags")?;
135    let properties: Vec<Property> = row.try_get("properties")?;
136    let properties_map = micromegas_telemetry::property::into_hashmap(properties);
137    let serialized_properties = serialize_properties_to_jsonb(&properties_map)
138        .with_context(|| "serializing stream properties to JSONB")?;
139
140    Ok(StreamMetadata {
141        stream_id: row.try_get("stream_id")?,
142        process_id: row.try_get("process_id")?,
143        dependencies_metadata,
144        objects_metadata,
145        tags,
146        properties: Arc::new(serialized_properties),
147    })
148}
149
150/// Finds a stream by its ID and returns it as StreamMetadata.
151#[span_fn]
152pub async fn find_stream(
153    pool: &sqlx::Pool<sqlx::Postgres>,
154    stream_id: sqlx::types::Uuid,
155) -> Result<StreamMetadata> {
156    let row = sqlx::query(
157        "SELECT stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties
158         FROM streams
159         WHERE stream_id = $1
160         ;",
161    )
162    .bind(stream_id)
163    .fetch_one(pool)
164    .await
165    .with_context(|| "select from streams")?;
166    stream_metadata_from_row(&row)
167}
168
169/// Creates a `ProcessMetadata` from a database row with pre-serialized JSONB properties.
170#[span_fn]
171pub fn process_metadata_from_row(row: &sqlx::postgres::PgRow) -> Result<ProcessMetadata> {
172    let properties: Vec<Property> = row.try_get("process_properties")?;
173    let properties_map = micromegas_telemetry::property::into_hashmap(properties);
174    let serialized_properties = serialize_properties_to_jsonb(&properties_map)
175        .with_context(|| "serializing process properties to JSONB")?;
176
177    Ok(ProcessMetadata {
178        process_id: row.try_get("process_id")?,
179        exe: row.try_get("exe")?,
180        username: row.try_get("username")?,
181        realname: row.try_get("realname")?,
182        computer: row.try_get("computer")?,
183        distro: row.try_get("distro")?,
184        cpu_brand: row.try_get("cpu_brand")?,
185        tsc_frequency: row.try_get("tsc_frequency")?,
186        start_time: row.try_get("start_time")?,
187        start_ticks: row.try_get("start_ticks")?,
188        parent_process_id: row.try_get("parent_process_id")?,
189        properties: Arc::new(serialized_properties),
190    })
191}
192
193/// Finds a process by its ID and returns it as ProcessMetadata with pre-serialized JSONB properties.
194#[span_fn]
195pub async fn find_process(
196    pool: &sqlx::Pool<sqlx::Postgres>,
197    process_id: &sqlx::types::Uuid,
198) -> Result<ProcessMetadata> {
199    let row = sqlx::query(
200        "SELECT process_id,
201                exe,
202                username,
203                realname,
204                computer,
205                distro,
206                cpu_brand,
207                tsc_frequency,
208                start_time,
209                start_ticks,
210                parent_process_id,
211                properties as process_properties
212         FROM processes
213         WHERE process_id = $1;",
214    )
215    .bind(process_id)
216    .fetch_one(pool)
217    .await
218    .with_context(|| "select from processes")?;
219    process_metadata_from_row(&row)
220}
221
222/// Finds a process and its latest timing information using DataFusion (optimized version).
223/// Returns (ProcessMetadata, last_block_end_ticks, last_block_end_time)
224#[span_fn]
225pub async fn find_process_with_latest_timing(
226    lakehouse: Arc<LakehouseContext>,
227    view_factory: Arc<ViewFactory>,
228    process_id: &Uuid,
229    query_range: Option<TimeRange>,
230) -> Result<(ProcessMetadata, i64, DateTime<Utc>)> {
231    let partition_provider = Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));
232
233    let ctx = make_session_context(
234        lakehouse,
235        partition_provider,
236        query_range,
237        view_factory,
238        Arc::new(NoOpSessionConfigurator),
239    )
240    .await
241    .with_context(|| "creating DataFusion session context")?;
242
243    let sql = format!(
244        "SELECT process_id, exe, username, realname, computer, distro, cpu_brand,
245                tsc_frequency, start_time, start_ticks, parent_process_id, properties,
246                last_block_end_ticks, last_block_end_time
247         FROM processes
248         WHERE process_id = '{}'",
249        process_id
250    );
251
252    let df = ctx
253        .sql(&sql)
254        .await
255        .with_context(|| "executing SQL query for process with timing")?;
256
257    let results = df
258        .collect()
259        .await
260        .with_context(|| "collecting results from DataFusion")?;
261
262    if results.is_empty() || results[0].num_rows() == 0 {
263        anyhow::bail!("Process not found");
264    }
265
266    let batch = &results[0];
267
268    // Extract all the required columns
269    let process_id_column = string_column_by_name(batch, "process_id")?;
270    let exe_column = string_column_by_name(batch, "exe")?;
271    let username_column = string_column_by_name(batch, "username")?;
272    let realname_column = string_column_by_name(batch, "realname")?;
273    let computer_column = string_column_by_name(batch, "computer")?;
274    let distro_column = string_column_by_name(batch, "distro")?;
275    let cpu_brand_column = string_column_by_name(batch, "cpu_brand")?;
276    let tsc_frequency_column: &Int64Array = typed_column_by_name(batch, "tsc_frequency")?;
277    let start_time_column: &TimestampNanosecondArray = typed_column_by_name(batch, "start_time")?;
278    let start_ticks_column: &Int64Array = typed_column_by_name(batch, "start_ticks")?;
279    let last_block_end_ticks_column: &Int64Array =
280        typed_column_by_name(batch, "last_block_end_ticks")?;
281    let last_block_end_time_column: &TimestampNanosecondArray =
282        typed_column_by_name(batch, "last_block_end_time")?;
283    let parent_process_id_column = string_column_by_name(batch, "parent_process_id")?;
284
285    let parent_process_id = if parent_process_id_column.is_null(0) {
286        None
287    } else {
288        parse_optional_uuid(parent_process_id_column.value(0)?)?
289    };
290
291    // Handle properties column using PropertiesColumnAccessor
292    let properties_accessor = properties_column_by_name(batch, "properties")
293        .with_context(|| "accessing properties column")?;
294
295    // Get JSONB bytes directly from the properties column
296    let properties_jsonb = Arc::new(
297        properties_accessor
298            .jsonb_value(0)
299            .with_context(|| "extracting JSONB from properties column")?,
300    );
301
302    let process_metadata = ProcessMetadata {
303        process_id: parse_optional_uuid(process_id_column.value(0)?)?
304            .ok_or_else(|| anyhow::anyhow!("process_id cannot be empty"))?,
305        exe: exe_column.value(0)?.to_string(),
306        username: username_column.value(0)?.to_string(),
307        realname: realname_column.value(0)?.to_string(),
308        computer: computer_column.value(0)?.to_string(),
309        distro: distro_column.value(0)?.to_string(),
310        cpu_brand: cpu_brand_column.value(0)?.to_string(),
311        tsc_frequency: tsc_frequency_column.value(0),
312        start_time: DateTime::from_timestamp_nanos(start_time_column.value(0)),
313        start_ticks: start_ticks_column.value(0),
314        parent_process_id,
315        properties: properties_jsonb,
316    };
317
318    let last_block_end_ticks = last_block_end_ticks_column.value(0);
319    let last_block_end_time = DateTime::from_timestamp_nanos(last_block_end_time_column.value(0));
320
321    Ok((process_metadata, last_block_end_ticks, last_block_end_time))
322}
323/// Creates a `BlockMetadata` from a recordbatch row.
324#[span_fn]
325pub fn block_from_batch_row(rb: &RecordBatch, row: usize) -> Result<BlockMetadata> {
326    let block_id_column = string_column_by_name(rb, "block_id")?;
327    let stream_id_column = string_column_by_name(rb, "stream_id")?;
328    let process_id_column = string_column_by_name(rb, "process_id")?;
329    let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "begin_time")?;
330    let begin_ticks_column: &Int64Array = typed_column_by_name(rb, "begin_ticks")?;
331    let end_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "end_time")?;
332    let end_ticks_column: &Int64Array = typed_column_by_name(rb, "end_ticks")?;
333    let nb_objects_column: &Int32Array = typed_column_by_name(rb, "nb_objects")?;
334    let object_offset_column: &Int64Array = typed_column_by_name(rb, "object_offset")?;
335    let payload_size_column: &Int64Array = typed_column_by_name(rb, "payload_size")?;
336    let insert_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "insert_time")?;
337    Ok(BlockMetadata {
338        block_id: Uuid::parse_str(block_id_column.value(row)?)?,
339        stream_id: Uuid::parse_str(stream_id_column.value(row)?)?,
340        process_id: Uuid::parse_str(process_id_column.value(row)?)?,
341        begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(row)),
342        end_time: DateTime::from_timestamp_nanos(end_time_column.value(row)),
343        begin_ticks: begin_ticks_column.value(row),
344        end_ticks: end_ticks_column.value(row),
345        nb_objects: nb_objects_column.value(row),
346        object_offset: object_offset_column.value(row),
347        payload_size: payload_size_column.value(row),
348        insert_time: DateTime::from_timestamp_nanos(insert_time_column.value(row)),
349    })
350}