//! micromegas_analytics/replication.rs — bulk replication of telemetry
//! metadata (processes, streams, blocks) and payloads into the data lake.

1use anyhow::{Context, Result};
2use arrow_flight::decode::FlightRecordBatchStream;
3use chrono::DateTime;
4use datafusion::arrow::array::{
5    BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray, TimestampNanosecondArray,
6};
7use futures::StreamExt;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_tracing::info;
10use std::sync::Arc;
11use uuid::Uuid;
12
13use crate::{
14    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
15    properties::{
16        properties_column_accessor::properties_column_by_name,
17        utils::extract_properties_from_properties_column,
18    },
19};
20async fn ingest_streams(
21    lake: Arc<DataLakeConnection>,
22    mut rb_stream: FlightRecordBatchStream,
23) -> Result<i64> {
24    let mut tr = lake.db_pool.begin().await?;
25    let mut nb_rows: i64 = 0;
26    while let Some(res) = rb_stream.next().await {
27        let b = res?;
28        nb_rows += b.num_rows() as i64;
29        let stream_id_column = string_column_by_name(&b, "stream_id")?;
30        let process_id_column = string_column_by_name(&b, "process_id")?;
31        let dependencies_metadata_column: &BinaryArray =
32            typed_column_by_name(&b, "dependencies_metadata")?;
33        let objects_metadata_column: &BinaryArray = typed_column_by_name(&b, "objects_metadata")?;
34        let tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "tags")?;
35        let properties_accessor = properties_column_by_name(&b, "properties")?;
36        let insert_time_column: &TimestampNanosecondArray =
37            typed_column_by_name(&b, "insert_time")?;
38
39        for row in 0..b.num_rows() {
40            let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
41            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
42            let tags: Vec<String> = tags_column
43                .value(row)
44                .as_any()
45                .downcast_ref::<StringArray>()
46                .with_context(|| "casting tags")?
47                .iter()
48                .map(|item| String::from(item.unwrap_or_default()))
49                .collect();
50            let properties_map =
51                extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
52            let properties = micromegas_telemetry::property::make_properties(&properties_map);
53
54            sqlx::query("INSERT INTO streams VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (stream_id) DO NOTHING;")
55                .bind(stream_id)
56                .bind(process_id)
57                .bind(dependencies_metadata_column.value(row))
58                .bind(objects_metadata_column.value(row))
59                .bind(tags)
60                .bind(properties)
61                .bind(DateTime::from_timestamp_nanos(
62                    insert_time_column.value(row),
63                ))
64                .execute(&mut *tr)
65                .await
66                .with_context(|| "inserting into streams")?;
67        }
68    }
69    tr.commit().await?;
70    info!("ingested {nb_rows} streams");
71    Ok(nb_rows)
72}
73
74async fn ingest_processes(
75    lake: Arc<DataLakeConnection>,
76    mut rb_stream: FlightRecordBatchStream,
77) -> Result<i64> {
78    let mut tr = lake.db_pool.begin().await?;
79    let mut nb_rows: i64 = 0;
80    while let Some(res) = rb_stream.next().await {
81        let b = res?;
82        nb_rows += b.num_rows() as i64;
83        let process_id_column = string_column_by_name(&b, "process_id")?;
84        let exe_column = string_column_by_name(&b, "exe")?;
85        let username_column = string_column_by_name(&b, "username")?;
86        let realname_column = string_column_by_name(&b, "realname")?;
87        let computer_column = string_column_by_name(&b, "computer")?;
88        let distro_column = string_column_by_name(&b, "distro")?;
89        let cpu_brand_column = string_column_by_name(&b, "cpu_brand")?;
90        let process_tsc_freq_column: &Int64Array = typed_column_by_name(&b, "tsc_frequency")?;
91        let start_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "start_time")?;
92        let start_ticks_column: &Int64Array = typed_column_by_name(&b, "start_ticks")?;
93        let insert_time_column: &TimestampNanosecondArray =
94            typed_column_by_name(&b, "insert_time")?;
95        let parent_process_id_column = string_column_by_name(&b, "parent_process_id")?;
96        let properties_accessor = properties_column_by_name(&b, "properties")?;
97        for row in 0..b.num_rows() {
98            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
99            let parent_process_str = parent_process_id_column.value(row)?;
100            let parent_process_id = if parent_process_str.is_empty() {
101                None
102            } else {
103                Some(Uuid::parse_str(parent_process_str)?)
104            };
105            let properties_map =
106                extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
107            let properties = micromegas_telemetry::property::make_properties(&properties_map);
108            sqlx::query(
109                "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
110            )
111            .bind(process_id)
112            .bind(exe_column.value(row)?)
113            .bind(username_column.value(row)?)
114            .bind(realname_column.value(row)?)
115            .bind(computer_column.value(row)?)
116            .bind(distro_column.value(row)?)
117            .bind(cpu_brand_column.value(row)?)
118            .bind(process_tsc_freq_column.value(row))
119            .bind(DateTime::from_timestamp_nanos(start_time_column.value(row)))
120            .bind(start_ticks_column.value(row))
121            .bind(DateTime::from_timestamp_nanos(
122                insert_time_column.value(row),
123            ))
124            .bind(parent_process_id)
125            .bind(properties)
126            .execute(&mut *tr)
127            .await
128            .with_context(|| "executing sql insert into processes")?;
129        }
130    }
131    tr.commit().await?;
132    info!("ingested {nb_rows} processes");
133    Ok(nb_rows)
134}
135
136async fn ingest_payloads(
137    lake: Arc<DataLakeConnection>,
138    mut rb_stream: FlightRecordBatchStream,
139) -> Result<i64> {
140    let mut nb_rows: i64 = 0;
141    while let Some(res) = rb_stream.next().await {
142        let b = res?;
143        nb_rows += b.num_rows() as i64;
144        let process_id_column = string_column_by_name(&b, "process_id")?;
145        let stream_id_column = string_column_by_name(&b, "stream_id")?;
146        let block_id_column = string_column_by_name(&b, "block_id")?;
147        let payload_column: &BinaryArray = typed_column_by_name(&b, "payload")?;
148        for row in 0..b.num_rows() {
149            let process_id = process_id_column.value(row)?;
150            let stream_id = stream_id_column.value(row)?;
151            let block_id = block_id_column.value(row)?;
152            let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
153            let payload = bytes::Bytes::copy_from_slice(payload_column.value(row));
154            lake.blob_storage
155                .put(&obj_path, payload)
156                .await
157                .with_context(|| "Error writing block to blob storage")?;
158        }
159    }
160    info!("ingested {nb_rows} payloads");
161    Ok(nb_rows)
162}
163
164async fn ingest_blocks(
165    lake: Arc<DataLakeConnection>,
166    mut rb_stream: FlightRecordBatchStream,
167) -> Result<i64> {
168    let mut tr = lake.db_pool.begin().await?;
169    let mut nb_rows: i64 = 0;
170    while let Some(res) = rb_stream.next().await {
171        let b = res?;
172        nb_rows += b.num_rows() as i64;
173        let block_id_column = string_column_by_name(&b, "block_id")?;
174        let stream_id_column = string_column_by_name(&b, "stream_id")?;
175        let process_id_column = string_column_by_name(&b, "process_id")?;
176        let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
177        let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
178        let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
179        let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
180        let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
181        let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
182        let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
183        let insert_time_column: &TimestampNanosecondArray =
184            typed_column_by_name(&b, "insert_time")?;
185        for row in 0..b.num_rows() {
186            let block_id = Uuid::parse_str(block_id_column.value(row)?)?;
187            let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
188            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
189            sqlx::query("INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;")
190                .bind(block_id)
191                .bind(stream_id)
192                .bind(process_id)
193                .bind(DateTime::from_timestamp_nanos(begin_time_column.value(row)))
194                .bind(begin_ticks_column.value(row))
195                .bind(DateTime::from_timestamp_nanos(end_time_column.value(row)))
196                .bind(end_ticks_column.value(row))
197                .bind(nb_objects_column.value(row))
198                .bind(object_offset_column.value(row))
199                .bind(payload_size_column.value(row))
200                .bind(DateTime::from_timestamp_nanos(
201                    insert_time_column.value(row),
202                ))
203                .execute(&mut *tr)
204                .await
205                .with_context(|| "executing sql insert into blocks")?;
206        }
207    }
208    tr.commit().await?;
209    info!("ingested {nb_rows} blocks");
210    Ok(nb_rows)
211}
212
213/// Ingests data from a FlightRecordBatchStream into the specified table.
214pub async fn bulk_ingest(
215    lake: Arc<DataLakeConnection>,
216    table_name: &str,
217    rb_stream: FlightRecordBatchStream,
218) -> Result<i64> {
219    match table_name {
220        "processes" => ingest_processes(lake, rb_stream).await,
221        "streams" => ingest_streams(lake, rb_stream).await,
222        "blocks" => ingest_blocks(lake, rb_stream).await,
223        "payloads" => ingest_payloads(lake, rb_stream).await,
224        other => anyhow::bail!("bulk ingest for table {other} not supported"),
225    }
226}