micromegas_analytics/
replication.rs

1use anyhow::{Context, Result};
2use arrow_flight::decode::FlightRecordBatchStream;
3use chrono::DateTime;
4use datafusion::arrow::array::{
5    BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray, TimestampNanosecondArray,
6};
7use futures::StreamExt;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_tracing::info;
10use std::sync::Arc;
11use uuid::Uuid;
12
13use crate::{
14    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
15    properties::{
16        properties_column_accessor::properties_column_by_name,
17        utils::extract_properties_from_properties_column,
18    },
19};
20async fn ingest_streams(
21    lake: Arc<DataLakeConnection>,
22    mut rb_stream: FlightRecordBatchStream,
23) -> Result<i64> {
24    let mut tr = lake.db_pool.begin().await?;
25    let mut nb_rows: i64 = 0;
26    while let Some(res) = rb_stream.next().await {
27        let b = res?;
28        nb_rows += b.num_rows() as i64;
29        let stream_id_column = string_column_by_name(&b, "stream_id")?;
30        let process_id_column = string_column_by_name(&b, "process_id")?;
31        let dependencies_metadata_column: &BinaryArray =
32            typed_column_by_name(&b, "dependencies_metadata")?;
33        let objects_metadata_column: &BinaryArray = typed_column_by_name(&b, "objects_metadata")?;
34        let tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "tags")?;
35        let properties_accessor = properties_column_by_name(&b, "properties")?;
36        let insert_time_column: &TimestampNanosecondArray =
37            typed_column_by_name(&b, "insert_time")?;
38        // Hard failure (rather than a silent default) on missing `format` so a
39        // v3 source replicating into a v4 target surfaces the schema mismatch loudly.
40        let format_column = string_column_by_name(&b, "format")?;
41
42        for row in 0..b.num_rows() {
43            let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
44            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
45            let tags: Vec<String> = tags_column
46                .value(row)
47                .as_any()
48                .downcast_ref::<StringArray>()
49                .with_context(|| "casting tags")?
50                .iter()
51                .map(|item| String::from(item.unwrap_or_default()))
52                .collect();
53            let properties_map =
54                extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
55            let properties = micromegas_telemetry::property::make_properties(&properties_map);
56
57            sqlx::query(
58                "INSERT INTO streams (stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties, insert_time, format)
59                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
60                 ON CONFLICT (stream_id) DO NOTHING;",
61            )
62                .bind(stream_id)
63                .bind(process_id)
64                .bind(dependencies_metadata_column.value(row))
65                .bind(objects_metadata_column.value(row))
66                .bind(tags)
67                .bind(properties)
68                .bind(DateTime::from_timestamp_nanos(
69                    insert_time_column.value(row),
70                ))
71                .bind(format_column.value(row)?)
72                .execute(&mut *tr)
73                .await
74                .with_context(|| "inserting into streams")?;
75        }
76    }
77    tr.commit().await?;
78    info!("ingested {nb_rows} streams");
79    Ok(nb_rows)
80}
81
82async fn ingest_processes(
83    lake: Arc<DataLakeConnection>,
84    mut rb_stream: FlightRecordBatchStream,
85) -> Result<i64> {
86    let mut tr = lake.db_pool.begin().await?;
87    let mut nb_rows: i64 = 0;
88    while let Some(res) = rb_stream.next().await {
89        let b = res?;
90        nb_rows += b.num_rows() as i64;
91        let process_id_column = string_column_by_name(&b, "process_id")?;
92        let exe_column = string_column_by_name(&b, "exe")?;
93        let username_column = string_column_by_name(&b, "username")?;
94        let realname_column = string_column_by_name(&b, "realname")?;
95        let computer_column = string_column_by_name(&b, "computer")?;
96        let distro_column = string_column_by_name(&b, "distro")?;
97        let cpu_brand_column = string_column_by_name(&b, "cpu_brand")?;
98        let process_tsc_freq_column: &Int64Array = typed_column_by_name(&b, "tsc_frequency")?;
99        let start_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "start_time")?;
100        let start_ticks_column: &Int64Array = typed_column_by_name(&b, "start_ticks")?;
101        let insert_time_column: &TimestampNanosecondArray =
102            typed_column_by_name(&b, "insert_time")?;
103        let parent_process_id_column = string_column_by_name(&b, "parent_process_id")?;
104        let properties_accessor = properties_column_by_name(&b, "properties")?;
105        for row in 0..b.num_rows() {
106            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
107            let parent_process_str = parent_process_id_column.value(row)?;
108            let parent_process_id = if parent_process_str.is_empty() {
109                None
110            } else {
111                Some(Uuid::parse_str(parent_process_str)?)
112            };
113            let properties_map =
114                extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
115            let properties = micromegas_telemetry::property::make_properties(&properties_map);
116            sqlx::query(
117                "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
118            )
119            .bind(process_id)
120            .bind(exe_column.value(row)?)
121            .bind(username_column.value(row)?)
122            .bind(realname_column.value(row)?)
123            .bind(computer_column.value(row)?)
124            .bind(distro_column.value(row)?)
125            .bind(cpu_brand_column.value(row)?)
126            .bind(process_tsc_freq_column.value(row))
127            .bind(DateTime::from_timestamp_nanos(start_time_column.value(row)))
128            .bind(start_ticks_column.value(row))
129            .bind(DateTime::from_timestamp_nanos(
130                insert_time_column.value(row),
131            ))
132            .bind(parent_process_id)
133            .bind(properties)
134            .execute(&mut *tr)
135            .await
136            .with_context(|| "executing sql insert into processes")?;
137        }
138    }
139    tr.commit().await?;
140    info!("ingested {nb_rows} processes");
141    Ok(nb_rows)
142}
143
144async fn ingest_payloads(
145    lake: Arc<DataLakeConnection>,
146    mut rb_stream: FlightRecordBatchStream,
147) -> Result<i64> {
148    let mut nb_rows: i64 = 0;
149    while let Some(res) = rb_stream.next().await {
150        let b = res?;
151        nb_rows += b.num_rows() as i64;
152        let process_id_column = string_column_by_name(&b, "process_id")?;
153        let stream_id_column = string_column_by_name(&b, "stream_id")?;
154        let block_id_column = string_column_by_name(&b, "block_id")?;
155        let payload_column: &BinaryArray = typed_column_by_name(&b, "payload")?;
156        for row in 0..b.num_rows() {
157            let process_id = process_id_column.value(row)?;
158            let stream_id = stream_id_column.value(row)?;
159            let block_id = block_id_column.value(row)?;
160            let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
161            let payload = bytes::Bytes::copy_from_slice(payload_column.value(row));
162            lake.blob_storage
163                .put(&obj_path, payload)
164                .await
165                .with_context(|| "Error writing block to blob storage")?;
166        }
167    }
168    info!("ingested {nb_rows} payloads");
169    Ok(nb_rows)
170}
171
172async fn ingest_blocks(
173    lake: Arc<DataLakeConnection>,
174    mut rb_stream: FlightRecordBatchStream,
175) -> Result<i64> {
176    let mut tr = lake.db_pool.begin().await?;
177    let mut nb_rows: i64 = 0;
178    while let Some(res) = rb_stream.next().await {
179        let b = res?;
180        nb_rows += b.num_rows() as i64;
181        let block_id_column = string_column_by_name(&b, "block_id")?;
182        let stream_id_column = string_column_by_name(&b, "stream_id")?;
183        let process_id_column = string_column_by_name(&b, "process_id")?;
184        let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
185        let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
186        let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
187        let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
188        let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
189        let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
190        let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
191        let insert_time_column: &TimestampNanosecondArray =
192            typed_column_by_name(&b, "insert_time")?;
193        for row in 0..b.num_rows() {
194            let block_id = Uuid::parse_str(block_id_column.value(row)?)?;
195            let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
196            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
197            sqlx::query("INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;")
198                .bind(block_id)
199                .bind(stream_id)
200                .bind(process_id)
201                .bind(DateTime::from_timestamp_nanos(begin_time_column.value(row)))
202                .bind(begin_ticks_column.value(row))
203                .bind(DateTime::from_timestamp_nanos(end_time_column.value(row)))
204                .bind(end_ticks_column.value(row))
205                .bind(nb_objects_column.value(row))
206                .bind(object_offset_column.value(row))
207                .bind(payload_size_column.value(row))
208                .bind(DateTime::from_timestamp_nanos(
209                    insert_time_column.value(row),
210                ))
211                .execute(&mut *tr)
212                .await
213                .with_context(|| "executing sql insert into blocks")?;
214        }
215    }
216    tr.commit().await?;
217    info!("ingested {nb_rows} blocks");
218    Ok(nb_rows)
219}
220
221/// Ingests data from a FlightRecordBatchStream into the specified table.
222pub async fn bulk_ingest(
223    lake: Arc<DataLakeConnection>,
224    table_name: &str,
225    rb_stream: FlightRecordBatchStream,
226) -> Result<i64> {
227    match table_name {
228        "processes" => ingest_processes(lake, rb_stream).await,
229        "streams" => ingest_streams(lake, rb_stream).await,
230        "blocks" => ingest_blocks(lake, rb_stream).await,
231        "payloads" => ingest_payloads(lake, rb_stream).await,
232        other => anyhow::bail!("bulk ingest for table {other} not supported"),
233    }
234}