//! Bulk ingestion of telemetry metadata and payloads received as Arrow Flight
//! record batch streams.

use anyhow::{Context, Result};
use arrow_flight::decode::FlightRecordBatchStream;
use chrono::DateTime;
use datafusion::arrow::array::{
    BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray, TimestampNanosecondArray,
};
use futures::StreamExt;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::info;
use std::sync::Arc;
use uuid::Uuid;

use crate::{
    dfext::{string_column_accessor::string_column_by_name, typed_column::typed_column_by_name},
    properties::{
        properties_column_accessor::properties_column_by_name,
        utils::extract_properties_from_properties_column,
    },
};
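
/// Inserts `streams` record batches into the `streams` table within a single
/// transaction; rows whose `stream_id` already exists are skipped.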
async fn ingest_streams(
    lake: Arc<DataLakeConnection>,
    mut rb_stream: FlightRecordBatchStream,
) -> Result<i64> {
    let mut tr = lake.db_pool.begin().await?;
    let mut nb_rows: i64 = 0;
    while let Some(res) = rb_stream.next().await {
        let b = res?;
        nb_rows += b.num_rows() as i64;
        let stream_id_column = string_column_by_name(&b, "stream_id")?;
        let process_id_column = string_column_by_name(&b, "process_id")?;
        let dependencies_metadata_column: &BinaryArray =
            typed_column_by_name(&b, "dependencies_metadata")?;
        let objects_metadata_column: &BinaryArray = typed_column_by_name(&b, "objects_metadata")?;
        let tags_column: &GenericListArray<i32> = typed_column_by_name(&b, "tags")?;
        let properties_accessor = properties_column_by_name(&b, "properties")?;
        let insert_time_column: &TimestampNanosecondArray =
            typed_column_by_name(&b, "insert_time")?;

        for row in 0..b.num_rows() {
            let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
            // `tags` is a list array: each row holds a list of strings
            let tags: Vec<String> = tags_column
                .value(row)
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| "casting tags")?
                .iter()
                .map(|item| String::from(item.unwrap_or_default()))
                .collect();
            let properties_map =
                extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
            let properties = micromegas_telemetry::property::make_properties(&properties_map);

            sqlx::query("INSERT INTO streams VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT (stream_id) DO NOTHING;")
                .bind(stream_id)
                .bind(process_id)
                .bind(dependencies_metadata_column.value(row))
                .bind(objects_metadata_column.value(row))
                .bind(tags)
                .bind(properties)
                .bind(DateTime::from_timestamp_nanos(
                    insert_time_column.value(row),
                ))
                .execute(&mut *tr)
                .await
                .with_context(|| "inserting into streams")?;
        }
    }
    tr.commit().await?;
    info!("ingested {nb_rows} streams");
    Ok(nb_rows)
}

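/// Inserts `processes` record batches into the `processes` table within a
/// single transaction; rows whose `process_id` already exists are skipped.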
async fn ingest_processes(
    lake: Arc<DataLakeConnection>,
    mut rb_stream: FlightRecordBatchStream,
) -> Result<i64> {
    let mut tr = lake.db_pool.begin().await?;
    let mut nb_rows: i64 = 0;
    while let Some(res) = rb_stream.next().await {
        let b = res?;
        nb_rows += b.num_rows() as i64;
        let process_id_column = string_column_by_name(&b, "process_id")?;
        let exe_column = string_column_by_name(&b, "exe")?;
        let username_column = string_column_by_name(&b, "username")?;
        let realname_column = string_column_by_name(&b, "realname")?;
        let computer_column = string_column_by_name(&b, "computer")?;
        let distro_column = string_column_by_name(&b, "distro")?;
        let cpu_brand_column = string_column_by_name(&b, "cpu_brand")?;
        let process_tsc_freq_column: &Int64Array = typed_column_by_name(&b, "tsc_frequency")?;
        let start_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "start_time")?;
        let start_ticks_column: &Int64Array = typed_column_by_name(&b, "start_ticks")?;
        let insert_time_column: &TimestampNanosecondArray =
            typed_column_by_name(&b, "insert_time")?;
        let parent_process_id_column = string_column_by_name(&b, "parent_process_id")?;
        let properties_accessor = properties_column_by_name(&b, "properties")?;
        for row in 0..b.num_rows() {
            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
            // an empty parent_process_id string maps to NULL
            let parent_process_str = parent_process_id_column.value(row)?;
            let parent_process_id = if parent_process_str.is_empty() {
                None
            } else {
                Some(Uuid::parse_str(parent_process_str)?)
            };
            let properties_map =
                extract_properties_from_properties_column(properties_accessor.as_ref(), row)?;
            let properties = micromegas_telemetry::property::make_properties(&properties_map);
            sqlx::query(
                "INSERT INTO processes VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) ON CONFLICT (process_id) DO NOTHING;",
            )
            .bind(process_id)
            .bind(exe_column.value(row)?)
            .bind(username_column.value(row)?)
            .bind(realname_column.value(row)?)
            .bind(computer_column.value(row)?)
            .bind(distro_column.value(row)?)
            .bind(cpu_brand_column.value(row)?)
            .bind(process_tsc_freq_column.value(row))
            .bind(DateTime::from_timestamp_nanos(start_time_column.value(row)))
            .bind(start_ticks_column.value(row))
            .bind(DateTime::from_timestamp_nanos(
                insert_time_column.value(row),
            ))
            .bind(parent_process_id)
            .bind(properties)
            .execute(&mut *tr)
            .await
            .with_context(|| "executing sql insert into processes")?;
        }
    }
    tr.commit().await?;
    info!("ingested {nb_rows} processes");
    Ok(nb_rows)
}

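/// Writes `payloads` record batches to blob storage under
/// `blobs/{process_id}/{stream_id}/{block_id}`.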
async fn ingest_payloads(
    lake: Arc<DataLakeConnection>,
    mut rb_stream: FlightRecordBatchStream,
) -> Result<i64> {
    let mut nb_rows: i64 = 0;
    while let Some(res) = rb_stream.next().await {
        let b = res?;
        nb_rows += b.num_rows() as i64;
        let process_id_column = string_column_by_name(&b, "process_id")?;
        let stream_id_column = string_column_by_name(&b, "stream_id")?;
        let block_id_column = string_column_by_name(&b, "block_id")?;
        let payload_column: &BinaryArray = typed_column_by_name(&b, "payload")?;
        for row in 0..b.num_rows() {
            let process_id = process_id_column.value(row)?;
            let stream_id = stream_id_column.value(row)?;
            let block_id = block_id_column.value(row)?;
            let obj_path = format!("blobs/{process_id}/{stream_id}/{block_id}");
            let payload = bytes::Bytes::copy_from_slice(payload_column.value(row));
            // payloads go to blob storage, not the relational database,
            // so there is no transaction to manage here
            lake.blob_storage
                .put(&obj_path, payload)
                .await
                .with_context(|| "Error writing block to blob storage")?;
        }
    }
    info!("ingested {nb_rows} payloads");
    Ok(nb_rows)
}

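/// Inserts `blocks` record batches into the `blocks` table within a single
/// transaction; rows whose `block_id` already exists are skipped.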
async fn ingest_blocks(
    lake: Arc<DataLakeConnection>,
    mut rb_stream: FlightRecordBatchStream,
) -> Result<i64> {
    let mut tr = lake.db_pool.begin().await?;
    let mut nb_rows: i64 = 0;
    while let Some(res) = rb_stream.next().await {
        let b = res?;
        nb_rows += b.num_rows() as i64;
        let block_id_column = string_column_by_name(&b, "block_id")?;
        let stream_id_column = string_column_by_name(&b, "stream_id")?;
        let process_id_column = string_column_by_name(&b, "process_id")?;
        let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?;
        let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?;
        let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?;
        let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?;
        let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?;
        let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?;
        let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?;
        let insert_time_column: &TimestampNanosecondArray =
            typed_column_by_name(&b, "insert_time")?;
        for row in 0..b.num_rows() {
            let block_id = Uuid::parse_str(block_id_column.value(row)?)?;
            let stream_id = Uuid::parse_str(stream_id_column.value(row)?)?;
            let process_id = Uuid::parse_str(process_id_column.value(row)?)?;
            sqlx::query("INSERT INTO blocks VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11) ON CONFLICT (block_id) DO NOTHING;")
                .bind(block_id)
                .bind(stream_id)
                .bind(process_id)
                .bind(DateTime::from_timestamp_nanos(begin_time_column.value(row)))
                .bind(begin_ticks_column.value(row))
                .bind(DateTime::from_timestamp_nanos(end_time_column.value(row)))
                .bind(end_ticks_column.value(row))
                .bind(nb_objects_column.value(row))
                .bind(object_offset_column.value(row))
                .bind(payload_size_column.value(row))
                .bind(DateTime::from_timestamp_nanos(
                    insert_time_column.value(row),
                ))
                .execute(&mut *tr)
                .await
                .with_context(|| "executing sql insert into blocks")?;
        }
    }
    tr.commit().await?;
    info!("ingested {nb_rows} blocks");
    Ok(nb_rows)
}

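/// Bulk ingestion entry point: routes the record batch stream to the ingestion
/// function matching `table_name` and returns the number of rows ingested.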
pub async fn bulk_ingest(
    lake: Arc<DataLakeConnection>,
    table_name: &str,
    rb_stream: FlightRecordBatchStream,
) -> Result<i64> {
    match table_name {
        "processes" => ingest_processes(lake, rb_stream).await,
        "streams" => ingest_streams(lake, rb_stream).await,
        "blocks" => ingest_blocks(lake, rb_stream).await,
        "payloads" => ingest_payloads(lake, rb_stream).await,
        other => anyhow::bail!("bulk ingest for table {other} not supported"),
    }
}