1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use datafusion::arrow::array::{Int32Array, Int64Array, RecordBatch, TimestampNanosecondArray};
4use micromegas_telemetry::{
5 property::Property, stream_info::StreamInfo, types::block::BlockMetadata,
6};
7use micromegas_tracing::prelude::*;
8use micromegas_transit::{UserDefinedType, uuid_utils::parse_optional_uuid};
9use sqlx::Row;
10use std::sync::Arc;
11use uuid::Uuid;
12
13use crate::{
14 arrow_properties::serialize_properties_to_jsonb,
15 dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
16 lakehouse::{
17 lakehouse_context::LakehouseContext, partition_cache::LivePartitionProvider,
18 query::make_session_context, session_configurator::NoOpSessionConfigurator,
19 view_factory::ViewFactory,
20 },
21 properties::properties_column_accessor::properties_column_by_name,
22 time::TimeRange,
23};
/// A property bag serialized as JSONB bytes, wrapped in [`Arc`] so it can be
/// shared between consumers without copying the buffer.
pub type SharedJsonbSerialized = Arc<Vec<u8>>;
27
/// Metadata describing an instrumented process, as read from the `processes`
/// table (see [`process_metadata_from_row`]).
#[derive(Debug, Clone)]
pub struct ProcessMetadata {
    /// Unique identifier of the process.
    pub process_id: uuid::Uuid,
    /// Executable that produced the telemetry.
    pub exe: String,
    /// Username of the account running the process.
    pub username: String,
    /// Real (display) name of the user.
    pub realname: String,
    /// Name of the machine the process ran on.
    pub computer: String,
    /// Operating system / distribution description.
    pub distro: String,
    /// CPU brand string of the host.
    pub cpu_brand: String,
    /// Tick frequency used to relate tick counts to wall time
    /// (presumably ticks per second — confirm with the producer).
    pub tsc_frequency: i64,
    /// Wall-clock time at process start.
    pub start_time: chrono::DateTime<chrono::Utc>,
    /// Tick counter value at process start.
    pub start_ticks: i64,
    /// Identifier of the parent process, when one was recorded.
    pub parent_process_id: Option<uuid::Uuid>,
    /// Process properties serialized as shared JSONB.
    pub properties: SharedJsonbSerialized,
}
50
/// Metadata describing an event stream belonging to a process, as read from
/// the `streams` table (see [`stream_metadata_from_row`]).
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// Identifier of the owning process.
    pub process_id: Uuid,
    /// Unique identifier of the stream.
    pub stream_id: Uuid,
    /// Transit type metadata for the stream's dependencies
    /// (CBOR-decoded from the `dependencies_metadata` column).
    pub dependencies_metadata: Vec<UserDefinedType>,
    /// Transit type metadata for the objects carried by the stream
    /// (CBOR-decoded from the `objects_metadata` column).
    pub objects_metadata: Vec<UserDefinedType>,
    /// Free-form tags attached to the stream.
    pub tags: Vec<String>,
    /// Stream properties serialized as shared JSONB; may contain keys such as
    /// `thread-name` / `thread-id` (see [`get_thread_name_from_stream_metadata`]).
    pub properties: SharedJsonbSerialized,
}
67
68impl StreamMetadata {
69 pub fn from_stream_info(stream_info: &StreamInfo) -> Result<Self> {
71 let properties = serialize_properties_to_jsonb(&stream_info.properties)
72 .with_context(|| "serializing stream properties to JSONB")?;
73 Ok(Self {
74 process_id: stream_info.process_id,
75 stream_id: stream_info.stream_id,
76 dependencies_metadata: stream_info.dependencies_metadata.clone(),
77 objects_metadata: stream_info.objects_metadata.clone(),
78 tags: stream_info.tags.clone(),
79 properties: Arc::new(properties),
80 })
81 }
82}
83
84pub fn get_thread_name_from_stream_metadata(stream: &StreamMetadata) -> Result<String> {
88 use jsonb::RawJsonb;
89
90 const THREAD_NAME_KEY: &str = "thread-name";
91 const THREAD_ID_KEY: &str = "thread-id";
92
93 if stream.properties.is_empty() {
94 return Ok(format!("{}", &stream.stream_id));
95 }
96
97 let jsonb = RawJsonb::new(&stream.properties);
98
99 let thread_name = if let Ok(Some(thread_name_value)) = jsonb.get_by_name(THREAD_NAME_KEY, false)
100 && let Ok(Some(name)) = thread_name_value.as_raw().as_str()
101 {
102 Some(name.to_string())
103 } else {
104 None
105 };
106
107 let thread_id = if let Ok(Some(thread_id_value)) = jsonb.get_by_name(THREAD_ID_KEY, false)
109 && let Ok(Some(id)) = thread_id_value.as_raw().as_str()
110 {
111 id.to_string()
112 } else {
113 stream.stream_id.to_string()
114 };
115
116 match thread_name {
118 Some(name) => Ok(format!("{}-{}", name, thread_id)),
119 None => Ok(thread_id),
120 }
121}
122
123#[span_fn]
125pub fn stream_metadata_from_row(row: &sqlx::postgres::PgRow) -> Result<StreamMetadata> {
126 let dependencies_metadata_buffer: Vec<u8> = row.try_get("dependencies_metadata")?;
127 let dependencies_metadata: Vec<UserDefinedType> =
128 ciborium::from_reader(&dependencies_metadata_buffer[..])
129 .with_context(|| "decoding dependencies metadata")?;
130 let objects_metadata_buffer: Vec<u8> = row.try_get("objects_metadata")?;
131 let objects_metadata: Vec<UserDefinedType> =
132 ciborium::from_reader(&objects_metadata_buffer[..])
133 .with_context(|| "decoding objects metadata")?;
134 let tags: Vec<String> = row.try_get("tags")?;
135 let properties: Vec<Property> = row.try_get("properties")?;
136 let properties_map = micromegas_telemetry::property::into_hashmap(properties);
137 let serialized_properties = serialize_properties_to_jsonb(&properties_map)
138 .with_context(|| "serializing stream properties to JSONB")?;
139
140 Ok(StreamMetadata {
141 stream_id: row.try_get("stream_id")?,
142 process_id: row.try_get("process_id")?,
143 dependencies_metadata,
144 objects_metadata,
145 tags,
146 properties: Arc::new(serialized_properties),
147 })
148}
149
/// Fetches the metadata of a single stream from the `streams` table.
///
/// # Errors
/// Fails when the stream does not exist (`fetch_one` returns an error) or
/// when a column cannot be decoded (see [`stream_metadata_from_row`]).
#[span_fn]
pub async fn find_stream(
    pool: &sqlx::Pool<sqlx::Postgres>,
    stream_id: sqlx::types::Uuid,
) -> Result<StreamMetadata> {
    let row = sqlx::query(
        "SELECT stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties
         FROM streams
         WHERE stream_id = $1
         ;",
    )
    .bind(stream_id)
    .fetch_one(pool)
    .await
    .with_context(|| "select from streams")?;
    stream_metadata_from_row(&row)
}
168
169#[span_fn]
171pub fn process_metadata_from_row(row: &sqlx::postgres::PgRow) -> Result<ProcessMetadata> {
172 let properties: Vec<Property> = row.try_get("process_properties")?;
173 let properties_map = micromegas_telemetry::property::into_hashmap(properties);
174 let serialized_properties = serialize_properties_to_jsonb(&properties_map)
175 .with_context(|| "serializing process properties to JSONB")?;
176
177 Ok(ProcessMetadata {
178 process_id: row.try_get("process_id")?,
179 exe: row.try_get("exe")?,
180 username: row.try_get("username")?,
181 realname: row.try_get("realname")?,
182 computer: row.try_get("computer")?,
183 distro: row.try_get("distro")?,
184 cpu_brand: row.try_get("cpu_brand")?,
185 tsc_frequency: row.try_get("tsc_frequency")?,
186 start_time: row.try_get("start_time")?,
187 start_ticks: row.try_get("start_ticks")?,
188 parent_process_id: row.try_get("parent_process_id")?,
189 properties: Arc::new(serialized_properties),
190 })
191}
192
/// Fetches the metadata of a single process from the `processes` table.
///
/// The `properties` column is aliased to `process_properties`, which is the
/// name [`process_metadata_from_row`] reads.
///
/// # Errors
/// Fails when the process does not exist (`fetch_one` returns an error) or
/// when a column cannot be decoded.
#[span_fn]
pub async fn find_process(
    pool: &sqlx::Pool<sqlx::Postgres>,
    process_id: &sqlx::types::Uuid,
) -> Result<ProcessMetadata> {
    let row = sqlx::query(
        "SELECT process_id,
                exe,
                username,
                realname,
                computer,
                distro,
                cpu_brand,
                tsc_frequency,
                start_time,
                start_ticks,
                parent_process_id,
                properties as process_properties
         FROM processes
         WHERE process_id = $1;",
    )
    .bind(process_id)
    .fetch_one(pool)
    .await
    .with_context(|| "select from processes")?;
    process_metadata_from_row(&row)
}
221
/// Fetches a process from the lakehouse `processes` view along with its most
/// recent timing information (`last_block_end_ticks`, `last_block_end_time`).
///
/// Unlike [`find_process`], this queries through DataFusion so the extra
/// timing columns maintained by the lakehouse view are available.
///
/// # Errors
/// Fails when the session context cannot be created, the query fails, the
/// process is not found, or a column cannot be decoded.
#[span_fn]
pub async fn find_process_with_latest_timing(
    lakehouse: Arc<LakehouseContext>,
    view_factory: Arc<ViewFactory>,
    process_id: &Uuid,
    query_range: Option<TimeRange>,
) -> Result<(ProcessMetadata, i64, DateTime<Utc>)> {
    // Partition provider backed by the live database pool.
    let partition_provider = Arc::new(LivePartitionProvider::new(lakehouse.lake().db_pool.clone()));

    let ctx = make_session_context(
        lakehouse,
        partition_provider,
        query_range,
        view_factory,
        Arc::new(NoOpSessionConfigurator),
    )
    .await
    .with_context(|| "creating DataFusion session context")?;

    // NOTE(review): the SQL is built with format!; this is not injectable
    // because `Uuid`'s Display only emits hex digits and dashes, but a
    // parameterized query would make that guarantee structural.
    let sql = format!(
        "SELECT process_id, exe, username, realname, computer, distro, cpu_brand,
                tsc_frequency, start_time, start_ticks, parent_process_id, properties,
                last_block_end_ticks, last_block_end_time
         FROM processes
         WHERE process_id = '{}'",
        process_id
    );

    let df = ctx
        .sql(&sql)
        .await
        .with_context(|| "executing SQL query for process with timing")?;

    let results = df
        .collect()
        .await
        .with_context(|| "collecting results from DataFusion")?;

    // `process_id` is a key, so at most one row is expected; only the first
    // batch/row is read below.
    if results.is_empty() || results[0].num_rows() == 0 {
        anyhow::bail!("Process not found");
    }

    let batch = &results[0];

    // Typed accessors for each column of the result batch.
    let process_id_column = string_column_by_name(batch, "process_id")?;
    let exe_column = string_column_by_name(batch, "exe")?;
    let username_column = string_column_by_name(batch, "username")?;
    let realname_column = string_column_by_name(batch, "realname")?;
    let computer_column = string_column_by_name(batch, "computer")?;
    let distro_column = string_column_by_name(batch, "distro")?;
    let cpu_brand_column = string_column_by_name(batch, "cpu_brand")?;
    let tsc_frequency_column: &Int64Array = typed_column_by_name(batch, "tsc_frequency")?;
    let start_time_column: &TimestampNanosecondArray = typed_column_by_name(batch, "start_time")?;
    let start_ticks_column: &Int64Array = typed_column_by_name(batch, "start_ticks")?;
    let last_block_end_ticks_column: &Int64Array =
        typed_column_by_name(batch, "last_block_end_ticks")?;
    let last_block_end_time_column: &TimestampNanosecondArray =
        typed_column_by_name(batch, "last_block_end_time")?;
    let parent_process_id_column = string_column_by_name(batch, "parent_process_id")?;

    // Null column value means no recorded parent process.
    let parent_process_id = if parent_process_id_column.is_null(0) {
        None
    } else {
        parse_optional_uuid(parent_process_id_column.value(0)?)?
    };

    let properties_accessor = properties_column_by_name(batch, "properties")
        .with_context(|| "accessing properties column")?;

    // Properties are extracted directly as JSONB bytes and shared via Arc.
    let properties_jsonb = Arc::new(
        properties_accessor
            .jsonb_value(0)
            .with_context(|| "extracting JSONB from properties column")?,
    );

    let process_metadata = ProcessMetadata {
        // The process_id column is a string; an empty/absent value is an error.
        process_id: parse_optional_uuid(process_id_column.value(0)?)?
            .ok_or_else(|| anyhow::anyhow!("process_id cannot be empty"))?,
        exe: exe_column.value(0)?.to_string(),
        username: username_column.value(0)?.to_string(),
        realname: realname_column.value(0)?.to_string(),
        computer: computer_column.value(0)?.to_string(),
        distro: distro_column.value(0)?.to_string(),
        cpu_brand: cpu_brand_column.value(0)?.to_string(),
        tsc_frequency: tsc_frequency_column.value(0),
        start_time: DateTime::from_timestamp_nanos(start_time_column.value(0)),
        start_ticks: start_ticks_column.value(0),
        parent_process_id,
        properties: properties_jsonb,
    };

    let last_block_end_ticks = last_block_end_ticks_column.value(0);
    let last_block_end_time = DateTime::from_timestamp_nanos(last_block_end_time_column.value(0));

    Ok((process_metadata, last_block_end_ticks, last_block_end_time))
}
323#[span_fn]
325pub fn block_from_batch_row(rb: &RecordBatch, row: usize) -> Result<BlockMetadata> {
326 let block_id_column = string_column_by_name(rb, "block_id")?;
327 let stream_id_column = string_column_by_name(rb, "stream_id")?;
328 let process_id_column = string_column_by_name(rb, "process_id")?;
329 let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "begin_time")?;
330 let begin_ticks_column: &Int64Array = typed_column_by_name(rb, "begin_ticks")?;
331 let end_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "end_time")?;
332 let end_ticks_column: &Int64Array = typed_column_by_name(rb, "end_ticks")?;
333 let nb_objects_column: &Int32Array = typed_column_by_name(rb, "nb_objects")?;
334 let object_offset_column: &Int64Array = typed_column_by_name(rb, "object_offset")?;
335 let payload_size_column: &Int64Array = typed_column_by_name(rb, "payload_size")?;
336 let insert_time_column: &TimestampNanosecondArray = typed_column_by_name(rb, "insert_time")?;
337 Ok(BlockMetadata {
338 block_id: Uuid::parse_str(block_id_column.value(row)?)?,
339 stream_id: Uuid::parse_str(stream_id_column.value(row)?)?,
340 process_id: Uuid::parse_str(process_id_column.value(row)?)?,
341 begin_time: DateTime::from_timestamp_nanos(begin_time_column.value(row)),
342 end_time: DateTime::from_timestamp_nanos(end_time_column.value(row)),
343 begin_ticks: begin_ticks_column.value(row),
344 end_ticks: end_ticks_column.value(row),
345 nb_objects: nb_objects_column.value(row),
346 object_offset: object_offset_column.value(row),
347 payload_size: payload_size_column.value(row),
348 insert_time: DateTime::from_timestamp_nanos(insert_time_column.value(row)),
349 })
350}