1use crate::{
2 arrow_utils::serialize_parquet_metadata, lakehouse::async_parquet_writer::AsyncParquetWriter,
3 response_writer::Logger, time::TimeRange,
4};
5use anyhow::{Context, Result};
6use chrono::{DateTime, TimeDelta, Utc};
7use datafusion::{
8 arrow::{array::RecordBatch, datatypes::Schema},
9 parquet::{
10 arrow::AsyncArrowWriter,
11 basic::Compression,
12 file::{
13 metadata::ParquetMetaData,
14 properties::{WriterProperties, WriterVersion},
15 },
16 },
17};
18use micromegas_ingestion::data_lake_connection::DataLakeConnection;
19use micromegas_tracing::prelude::*;
20use object_store::ObjectStoreExt;
21use object_store::buffered::BufWriter;
22use sqlx::Row;
23use std::collections::hash_map::DefaultHasher;
24use std::hash::{Hash, Hasher};
25use std::sync::{Arc, atomic::AtomicI64};
26use tokio::sync::mpsc::Receiver;
27
28use super::{partition::Partition, partition_source_data, view::ViewMetadata};
29
30pub async fn add_file_for_cleanup(
35 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
36 file_path: &str,
37 file_size: i64,
38) -> Result<()> {
39 let expiration = Utc::now()
40 + TimeDelta::try_hours(1)
41 .with_context(|| "calculating expiration time for temporary file")?;
42
43 sqlx::query("INSERT INTO temporary_files VALUES ($1, $2, $3)")
44 .bind(file_path)
45 .bind(file_size)
46 .bind(expiration)
47 .execute(&mut **transaction)
48 .await
49 .with_context(|| format!("adding file {file_path} to temporary files for cleanup"))?;
50
51 Ok(())
52}
53
54pub struct PartitionRowSet {
56 pub rows_time_range: TimeRange,
57 pub rows: RecordBatch,
58}
59
60impl PartitionRowSet {
61 pub fn new(rows_time_range: TimeRange, rows: RecordBatch) -> Self {
62 Self {
63 rows_time_range,
64 rows,
65 }
66 }
67}
68
69#[span_fn]
70async fn retire_expired_partitions_batch(
71 lake: &DataLakeConnection,
72 expiration: DateTime<Utc>,
73) -> Result<bool> {
74 let batch_size: i32 = 1000;
75 let mut transaction = lake.db_pool.begin().await?;
76 let rows = sqlx::query(
77 "DELETE FROM lakehouse_partitions
78 WHERE (view_set_name, view_instance_id, begin_insert_time, end_insert_time) IN (
79 SELECT view_set_name, view_instance_id, begin_insert_time, end_insert_time
80 FROM lakehouse_partitions
81 WHERE end_insert_time < $1
82 LIMIT $2
83 )
84 RETURNING file_path, file_size;",
85 )
86 .bind(expiration)
87 .bind(batch_size)
88 .fetch_all(&mut *transaction)
89 .await
90 .with_context(|| "deleting expired partitions batch")?;
91
92 if rows.is_empty() {
93 return Ok(false);
94 }
95 let count = rows.len();
96 for row in &rows {
97 let file_path: Option<String> = row.try_get("file_path")?;
98 let file_size: i64 = row.try_get("file_size")?;
99 if let Some(path) = file_path {
100 debug!("retiring expired partition file {path} ({file_size} bytes)");
101 add_file_for_cleanup(&mut transaction, &path, file_size).await?;
102 }
103 }
104 transaction.commit().await.with_context(|| "commit")?;
105 info!("retired {count} expired partitions");
106 Ok(count == batch_size as usize)
107}
108
109#[span_fn]
110pub async fn retire_expired_partitions(
111 lake: &DataLakeConnection,
112 expiration: DateTime<Utc>,
113) -> Result<()> {
114 while retire_expired_partitions_batch(lake, expiration).await? {}
115 Ok(())
116}
117
118pub async fn retire_partitions(
121 transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
122 view_set_name: &str,
123 view_instance_id: &str,
124 begin_insert_time: DateTime<Utc>,
125 end_insert_time: DateTime<Utc>,
126 logger: Arc<dyn Logger>,
127) -> Result<()> {
128 let old_partitions = if begin_insert_time == end_insert_time {
134 sqlx::query(
136 "SELECT file_path, file_size
137 FROM lakehouse_partitions
138 WHERE view_set_name = $1
139 AND view_instance_id = $2
140 AND begin_insert_time = $3
141 AND end_insert_time = $3
142 ;",
143 )
144 .bind(view_set_name)
145 .bind(view_instance_id)
146 .bind(begin_insert_time)
147 .fetch_all(&mut **transaction)
148 .await
149 .with_context(|| "listing old partitions (exact match)")?
150 } else {
151 sqlx::query(
153 "SELECT file_path, file_size
154 FROM lakehouse_partitions
155 WHERE view_set_name = $1
156 AND view_instance_id = $2
157 AND begin_insert_time >= $3
158 AND end_insert_time <= $4
159 ;",
160 )
161 .bind(view_set_name)
162 .bind(view_instance_id)
163 .bind(begin_insert_time)
164 .bind(end_insert_time)
165 .fetch_all(&mut **transaction)
166 .await
167 .with_context(|| "listing old partitions (range)")?
168 };
169
170 if !old_partitions.is_empty() {
172 logger
173 .write_log_entry(format!(
174 "[RETIRE_FOUND] view={}/{} time_range=[{}, {}] found_partitions={}",
175 view_set_name,
176 view_instance_id,
177 begin_insert_time,
178 end_insert_time,
179 old_partitions.len()
180 ))
181 .await?;
182 }
183
184 let mut file_paths = Vec::new();
185 for old_part in &old_partitions {
186 let file_path: Option<String> = old_part.try_get("file_path")?;
187 let file_size: i64 = old_part.try_get("file_size")?;
188 if let Some(path) = file_path {
189 logger
190 .write_log_entry(format!(
191 "adding out of date partition {path} to temporary files to be deleted"
192 ))
193 .await?;
194 add_file_for_cleanup(transaction, &path, file_size).await?;
195 file_paths.push(path);
196 }
197 }
198
199 if begin_insert_time == end_insert_time {
200 sqlx::query(
202 "DELETE from lakehouse_partitions
203 WHERE view_set_name = $1
204 AND view_instance_id = $2
205 AND begin_insert_time = $3
206 AND end_insert_time = $3
207 ;",
208 )
209 .bind(view_set_name)
210 .bind(view_instance_id)
211 .bind(begin_insert_time)
212 .execute(&mut **transaction)
213 .await
214 .with_context(|| "deleting out of date partitions (exact match)")?
215 } else {
216 sqlx::query(
218 "DELETE from lakehouse_partitions
219 WHERE view_set_name = $1
220 AND view_instance_id = $2
221 AND begin_insert_time >= $3
222 AND end_insert_time <= $4
223 ;",
224 )
225 .bind(view_set_name)
226 .bind(view_instance_id)
227 .bind(begin_insert_time)
228 .bind(end_insert_time)
229 .execute(&mut **transaction)
230 .await
231 .with_context(|| "deleting out of date partitions (range)")?
232 };
233 Ok(())
234}
235
236fn generate_partition_lock_key(
238 view_set_name: &str,
239 view_instance_id: &str,
240 begin_insert_time: DateTime<Utc>,
241 end_insert_time: DateTime<Utc>,
242) -> i64 {
243 let mut hasher = DefaultHasher::new();
244 view_set_name.hash(&mut hasher);
245 view_instance_id.hash(&mut hasher);
246 begin_insert_time.hash(&mut hasher);
247 end_insert_time.hash(&mut hasher);
248 hasher.finish() as i64
249}
250
251async fn insert_partition(
252 lake: &DataLakeConnection,
253 partition: &Partition,
254 file_metadata: Option<&Arc<ParquetMetaData>>,
255 logger: Arc<dyn Logger>,
256) -> Result<()> {
257 let lock_key = generate_partition_lock_key(
259 &partition.view_metadata.view_set_name,
260 &partition.view_metadata.view_instance_id,
261 partition.begin_insert_time(),
262 partition.end_insert_time(),
263 );
264
265 let mut transaction = lake.db_pool.begin().await?;
266
267 debug!(
268 "[PARTITION_LOCK] view={}/{} time_range=[{}, {}] lock_key={} - acquiring advisory lock",
269 &partition.view_metadata.view_set_name,
270 &partition.view_metadata.view_instance_id,
271 partition.begin_insert_time(),
272 partition.end_insert_time(),
273 lock_key
274 );
275
276 sqlx::query("SELECT pg_advisory_xact_lock($1);")
279 .bind(lock_key)
280 .execute(&mut *transaction)
281 .await
282 .with_context(|| "acquiring advisory lock")?;
283
284 let source_row_count = partition_source_data::hash_to_object_count(&partition.source_data_hash)
286 .with_context(|| "decoding source_data_hash to row count")?;
287
288 logger
290 .write_log_entry(format!(
291 "[PARTITION_WRITE_START] view={}/{} time_range=[{}, {}] source_rows={} - lock acquired",
292 &partition.view_metadata.view_set_name,
293 &partition.view_metadata.view_instance_id,
294 partition.begin_insert_time(),
295 partition.end_insert_time(),
296 source_row_count
297 ))
298 .await?;
299
300 retire_partitions(
303 &mut transaction,
304 &partition.view_metadata.view_set_name,
305 &partition.view_metadata.view_instance_id,
306 partition.begin_insert_time(),
307 partition.end_insert_time(),
308 logger.clone(),
309 )
310 .await
311 .with_context(|| "retire_partitions")?;
312
313 debug!(
314 "[PARTITION_INSERT_ATTEMPT] view={}/{} time_range=[{}, {}] source_rows={} file_path={:?}",
315 &partition.view_metadata.view_set_name,
316 &partition.view_metadata.view_instance_id,
317 partition.begin_insert_time(),
318 partition.end_insert_time(),
319 source_row_count,
320 partition.file_path
321 );
322
323 if let (Some(file_path), Some(metadata)) = (&partition.file_path, file_metadata) {
326 let metadata_bytes = serialize_parquet_metadata(metadata)
327 .with_context(|| "serializing parquet metadata for dedicated table")?;
328 let insert_time = sqlx::types::chrono::Utc::now();
329
330 sqlx::query(
331 "INSERT INTO partition_metadata (file_path, metadata, insert_time, partition_format_version)
332 VALUES ($1, $2, $3, 2)",
333 )
334 .bind(file_path)
335 .bind(metadata_bytes.as_ref())
336 .bind(insert_time)
337 .execute(&mut *transaction)
338 .await
339 .with_context(|| format!("inserting metadata for file: {}", file_path))?;
340 }
341
342 let insert_result = sqlx::query(
344 "INSERT INTO lakehouse_partitions VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 2);",
345 )
346 .bind(&*partition.view_metadata.view_set_name)
347 .bind(&*partition.view_metadata.view_instance_id)
348 .bind(partition.begin_insert_time())
349 .bind(partition.end_insert_time())
350 .bind(partition.min_event_time())
351 .bind(partition.max_event_time())
352 .bind(partition.updated)
353 .bind(&partition.file_path)
354 .bind(partition.file_size)
355 .bind(&partition.view_metadata.file_schema_hash)
356 .bind(&partition.source_data_hash)
357 .bind(partition.num_rows)
358 .execute(&mut *transaction)
359 .await;
360
361 match insert_result {
362 Ok(_) => {
363 debug!(
364 "[PARTITION_INSERT_SUCCESS] view={}/{} time_range=[{}, {}] source_rows={}",
365 &partition.view_metadata.view_set_name,
366 &partition.view_metadata.view_instance_id,
367 partition.begin_insert_time(),
368 partition.end_insert_time(),
369 source_row_count
370 );
371 }
372 Err(ref e) => {
373 logger
374 .write_log_entry(format!(
375 "[PARTITION_INSERT_ERROR] view={}/{} time_range=[{}, {}] source_rows={} error={}",
376 &partition.view_metadata.view_set_name,
377 &partition.view_metadata.view_instance_id,
378 partition.begin_insert_time(),
379 partition.end_insert_time(),
380 source_row_count,
381 e
382 ))
383 .await?;
384 return Err(insert_result.unwrap_err().into());
385 }
386 };
387
388 transaction.commit().await.with_context(|| "commit")?;
390
391 info!(
392 "[PARTITION_WRITE_COMMIT] view={}/{} time_range=[{}, {}] file_path={:?} - lock released",
393 &partition.view_metadata.view_set_name,
394 &partition.view_metadata.view_instance_id,
395 partition.begin_insert_time(),
396 partition.end_insert_time(),
397 partition.file_path
398 );
399 Ok(())
400}
401
/// Outcome of finalizing a partition's Parquet file, as produced by
/// `finalize_partition_write`.
struct PartitionWriteResult {
    /// Number of rows in the written file; 0 when no file is kept.
    num_rows: i64,
    /// Parquet metadata of the closed file, or `None` when no file is kept.
    file_metadata: Option<Arc<ParquetMetaData>>,
    /// Path of the written file, or `None` when no file is kept.
    file_path: Option<String>,
    /// Value read from the writer's byte counter after closing; 0 when no file is kept.
    file_size: i64,
    /// Event time range covered by the written rows, or `None` when no file is kept.
    event_time_range: Option<TimeRange>,
}
410
411pub async fn write_rows_and_track_times(
413 rb_stream: &mut Receiver<Result<PartitionRowSet, anyhow::Error>>,
414 arrow_writer: &mut AsyncArrowWriter<AsyncParquetWriter>,
415 logger: &Arc<dyn Logger>,
416 desc: &str,
417) -> Result<Option<TimeRange>> {
418 let mut min_event_time: Option<DateTime<Utc>> = None;
419 let mut max_event_time: Option<DateTime<Utc>> = None;
420 let mut write_progression = 0;
421
422 while let Some(msg) = rb_stream.recv().await {
423 let row_set = msg?;
424 min_event_time = Some(
425 min_event_time
426 .unwrap_or(row_set.rows_time_range.begin)
427 .min(row_set.rows_time_range.begin),
428 );
429 max_event_time = Some(
430 max_event_time
431 .unwrap_or(row_set.rows_time_range.end)
432 .max(row_set.rows_time_range.end),
433 );
434 arrow_writer
435 .write(&row_set.rows)
436 .await
437 .with_context(|| "arrow_writer.write")?;
438 if arrow_writer.in_progress_size() > 100 * 1024 * 1024 {
439 arrow_writer
440 .flush()
441 .await
442 .with_context(|| "arrow_writer.flush")?;
443 }
444
445 let progression = arrow_writer.bytes_written() / (10 * 1024 * 1024);
447 if progression != write_progression {
448 write_progression = progression;
449 let written = arrow_writer.bytes_written();
450 logger
451 .write_log_entry(format!("{desc}: written {written} bytes"))
452 .await
453 .with_context(|| "writing log entry")?;
454 }
455 }
456
457 Ok(match (min_event_time, max_event_time) {
458 (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
459 _ => None,
460 })
461}
462
463async fn finalize_partition_write(
465 event_time_range: Option<TimeRange>,
466 arrow_writer: AsyncArrowWriter<AsyncParquetWriter>,
467 file_path: String,
468 byte_counter: &Arc<AtomicI64>,
469 logger: &Arc<dyn Logger>,
470 desc: &str,
471 object_store: Arc<dyn object_store::ObjectStore>,
472) -> Result<PartitionWriteResult> {
473 if let Some(event_time_range) = event_time_range {
474 let close_result = arrow_writer.close().await;
476
477 match close_result {
478 Ok(parquet_metadata) => {
479 let num_rows = parquet_metadata.file_metadata().num_rows();
480
481 if num_rows == 0 {
484 logger
486 .write_log_entry(format!(
487 "created 0-row file, treating as empty partition for {desc}"
488 ))
489 .await
490 .with_context(|| "writing log entry")?;
491
492 let path = object_store::path::Path::from(file_path.as_str());
494 if let Err(delete_err) = object_store.delete(&path).await {
495 warn!("failed to delete empty file {}: {}", file_path, delete_err);
496 }
497
498 return Ok(PartitionWriteResult {
499 num_rows: 0,
500 file_metadata: None,
501 file_path: None,
502 file_size: 0,
503 event_time_range: None,
504 });
505 }
506
507 debug!(
509 "wrote nb_rows={} size={} path={file_path}",
510 num_rows,
511 byte_counter.load(std::sync::atomic::Ordering::Relaxed)
512 );
513 let file_metadata = Arc::new(parquet_metadata);
514 let file_size = byte_counter.load(std::sync::atomic::Ordering::Relaxed);
515 Ok(PartitionWriteResult {
516 num_rows,
517 file_metadata: Some(file_metadata),
518 file_path: Some(file_path),
519 file_size,
520 event_time_range: Some(event_time_range),
521 })
522 }
523 Err(e) => {
524 warn!(
526 "arrow_writer.close failed, attempting to delete partial file: {}",
527 file_path
528 );
529 let path = object_store::path::Path::from(file_path.as_str());
530 if let Err(delete_err) = object_store.delete(&path).await {
531 warn!(
532 "failed to delete partial file {}: {}",
533 file_path, delete_err
534 );
535 }
536 Err(e).with_context(|| "arrow_writer.close")
537 }
538 }
539 } else {
540 drop(arrow_writer);
543
544 logger
545 .write_log_entry(format!("creating empty partition record for {desc}"))
546 .await
547 .with_context(|| "writing log entry")?;
548
549 let path = object_store::path::Path::from(file_path.as_str());
552 let _ = object_store.delete(&path).await;
553
554 Ok(PartitionWriteResult {
555 num_rows: 0,
556 file_metadata: None,
557 file_path: None,
558 file_size: 0,
559 event_time_range: None,
560 })
561 }
562}
563
564pub async fn write_partition_from_rows(
566 lake: Arc<DataLakeConnection>,
567 view_metadata: ViewMetadata,
568 file_schema: Arc<Schema>,
569 insert_range: TimeRange,
570 source_data_hash: Vec<u8>,
571 mut rb_stream: Receiver<Result<PartitionRowSet, anyhow::Error>>,
572 logger: Arc<dyn Logger>,
573) -> Result<()> {
574 let file_id = uuid::Uuid::new_v4();
575 let file_path = format!(
576 "views/{}/{}/{}/{}_{file_id}.parquet",
577 &view_metadata.view_set_name,
578 &view_metadata.view_instance_id,
579 insert_range.begin.format("%Y-%m-%d"),
580 insert_range.begin.format("%H-%M-%S")
581 );
582 let byte_counter = Arc::new(AtomicI64::new(0));
583 let object_store_writer = AsyncParquetWriter::new(
584 BufWriter::new(
585 lake.blob_storage.inner(),
586 object_store::path::Path::parse(&file_path).with_context(|| "parsing path")?,
587 )
588 .with_max_concurrency(2),
589 byte_counter.clone(),
590 );
591
592 let props = WriterProperties::builder()
595 .set_writer_version(WriterVersion::PARQUET_2_0)
596 .set_compression(Compression::LZ4_RAW)
597 .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
600 .build();
601 let mut arrow_writer =
602 AsyncArrowWriter::try_new(object_store_writer, file_schema.clone(), Some(props))
603 .with_context(|| "allocating async arrow writer")?;
604
605 let desc = format!(
606 "[{}, {}] {} {}",
607 view_metadata.view_set_name,
608 view_metadata.view_instance_id,
609 insert_range.begin.to_rfc3339(),
610 insert_range.end.to_rfc3339()
611 );
612
613 let event_time_range =
615 match write_rows_and_track_times(&mut rb_stream, &mut arrow_writer, &logger, &desc).await {
616 Ok(range) => range,
617 Err(e) => {
618 drop(arrow_writer);
622 warn!(
623 "write_rows_and_track_times failed, attempting to delete partial file: {}",
624 file_path
625 );
626 let path = object_store::path::Path::from(file_path.as_str());
627 if let Err(delete_err) = lake.blob_storage.inner().delete(&path).await {
628 warn!(
629 "failed to delete partial file {}: {}",
630 file_path, delete_err
631 );
632 }
633 return Err(e).with_context(|| "write_rows_and_track_times");
634 }
635 };
636
637 let result = finalize_partition_write(
639 event_time_range,
640 arrow_writer,
641 file_path,
642 &byte_counter,
643 &logger,
644 &desc,
645 lake.blob_storage.inner(),
646 )
647 .await?;
648
649 insert_partition(
650 &lake,
651 &Partition {
652 view_metadata,
653 insert_time_range: insert_range,
654 event_time_range: result.event_time_range,
655 updated: sqlx::types::chrono::Utc::now(),
656 file_path: result.file_path,
657 file_size: result.file_size,
658 source_data_hash,
659 num_rows: result.num_rows,
660 },
661 result.file_metadata.as_ref(),
662 logger,
663 )
664 .await
665 .with_context(|| "insert_partition")?;
666 Ok(())
667}