1use anyhow::{Context, Result};
2use arrow_flight::decode::FlightRecordBatchStream;
3use chrono::DateTime;
4use datafusion::arrow::array::{
5 BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray, TimestampNanosecondArray,
6};
7use futures::StreamExt;
8use micromegas_ingestion::data_lake_connection::DataLakeConnection;
9use micromegas_tracing::info;
10use std::sync::Arc;
11use uuid::Uuid;
12
13use crate::{
14 dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
15 properties::{
16 properties_column_accessor::properties_column_by_name,
17 utils::extract_properties_from_properties_column,
18 },
19};
20async fn ingest_streams(
21 lake: Arc<DataLakeConnection>,
22 mut rb_stream: FlightRecordBatchStream,
23) -> Result<i64> {
24 let mut tr = lake.db_pool.begin().await?;
25 let mut nb_rows: i64 = 0;
26 while let Some(res) = rb_stream.next().await {
27 let b = res?;
28 nb_rows += b.num_rows() as i64;
29 let stream_id_column = string_column_by_name(&b, "stream_id")?;
30 let process_id_column = string_column_by_name(&b, "process_id")?;
31 let dependencies_metadata_column: &BinaryArray =
32 typed_column_by_name(&b, "dependencies_metadata")?;
33 let objects_metadata_column: &BinaryArray = typed_column_by_name(&b, "objects_metadata")?;
34 let tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "tags")?;
35 let properties_accessor = properties_column_by_name(&b, "properties")?;
36 let insert_time_column: &TimestampNanosecondArray =
37 typed_column_by_name(&b, "insert_time")?;
38 let format_column = string_column_by_name(&b, "format")?;
41
42 for row in 0..b.num_rows() {
43 let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
44 let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
45 let tags: Vec<String> = tags_column
46 .value(row)
47 .as_any()
48 .downcast_ref::<StringArray>()
49 .with_context(|| "casting tags")?
50 .iter()
51 .map(|item| String::from(item.unwrap_or_default()))
52 .collect();
53 let properties_map =
54 extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
55 let properties = micromegas_telemetry::property::make_properties(&properties_map);
56
57 sqlx::query(
58 "INSERT INTO streams (stream_id, process_id, dependencies_metadata, objects_metadata, tags, properties, insert_time, format)
59 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
60 ON CONFLICT (stream_id) DO NOTHING;",
61 )
62 .bind(stream_id)
63 .bind(process_id)
64 .bind(dependencies_metadata_column.value(row))
65 .bind(objects_metadata_column.value(row))
66 .bind(tags)
67 .bind(properties)
68 .bind(DateTime::from_timestamp_nanos(
69 insert_time_column.value(row),
70 ))
71 .bind(format_column.value(row)?)
72 .execute(&mut *tr)
73 .await
74 .with_context(|| "inserting into streams")?;
75 }
76 }
77 tr.commit().await?;
78 info!("ingested {nb_rows} streams");
79 Ok(nb_rows)
80}
81
82async fn ingest_processes(
83 lake: Arc<DataLakeConnection>,
84 mut rb_stream: FlightRecordBatchStream,
85) -> Result<i64> {
86 let mut tr = lake.db_pool.begin().await?;
87 let mut nb_rows: i64 = 0;
88 while let Some(res) = rb_stream.next().await {
89 let b = res?;
90 nb_rows += b.num_rows() as i64;
91 let process_id_column = string_column_by_name(&b, "process_id")?;
92 let exe_column = string_column_by_name(&b, "exe")?;
93 let username_column = string_column_by_name(&b, "username")?;
94 let realname_column = string_column_by_name(&b, "realname")?;
95 let computer_column = string_column_by_name(&b, "computer")?;
96 let distro_column = string_column_by_name(&b, "distro")?;
97 let cpu_brand_column = string_column_by_name(&b, "cpu_brand")?;
98 let process_tsc_freq_column: &Int64Array = typed_column_by_name(&b, "tsc_frequency")?;
99 let start_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "start_time")?;
100 let start_ticks_column: &Int64Array = typed_column_by_name(&b, "start_ticks")?;
101 let insert_time_column: &TimestampNanosecondArray =
102 typed_column_by_name(&b, "insert_time")?;
103 let parent_process_id_column = string_column_by_name(&b, "parent_process_id")?;
104 let properties_accessor = properties_column_by_name(&b, "properties")?;
105 for row in 0..b.num_rows() {
106 let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
107 let parent_process_str = parent_process_id_column.value(row)?;
108 let parent_process_id = if parent_process_str.is_empty() {
109 None
110 } else {
111 Some(Uuid::parse_str(parent_process_str)?)
112 };
113 let properties_map =
114 extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
115 let properties = micromegas_telemetry::property::make_properties(&properties_map);
116 sqlx::query(
117 "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
118 )
119 .bind(process_id)
120 .bind(exe_column.value(row)?)
121 .bind(username_column.value(row)?)
122 .bind(realname_column.value(row)?)
123 .bind(computer_column.value(row)?)
124 .bind(distro_column.value(row)?)
125 .bind(cpu_brand_column.value(row)?)
126 .bind(process_tsc_freq_column.value(row))
127 .bind(DateTime::from_timestamp_nanos(start_time_column.value(row)))
128 .bind(start_ticks_column.value(row))
129 .bind(DateTime::from_timestamp_nanos(
130 insert_time_column.value(row),
131 ))
132 .bind(parent_process_id)
133 .bind(properties)
134 .execute(&mut *tr)
135 .await
136 .with_context(|| "executing sql insert into processes")?;
137 }
138 }
139 tr.commit().await?;
140 info!("ingested {nb_rows} processes");
141 Ok(nb_rows)
142}
143
144async fn ingest_payloads(
145 lake: Arc<DataLakeConnection>,
146 mut rb_stream: FlightRecordBatchStream,
147) -> Result<i64> {
148 let mut nb_rows: i64 = 0;
149 while let Some(res) = rb_stream.next().await {
150 let b = res?;
151 nb_rows += b.num_rows() as i64;
152 let process_id_column = string_column_by_name(&b, "process_id")?;
153 let stream_id_column = string_column_by_name(&b, "stream_id")?;
154 let block_id_column = string_column_by_name(&b, "block_id")?;
155 let payload_column: &BinaryArray = typed_column_by_name(&b, "payload")?;
156 for row in 0..b.num_rows() {
157 let process_id = process_id_column.value(row)?;
158 let stream_id = stream_id_column.value(row)?;
159 let block_id = block_id_column.value(row)?;
160 let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
161 let payload = bytes::Bytes::copy_from_slice(payload_column.value(row));
162 lake.blob_storage
163 .put(&obj_path, payload)
164 .await
165 .with_context(|| "Error writing block to blob storage")?;
166 }
167 }
168 info!("ingested {nb_rows} payloads");
169 Ok(nb_rows)
170}
171
172async fn ingest_blocks(
173 lake: Arc<DataLakeConnection>,
174 mut rb_stream: FlightRecordBatchStream,
175) -> Result<i64> {
176 let mut tr = lake.db_pool.begin().await?;
177 let mut nb_rows: i64 = 0;
178 while let Some(res) = rb_stream.next().await {
179 let b = res?;
180 nb_rows += b.num_rows() as i64;
181 let block_id_column = string_column_by_name(&b, "block_id")?;
182 let stream_id_column = string_column_by_name(&b, "stream_id")?;
183 let process_id_column = string_column_by_name(&b, "process_id")?;
184 let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
185 let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
186 let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
187 let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
188 let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
189 let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
190 let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
191 let insert_time_column: &TimestampNanosecondArray =
192 typed_column_by_name(&b, "insert_time")?;
193 for row in 0..b.num_rows() {
194 let block_id = Uuid::parse_str(block_id_column.value(row)?)?;
195 let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
196 let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
197 sqlx::query("INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;")
198 .bind(block_id)
199 .bind(stream_id)
200 .bind(process_id)
201 .bind(DateTime::from_timestamp_nanos(begin_time_column.value(row)))
202 .bind(begin_ticks_column.value(row))
203 .bind(DateTime::from_timestamp_nanos(end_time_column.value(row)))
204 .bind(end_ticks_column.value(row))
205 .bind(nb_objects_column.value(row))
206 .bind(object_offset_column.value(row))
207 .bind(payload_size_column.value(row))
208 .bind(DateTime::from_timestamp_nanos(
209 insert_time_column.value(row),
210 ))
211 .execute(&mut *tr)
212 .await
213 .with_context(|| "executing sql insert into blocks")?;
214 }
215 }
216 tr.commit().await?;
217 info!("ingested {nb_rows} blocks");
218 Ok(nb_rows)
219}
220
221pub async fn bulk_ingest(
223 lake: Arc<DataLakeConnection>,
224 table_name: &str,
225 rb_stream: FlightRecordBatchStream,
226) -> Result<i64> {
227 match table_name {
228 "processes" => ingest_processes(lake, rb_stream).await,
229 "streams" => ingest_streams(lake, rb_stream).await,
230 "blocks" => ingest_blocks(lake, rb_stream).await,
231 "payloads" => ingest_payloads(lake, rb_stream).await,
232 other => anyhow::bail!("bulk ingest for table {other} not supported"),
233 }
234}