use crate::{
    arrow_utils::serialize_parquet_metadata, lakehouse::async_parquet_writer::AsyncParquetWriter,
    response_writer::Logger, time::TimeRange,
};
use anyhow::{Context, Result};
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::{
    arrow::{array::RecordBatch, datatypes::Schema},
    parquet::{
        arrow::AsyncArrowWriter,
        basic::Compression,
        file::{
            metadata::ParquetMetaData,
            properties::{WriterProperties, WriterVersion},
        },
    },
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::prelude::*;
use object_store::buffered::BufWriter;
use sqlx::Row;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, atomic::AtomicI64};
use tokio::sync::mpsc::Receiver;

use super::{partition::Partition, partition_source_data, view::ViewMetadata};

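/// Schedules a file for deletion by inserting it into the `temporary_files` table
/// with an expiration one hour in the future.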
pub async fn add_file_for_cleanup(
    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    file_path: &str,
    file_size: i64,
) -> Result<()> {
    let expiration = Utc::now()
        + TimeDelta::try_hours(1)
            .with_context(|| "calculating expiration time for temporary file")?;

    sqlx::query("INSERT INTO temporary_files VALUES ($1, $2, $3)")
        .bind(file_path)
        .bind(file_size)
        .bind(expiration)
        .execute(&mut **transaction)
        .await
        .with_context(|| format!("adding file {file_path} to temporary files for cleanup"))?;

    Ok(())
}

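/// A batch of rows to be written into a partition, along with the event time range they cover.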
pub struct PartitionRowSet {
    pub rows_time_range: TimeRange,
    pub rows: RecordBatch,
}

impl PartitionRowSet {
    pub fn new(rows_time_range: TimeRange, rows: RecordBatch) -> Self {
        Self {
            rows_time_range,
            rows,
        }
    }
}

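/// Retires every partition whose `end_insert_time` is older than `expiration`:
/// the rows are removed from `lakehouse_partitions` and the backing files are
/// scheduled for deletion through [`add_file_for_cleanup`].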
pub async fn retire_expired_partitions(
    lake: &DataLakeConnection,
    expiration: DateTime<Utc>,
) -> Result<()> {
    let mut transaction = lake.db_pool.begin().await?;
    let old_partitions = sqlx::query(
        "SELECT file_path, file_size
         FROM lakehouse_partitions
         WHERE end_insert_time < $1
         ;",
    )
    .bind(expiration)
    .fetch_all(&mut *transaction)
    .await
    .with_context(|| "listing expired partitions")?;

    let mut file_paths = Vec::new();
    for old_part in &old_partitions {
        let file_path: Option<String> = old_part.try_get("file_path")?;
        let file_size: i64 = old_part.try_get("file_size")?;
        if let Some(path) = file_path {
            info!("adding out of date partition {path} to temporary files to be deleted");
            add_file_for_cleanup(&mut transaction, &path, file_size).await?;
            file_paths.push(path);
        }
    }

    sqlx::query(
        "DELETE from lakehouse_partitions
         WHERE end_insert_time < $1
         ;",
    )
    .bind(expiration)
    .execute(&mut *transaction)
    .await
    .with_context(|| "deleting expired partitions")?;
    transaction.commit().await.with_context(|| "commit")?;
    Ok(())
}

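/// Retires the partitions of a view instance that fall within the given insert time range,
/// scheduling their backing files for deletion.
///
/// When `begin_insert_time == end_insert_time`, only partitions matching those exact
/// boundaries are retired; otherwise every partition fully contained in the range is retired.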
pub async fn retire_partitions(
    transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    view_set_name: &str,
    view_instance_id: &str,
    begin_insert_time: DateTime<Utc>,
    end_insert_time: DateTime<Utc>,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let old_partitions = if begin_insert_time == end_insert_time {
        // degenerate range: only partitions with exactly these insert time boundaries
        sqlx::query(
            "SELECT file_path, file_size
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time = $3
             AND end_insert_time = $3
             ;",
        )
        .bind(view_set_name)
        .bind(view_instance_id)
        .bind(begin_insert_time)
        .fetch_all(&mut **transaction)
        .await
        .with_context(|| "listing old partitions (exact match)")?
    } else {
        // partitions fully contained in [begin_insert_time, end_insert_time]
        sqlx::query(
            "SELECT file_path, file_size
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time >= $3
             AND end_insert_time <= $4
             ;",
        )
        .bind(view_set_name)
        .bind(view_instance_id)
        .bind(begin_insert_time)
        .bind(end_insert_time)
        .fetch_all(&mut **transaction)
        .await
        .with_context(|| "listing old partitions (range)")?
    };

    if !old_partitions.is_empty() {
        logger
            .write_log_entry(format!(
                "[RETIRE_FOUND] view={}/{} time_range=[{}, {}] found_partitions={}",
                view_set_name,
                view_instance_id,
                begin_insert_time,
                end_insert_time,
                old_partitions.len()
            ))
            .await?;
    }

    let mut file_paths = Vec::new();
    for old_part in &old_partitions {
        let file_path: Option<String> = old_part.try_get("file_path")?;
        let file_size: i64 = old_part.try_get("file_size")?;
        if let Some(path) = file_path {
            logger
                .write_log_entry(format!(
                    "adding out of date partition {path} to temporary files to be deleted"
                ))
                .await?;
            add_file_for_cleanup(transaction, &path, file_size).await?;
            file_paths.push(path);
        }
    }

    if begin_insert_time == end_insert_time {
        sqlx::query(
            "DELETE from lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time = $3
             AND end_insert_time = $3
             ;",
        )
        .bind(view_set_name)
        .bind(view_instance_id)
        .bind(begin_insert_time)
        .execute(&mut **transaction)
        .await
        .with_context(|| "deleting out of date partitions (exact match)")?
    } else {
        sqlx::query(
            "DELETE from lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time >= $3
             AND end_insert_time <= $4
             ;",
        )
        .bind(view_set_name)
        .bind(view_instance_id)
        .bind(begin_insert_time)
        .bind(end_insert_time)
        .execute(&mut **transaction)
        .await
        .with_context(|| "deleting out of date partitions (range)")?
    };
    Ok(())
}

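/// Derives a Postgres advisory lock key from the partition identity
/// (view set, view instance, insert time range) so that concurrent writers
/// of the same partition serialize on the same lock.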
fn generate_partition_lock_key(
    view_set_name: &str,
    view_instance_id: &str,
    begin_insert_time: DateTime<Utc>,
    end_insert_time: DateTime<Utc>,
) -> i64 {
    let mut hasher = DefaultHasher::new();
    view_set_name.hash(&mut hasher);
    view_instance_id.hash(&mut hasher);
    begin_insert_time.hash(&mut hasher);
    end_insert_time.hash(&mut hasher);
    hasher.finish() as i64
}

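/// Records a partition in the metadata database, retiring any overlapping partitions
/// of the same view within the same transaction.
///
/// A transaction-scoped advisory lock keyed on the partition identity serializes
/// concurrent writers targeting the same partition.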
async fn insert_partition(
    lake: &DataLakeConnection,
    partition: &Partition,
    file_metadata: Option<&Arc<ParquetMetaData>>,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let lock_key = generate_partition_lock_key(
        &partition.view_metadata.view_set_name,
        &partition.view_metadata.view_instance_id,
        partition.begin_insert_time(),
        partition.end_insert_time(),
    );

    let mut transaction = lake.db_pool.begin().await?;

    debug!(
        "[PARTITION_LOCK] view={}/{} time_range=[{}, {}] lock_key={} - acquiring advisory lock",
        &partition.view_metadata.view_set_name,
        &partition.view_metadata.view_instance_id,
        partition.begin_insert_time(),
        partition.end_insert_time(),
        lock_key
    );

    // The advisory lock is released automatically when the transaction commits or rolls back.
    sqlx::query("SELECT pg_advisory_xact_lock($1);")
        .bind(lock_key)
        .execute(&mut *transaction)
        .await
        .with_context(|| "acquiring advisory lock")?;

    let source_row_count = partition_source_data::hash_to_object_count(&partition.source_data_hash)
        .with_context(|| "decoding source_data_hash to row count")?;

    logger
        .write_log_entry(format!(
            "[PARTITION_WRITE_START] view={}/{} time_range=[{}, {}] source_rows={} - lock acquired",
            &partition.view_metadata.view_set_name,
            &partition.view_metadata.view_instance_id,
            partition.begin_insert_time(),
            partition.end_insert_time(),
            source_row_count
        ))
        .await?;

    // Retire any partitions of the same view that overlap the insert time range.
    retire_partitions(
        &mut transaction,
        &partition.view_metadata.view_set_name,
        &partition.view_metadata.view_instance_id,
        partition.begin_insert_time(),
        partition.end_insert_time(),
        logger.clone(),
    )
    .await
    .with_context(|| "retire_partitions")?;

    debug!(
        "[PARTITION_INSERT_ATTEMPT] view={}/{} time_range=[{}, {}] source_rows={} file_path={:?}",
        &partition.view_metadata.view_set_name,
        &partition.view_metadata.view_instance_id,
        partition.begin_insert_time(),
        partition.end_insert_time(),
        source_row_count,
        partition.file_path
    );

    // Non-empty partitions also store their serialized parquet metadata in a dedicated table.
    if let (Some(file_path), Some(metadata)) = (&partition.file_path, file_metadata) {
        let metadata_bytes = serialize_parquet_metadata(metadata)
            .with_context(|| "serializing parquet metadata for dedicated table")?;
        let insert_time = sqlx::types::chrono::Utc::now();

        sqlx::query(
            "INSERT INTO partition_metadata (file_path, metadata, insert_time, partition_format_version)
             VALUES ($1, $2, $3, 2)",
        )
        .bind(file_path)
        .bind(metadata_bytes.as_ref())
        .bind(insert_time)
        .execute(&mut *transaction)
        .await
        .with_context(|| format!("inserting metadata for file: {}", file_path))?;
    }

    let insert_result = sqlx::query(
        "INSERT INTO lakehouse_partitions VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 2);",
    )
    .bind(&*partition.view_metadata.view_set_name)
    .bind(&*partition.view_metadata.view_instance_id)
    .bind(partition.begin_insert_time())
    .bind(partition.end_insert_time())
    .bind(partition.min_event_time())
    .bind(partition.max_event_time())
    .bind(partition.updated)
    .bind(&partition.file_path)
    .bind(partition.file_size)
    .bind(&partition.view_metadata.file_schema_hash)
    .bind(&partition.source_data_hash)
    .bind(partition.num_rows)
    .execute(&mut *transaction)
    .await;

    match insert_result {
        Ok(_) => {
            debug!(
                "[PARTITION_INSERT_SUCCESS] view={}/{} time_range=[{}, {}] source_rows={}",
                &partition.view_metadata.view_set_name,
                &partition.view_metadata.view_instance_id,
                partition.begin_insert_time(),
                partition.end_insert_time(),
                source_row_count
            );
        }
        Err(ref e) => {
            logger
                .write_log_entry(format!(
                    "[PARTITION_INSERT_ERROR] view={}/{} time_range=[{}, {}] source_rows={} error={}",
                    &partition.view_metadata.view_set_name,
                    &partition.view_metadata.view_instance_id,
                    partition.begin_insert_time(),
                    partition.end_insert_time(),
                    source_row_count,
                    e
                ))
                .await?;
            return Err(insert_result.unwrap_err().into());
        }
    };

    transaction.commit().await.with_context(|| "commit")?;

    info!(
        "[PARTITION_WRITE_COMMIT] view={}/{} time_range=[{}, {}] file_path={:?} - lock released",
        &partition.view_metadata.view_set_name,
        &partition.view_metadata.view_instance_id,
        partition.begin_insert_time(),
        partition.end_insert_time(),
        partition.file_path
    );
    Ok(())
}

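/// Result of writing a partition's parquet file to object storage.
///
/// `file_path` and `file_metadata` are `None` when the partition turned out to be empty.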
struct PartitionWriteResult {
    num_rows: i64,
    file_metadata: Option<Arc<ParquetMetaData>>,
    file_path: Option<String>,
    file_size: i64,
    event_time_range: Option<TimeRange>,
}

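/// Drains the row-set channel into the parquet writer, tracking the overall event time
/// range and periodically flushing and logging write progress.
///
/// Returns `None` if no row sets were received.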
async fn write_rows_and_track_times(
    rb_stream: &mut Receiver<PartitionRowSet>,
    arrow_writer: &mut AsyncArrowWriter<AsyncParquetWriter>,
    logger: &Arc<dyn Logger>,
    desc: &str,
) -> Result<Option<TimeRange>> {
    let mut min_event_time: Option<DateTime<Utc>> = None;
    let mut max_event_time: Option<DateTime<Utc>> = None;
    let mut write_progression = 0;

    while let Some(row_set) = rb_stream.recv().await {
        min_event_time = Some(
            min_event_time
                .unwrap_or(row_set.rows_time_range.begin)
                .min(row_set.rows_time_range.begin),
        );
        max_event_time = Some(
            max_event_time
                .unwrap_or(row_set.rows_time_range.end)
                .max(row_set.rows_time_range.end),
        );
        arrow_writer
            .write(&row_set.rows)
            .await
            .with_context(|| "arrow_writer.write")?;
        // Flush to object storage once more than 100 MiB is buffered in memory.
        if arrow_writer.in_progress_size() > 100 * 1024 * 1024 {
            arrow_writer
                .flush()
                .await
                .with_context(|| "arrow_writer.flush")?;
        }

        // Log progress every 10 MiB written.
        let progression = arrow_writer.bytes_written() / (10 * 1024 * 1024);
        if progression != write_progression {
            write_progression = progression;
            let written = arrow_writer.bytes_written();
            logger
                .write_log_entry(format!("{desc}: written {written} bytes"))
                .await
                .with_context(|| "writing log entry")?;
        }
    }

    Ok(match (min_event_time, max_event_time) {
        (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
        _ => None,
    })
}

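/// Closes the parquet writer and converts the outcome into a [`PartitionWriteResult`].
///
/// Empty partitions (no row sets received, or a 0-row file) yield a result without a
/// file path and the corresponding object is deleted from storage; if closing the
/// writer fails, the partial file is deleted on a best-effort basis before the error
/// is propagated.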
async fn finalize_partition_write(
    event_time_range: Option<TimeRange>,
    arrow_writer: AsyncArrowWriter<AsyncParquetWriter>,
    file_path: String,
    byte_counter: &Arc<AtomicI64>,
    logger: &Arc<dyn Logger>,
    desc: &str,
    object_store: Arc<dyn object_store::ObjectStore>,
) -> Result<PartitionWriteResult> {
    if let Some(event_time_range) = event_time_range {
        // Closing the writer flushes the remaining buffers and returns the parquet footer metadata.
        let close_result = arrow_writer.close().await;

        match close_result {
            Ok(parquet_metadata) => {
                let num_rows = parquet_metadata.file_metadata().num_rows();

                // A file can still end up with 0 rows; treat it like an empty partition
                // and delete the object.
                if num_rows == 0 {
                    logger
                        .write_log_entry(format!(
                            "created 0-row file, treating as empty partition for {desc}"
                        ))
                        .await
                        .with_context(|| "writing log entry")?;

                    let path = object_store::path::Path::from(file_path.as_str());
                    if let Err(delete_err) = object_store.delete(&path).await {
                        warn!("failed to delete empty file {}: {}", file_path, delete_err);
                    }

                    return Ok(PartitionWriteResult {
                        num_rows: 0,
                        file_metadata: None,
                        file_path: None,
                        file_size: 0,
                        event_time_range: None,
                    });
                }

                debug!(
                    "wrote nb_rows={} size={} path={file_path}",
                    num_rows,
                    byte_counter.load(std::sync::atomic::Ordering::Relaxed)
                );
                let file_metadata = Arc::new(parquet_metadata);
                let file_size = byte_counter.load(std::sync::atomic::Ordering::Relaxed);
                Ok(PartitionWriteResult {
                    num_rows,
                    file_metadata: Some(file_metadata),
                    file_path: Some(file_path),
                    file_size,
                    event_time_range: Some(event_time_range),
                })
            }
            Err(e) => {
                // Best-effort cleanup of the partially written file before propagating the error.
                warn!(
                    "arrow_writer.close failed, attempting to delete partial file: {}",
                    file_path
                );
                let path = object_store::path::Path::from(file_path.as_str());
                if let Err(delete_err) = object_store.delete(&path).await {
                    warn!(
                        "failed to delete partial file {}: {}",
                        file_path, delete_err
                    );
                }
                Err(e).with_context(|| "arrow_writer.close")
            }
        }
    } else {
        // No rows were received: drop the writer without closing it so no footer is written.
        drop(arrow_writer);

        logger
            .write_log_entry(format!("creating empty partition record for {desc}"))
            .await
            .with_context(|| "writing log entry")?;

        // Delete whatever may already have been uploaded; ignore errors since the object may not exist.
        let path = object_store::path::Path::from(file_path.as_str());
        let _ = object_store.delete(&path).await;

        Ok(PartitionWriteResult {
            num_rows: 0,
            file_metadata: None,
            file_path: None,
            file_size: 0,
            event_time_range: None,
        })
    }
}

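/// Writes a view partition to object storage from a stream of [`PartitionRowSet`]s and
/// registers it in the lakehouse metadata tables, replacing any partitions it overlaps.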
pub async fn write_partition_from_rows(
    lake: Arc<DataLakeConnection>,
    view_metadata: ViewMetadata,
    file_schema: Arc<Schema>,
    insert_range: TimeRange,
    source_data_hash: Vec<u8>,
    mut rb_stream: Receiver<PartitionRowSet>,
    logger: Arc<dyn Logger>,
) -> Result<()> {
    let file_id = uuid::Uuid::new_v4();
    let file_path = format!(
        "views/{}/{}/{}/{}_{file_id}.parquet",
        &view_metadata.view_set_name,
        &view_metadata.view_instance_id,
        insert_range.begin.format("%Y-%m-%d"),
        insert_range.begin.format("%H-%M-%S")
    );
    let byte_counter = Arc::new(AtomicI64::new(0));
    let object_store_writer = AsyncParquetWriter::new(
        BufWriter::new(
            lake.blob_storage.inner(),
            object_store::path::Path::parse(&file_path).with_context(|| "parsing path")?,
        )
        .with_max_concurrency(2),
        byte_counter.clone(),
    );

    // Page-level statistics allow readers to prune data more precisely than row-group statistics.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::LZ4_RAW)
        .set_statistics_enabled(datafusion::parquet::file::properties::EnabledStatistics::Page)
        .build();
    let mut arrow_writer =
        AsyncArrowWriter::try_new(object_store_writer, file_schema.clone(), Some(props))
            .with_context(|| "allocating async arrow writer")?;

    let desc = format!(
        "[{}, {}] {} {}",
        view_metadata.view_set_name,
        view_metadata.view_instance_id,
        insert_range.begin.to_rfc3339(),
        insert_range.end.to_rfc3339()
    );

    let event_time_range =
        write_rows_and_track_times(&mut rb_stream, &mut arrow_writer, &logger, &desc).await?;

    let result = finalize_partition_write(
        event_time_range,
        arrow_writer,
        file_path,
        &byte_counter,
        &logger,
        &desc,
        lake.blob_storage.inner(),
    )
    .await?;

    insert_partition(
        &lake,
        &Partition {
            view_metadata,
            insert_time_range: insert_range,
            event_time_range: result.event_time_range,
            updated: sqlx::types::chrono::Utc::now(),
            file_path: result.file_path,
            file_size: result.file_size,
            source_data_hash,
            num_rows: result.num_rows,
        },
        result.file_metadata.as_ref(),
        logger,
    )
    .await
    .with_context(|| "insert_partition")?;
    Ok(())
}