use super::{
    block_partition_spec::{BlockPartitionSpec, BlockProcessor},
    blocks_view::BlocksView,
    lakehouse_context::LakehouseContext,
    partition_cache::{LivePartitionProvider, QueryPartitionProvider},
    partition_source_data::{PartitionSourceBlock, SourceDataBlocksInMemory},
    view::{View, ViewMetadata},
};
use crate::{
    dfext::typed_column::get_single_row_primitive_value,
    lakehouse::{partition_cache::PartitionCache, view::PartitionSpec},
    metadata::{ProcessMetadata, StreamMetadata, block_from_batch_row},
    time::TimeRange,
};
use crate::{
    lakehouse::{partition_source_data::hash_to_object_count, query::query_partitions},
    response_writer::ResponseWriter,
};
use anyhow::{Context, Result};
use chrono::DurationRound;
use chrono::{DateTime, TimeDelta, Utc};
use datafusion::arrow::datatypes::{Schema, TimestampNanosecondType};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::prelude::*;
use sqlx::Row;
use std::sync::Arc;

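/// Limits applied when materializing just-in-time (JIT) partitions.
///
/// `max_nb_objects` caps how many source objects are packed into a single
/// partition, and `max_insert_time_slice` is the width of the insert-time
/// slices processed in one pass.
///
/// A minimal sketch of overriding the defaults (values are illustrative, not
/// recommended settings):
/// ```ignore
/// let config = JitPartitionConfig {
///     max_nb_objects: 1024 * 1024,
///     max_insert_time_slice: TimeDelta::minutes(15),
/// };
/// ```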
pub struct JitPartitionConfig {
    pub max_nb_objects: i64,
    pub max_insert_time_slice: TimeDelta,
}

impl Default for JitPartitionConfig {
    fn default() -> Self {
        JitPartitionConfig {
            max_nb_objects: 20 * 1024 * 1024,
            max_insert_time_slice: TimeDelta::hours(1),
        }
    }
}

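/// Queries the blocks view for the insert-time range covered by the blocks of
/// `stream` that overlap `query_time_range`, or `None` when no block matches.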
async fn get_insert_time_range(
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    query_time_range: &TimeRange,
    stream: Arc<StreamMetadata>,
) -> Result<Option<TimeRange>> {
    let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
    let partitions = part_provider
        .fetch(
            &blocks_view.get_view_set_name(),
            &blocks_view.get_view_instance_id(),
            Some(*query_time_range),
            blocks_view.get_file_schema_hash(),
        )
        .await?;
    let stream_id = &stream.stream_id;
    let begin_range_iso = query_time_range.begin.to_rfc3339();
    let end_range_iso = query_time_range.end.to_rfc3339();
    let sql = format!(
        r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
           FROM source
           WHERE stream_id = '{stream_id}'
           AND begin_time <= '{end_range_iso}'
           AND end_time >= '{begin_range_iso}';"#
    );
    let reader_factory = lakehouse.reader_factory().clone();
    let rbs = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        blocks_view.get_file_schema(),
        Arc::new(partitions),
        &sql,
    )
    .await?
    .collect()
    .await?;
    if rbs.is_empty() {
        return Ok(None);
    }
    if rbs[0].num_rows() == 0 {
        return Ok(None);
    }
    let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
    let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
    Ok(Some(TimeRange::new(
        DateTime::from_timestamp_nanos(min_insert_time),
        DateTime::from_timestamp_nanos(max_insert_time),
    )))
}

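/// Builds the in-memory source block sets for one stream over a single
/// insert-time slice. Blocks are packed into partitions, closing a partition
/// once adding another block would exceed `config.max_nb_objects`;
/// `block_ids_hash` stores the partition's total object count as
/// little-endian bytes.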
pub async fn generate_stream_jit_partitions_segment(
    config: &JitPartitionConfig,
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    insert_time_range: &TimeRange,
    stream: Arc<StreamMetadata>,
    process: Arc<ProcessMetadata>,
) -> Result<Vec<SourceDataBlocksInMemory>> {
    let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
        &lakehouse.lake().db_pool,
        blocks_view.get_view_set_name(),
        blocks_view.get_view_instance_id(),
        *insert_time_range,
    )
    .await?;
    let partitions = cache.partitions;

    let stream_id = &stream.stream_id;
    let begin_range_iso = insert_time_range.begin.to_rfc3339();
    let end_range_iso = insert_time_range.end.to_rfc3339();
    let sql = format!(
        r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time
           FROM source
           WHERE stream_id = '{stream_id}'
           AND insert_time >= '{begin_range_iso}'
           AND insert_time < '{end_range_iso}'
           ORDER BY insert_time, block_id;"#
    );

    let reader_factory = lakehouse.reader_factory().clone();
    let rbs = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        blocks_view.get_file_schema(),
        Arc::new(partitions),
        &sql,
    )
    .await?
    .collect()
    .await?;

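    // Greedily pack blocks into partitions: close the current partition as soon as
    // adding the next block would exceed config.max_nb_objects.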
    let mut partitions = vec![];
    let mut partition_blocks = vec![];
    let mut partition_nb_objects: i64 = 0;
    for rb in rbs {
        for ir in 0..rb.num_rows() {
            let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
            let block_nb_objects = block.nb_objects as i64;

            if partition_nb_objects + block_nb_objects > config.max_nb_objects
                && !partition_blocks.is_empty()
            {
                partitions.push(SourceDataBlocksInMemory {
                    blocks: partition_blocks,
                    block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
                });
                partition_blocks = vec![Arc::new(PartitionSourceBlock {
                    block,
                    stream: stream.clone(),
                    process: process.clone(),
                })];
                partition_nb_objects = block_nb_objects;
            } else {
                partition_nb_objects += block_nb_objects;
                partition_blocks.push(Arc::new(PartitionSourceBlock {
                    block,
                    stream: stream.clone(),
                    process: process.clone(),
                }));
            }
        }
    }
    if partition_nb_objects != 0 {
        partitions.push(SourceDataBlocksInMemory {
            blocks: partition_blocks,
            block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
        });
    }

    Ok(partitions)
}

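/// Lists the JIT partitions needed to cover `query_time_range` for a single
/// stream, along with their source blocks. The partitions are not written
/// here; see `is_jit_partition_up_to_date` and `write_partition_from_blocks`
/// for checking and materializing them.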
pub async fn generate_stream_jit_partitions(
    config: &JitPartitionConfig,
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    query_time_range: &TimeRange,
    stream: Arc<StreamMetadata>,
    process: Arc<ProcessMetadata>,
) -> Result<Vec<SourceDataBlocksInMemory>> {
    let Some(insert_time_range) = get_insert_time_range(
        lakehouse.clone(),
        blocks_view,
        query_time_range,
        stream.clone(),
    )
    .await?
    else {
        return Ok(vec![]);
    };
    let insert_time_range = TimeRange::new(
        insert_time_range
            .begin
            .duration_trunc(config.max_insert_time_slice)?,
        insert_time_range
            .end
            .duration_trunc(config.max_insert_time_slice)?
            + config.max_insert_time_slice,
    );
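    // Walk the insert-time range one max_insert_time_slice at a time and generate
    // the partitions of each slice independently.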
    let mut begin_segment = insert_time_range.begin;
    let mut end_segment = begin_segment + config.max_insert_time_slice;
    let mut partitions = vec![];
    while end_segment <= insert_time_range.end {
        let insert_time_range = TimeRange::new(begin_segment, end_segment);
        let mut segment_partitions = generate_stream_jit_partitions_segment(
            config,
            lakehouse.clone(),
            blocks_view,
            &insert_time_range,
            stream.clone(),
            process.clone(),
        )
        .await?;
        partitions.append(&mut segment_partitions);
        begin_segment = end_segment;
        end_segment = begin_segment + config.max_insert_time_slice;
    }
    Ok(partitions)
}

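/// Builds the in-memory source block sets for one process over a single
/// insert-time slice, keeping only blocks whose stream carries `stream_tag`.
/// The stream metadata is reconstructed from the columns of the blocks view.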
pub async fn generate_process_jit_partitions_segment(
    config: &JitPartitionConfig,
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    insert_time_range: &TimeRange,
    process: Arc<ProcessMetadata>,
    stream_tag: &str,
) -> Result<Vec<SourceDataBlocksInMemory>> {
    let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
        &lakehouse.lake().db_pool,
        blocks_view.get_view_set_name(),
        blocks_view.get_view_instance_id(),
        *insert_time_range,
    )
    .await?;
    let partitions = cache.partitions;

    let process_id = &process.process_id;
    let begin_range_iso = insert_time_range.begin.to_rfc3339();
    let end_range_iso = insert_time_range.end.to_rfc3339();
    let sql = format!(
        r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time,
           "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties"
           FROM source
           WHERE process_id = '{process_id}'
           AND array_has( "streams.tags", '{stream_tag}' )
           AND insert_time >= '{begin_range_iso}'
           AND insert_time < '{end_range_iso}'
           ORDER BY insert_time, block_id;"#
    );

    let reader_factory = lakehouse.reader_factory().clone();
    let rbs = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        blocks_view.get_file_schema(),
        Arc::new(partitions),
        &sql,
    )
    .await?
    .collect()
    .await?;

    let mut partitions = vec![];
    let mut partition_blocks = vec![];
    let mut partition_nb_objects: i64 = 0;

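    // Same greedy packing as the per-stream variant: close the current partition
    // when adding the next block would exceed config.max_nb_objects.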
    for rb in rbs {
        for ir in 0..rb.num_rows() {
            let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
            let block_nb_objects = block.nb_objects as i64;

            use crate::dfext::{
                string_column_accessor::string_column_by_name, typed_column::typed_column_by_name,
            };
            use crate::properties::properties_column_accessor::properties_column_by_name;
            use datafusion::arrow::array::{BinaryArray, GenericListArray, StringArray};
            use uuid::Uuid;

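            // Rebuild the StreamMetadata of the current block from the
            // stream-related columns of the blocks view.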
            let stream_id_column = string_column_by_name(&rb, "stream_id")?;
            let stream_process_id_column = string_column_by_name(&rb, "process_id")?;
            let dependencies_metadata_column: &BinaryArray =
                typed_column_by_name(&rb, "streams.dependencies_metadata")?;
            let objects_metadata_column: &BinaryArray =
                typed_column_by_name(&rb, "streams.objects_metadata")?;
            let stream_tags_column: &GenericListArray<i32> =
                typed_column_by_name(&rb, "streams.tags")?;
            let stream_properties_accessor = properties_column_by_name(&rb, "streams.properties")?;

            let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)
                .with_context(|| "parsing stream_id")?;
            let stream_process_id = Uuid::parse_str(stream_process_id_column.value(ir)?)
                .with_context(|| "parsing stream process_id")?;

            let dependencies_metadata = dependencies_metadata_column.value(ir);
            let objects_metadata = objects_metadata_column.value(ir);
            let stream_tags = stream_tags_column
                .value(ir)
                .as_any()
                .downcast_ref::<StringArray>()
                .with_context(|| "casting stream_tags")?
                .iter()
                .map(|item| String::from(item.unwrap_or_default()))
                .collect();

            let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;

            let stream = Arc::new(StreamMetadata {
                stream_id,
                process_id: stream_process_id,
                dependencies_metadata: ciborium::from_reader(dependencies_metadata)
                    .with_context(|| "decoding dependencies_metadata")?,
                objects_metadata: ciborium::from_reader(objects_metadata)
                    .with_context(|| "decoding objects_metadata")?,
                tags: stream_tags,
                properties: Arc::new(stream_properties_jsonb),
            });

            if partition_nb_objects + block_nb_objects > config.max_nb_objects
                && !partition_blocks.is_empty()
            {
                partitions.push(SourceDataBlocksInMemory {
                    blocks: partition_blocks,
                    block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
                });
                partition_blocks = vec![Arc::new(PartitionSourceBlock {
                    block,
                    stream: stream.clone(),
                    process: process.clone(),
                })];
                partition_nb_objects = block_nb_objects;
            } else {
                partition_nb_objects += block_nb_objects;
                partition_blocks.push(Arc::new(PartitionSourceBlock {
                    block,
                    stream: stream.clone(),
                    process: process.clone(),
                }));
            }
        }
    }
    if partition_nb_objects != 0 {
        partitions.push(SourceDataBlocksInMemory {
            blocks: partition_blocks,
            block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
        });
    }
    Ok(partitions)
}

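/// Lists the JIT partitions needed to cover `query_time_range` for all streams
/// of `process` tagged with `stream_tag`, along with their source blocks. The
/// insert-time range is derived from the blocks view, then processed one
/// `max_insert_time_slice` segment at a time.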
pub async fn generate_process_jit_partitions(
    config: &JitPartitionConfig,
    lakehouse: Arc<LakehouseContext>,
    blocks_view: &BlocksView,
    query_time_range: &TimeRange,
    process: Arc<ProcessMetadata>,
    stream_tag: &str,
) -> Result<Vec<SourceDataBlocksInMemory>> {
    let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
    let partitions = part_provider
        .fetch(
            &blocks_view.get_view_set_name(),
            &blocks_view.get_view_instance_id(),
            Some(*query_time_range),
            blocks_view.get_file_schema_hash(),
        )
        .await?;

    let process_id = &process.process_id;
    let begin_range_iso = query_time_range.begin.to_rfc3339();
    let end_range_iso = query_time_range.end.to_rfc3339();
    let sql = format!(
        r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
           FROM source
           WHERE process_id = '{process_id}'
           AND array_has( "streams.tags", '{stream_tag}' )
           AND begin_time <= '{end_range_iso}'
           AND end_time >= '{begin_range_iso}';"#
    );

    let reader_factory = lakehouse.reader_factory().clone();
    let rbs = query_partitions(
        lakehouse.runtime().clone(),
        reader_factory,
        lakehouse.lake().blob_storage.inner(),
        blocks_view.get_file_schema(),
        Arc::new(partitions),
        &sql,
    )
    .await?
    .collect()
    .await?;

    if rbs.is_empty() || rbs[0].num_rows() == 0 {
        return Ok(vec![]);
    }

    let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
    let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;

    if min_insert_time == 0 || max_insert_time == 0 {
        return Ok(vec![]);
    }

    let insert_time_range = TimeRange::new(
        DateTime::from_timestamp_nanos(min_insert_time)
            .duration_trunc(config.max_insert_time_slice)?,
        DateTime::from_timestamp_nanos(max_insert_time)
            .duration_trunc(config.max_insert_time_slice)?
            + config.max_insert_time_slice,
    );

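    // Process the insert-time range segment by segment, one max_insert_time_slice
    // at a time.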
    let mut begin_segment = insert_time_range.begin;
    let mut end_segment = begin_segment + config.max_insert_time_slice;
    let mut partitions = vec![];

    while end_segment <= insert_time_range.end {
        let insert_time_range = TimeRange::new(begin_segment, end_segment);
        let mut segment_partitions = generate_process_jit_partitions_segment(
            config,
            lakehouse.clone(),
            blocks_view,
            &insert_time_range,
            process.clone(),
            stream_tag,
        )
        .await?;
        partitions.append(&mut segment_partitions);
        begin_segment = end_segment;
        end_segment = begin_segment + config.max_insert_time_slice;
    }
    Ok(partitions)
}

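/// Checks whether a single existing partition already covers `spec`: exactly
/// one matching partition must exist, it must match the view's file schema
/// hash, and it must reference at least as many source objects as
/// `spec.block_ids_hash` encodes. Returns `false` when a new partition should
/// be written.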
#[span_fn]
pub async fn is_jit_partition_up_to_date(
    pool: &sqlx::PgPool,
    view_meta: ViewMetadata,
    spec: &SourceDataBlocksInMemory,
) -> Result<bool> {
    let (min_insert_time, max_insert_time) =
        get_part_insert_time_range(spec).with_context(|| "get_part_insert_time_range")?;
    let desc = format!(
        "[{}, {}] {} {}",
        min_insert_time.to_rfc3339(),
        max_insert_time.to_rfc3339(),
        &*view_meta.view_set_name,
        &*view_meta.view_instance_id,
    );

    let rows = if min_insert_time == max_insert_time {
        sqlx::query(
            "SELECT file_schema_hash, source_data_hash
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time = $3
             AND end_insert_time = $3
             ;",
        )
        .bind(&*view_meta.view_set_name)
        .bind(&*view_meta.view_instance_id)
        .bind(min_insert_time)
    } else {
        sqlx::query(
            "SELECT file_schema_hash, source_data_hash
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND begin_insert_time <= $3
             AND end_insert_time >= $4
             ;",
        )
        .bind(&*view_meta.view_set_name)
        .bind(&*view_meta.view_instance_id)
        .bind(max_insert_time)
        .bind(min_insert_time)
    }
    .fetch_all(pool)
    .await
    .with_context(|| "fetching matching partitions")?;
    if rows.len() != 1 {
        debug!("{desc}: found {} partitions (expected 1)", rows.len());
        for (i, row) in rows.iter().enumerate() {
            let part_file_schema: Vec<u8> = row.try_get("file_schema_hash")?;
            let part_source_data: Vec<u8> = row.try_get("source_data_hash")?;
            let source_row_count = hash_to_object_count(&part_source_data)?;
            debug!(
                "{desc}: partition {}: file_schema_hash={:?}, source_rows={}",
                i, part_file_schema, source_row_count
            );
        }
        info!("{desc}: found {} partitions", rows.len());
        return Ok(false);
    }
    let r = &rows[0];
    let part_file_schema: Vec<u8> = r.try_get("file_schema_hash")?;
    if part_file_schema != view_meta.file_schema_hash {
        warn!("{desc}: found matching partition with different file schema");
        return Ok(false);
    }
    let part_source_data: Vec<u8> = r.try_get("source_data_hash")?;
    let existing_count = hash_to_object_count(&part_source_data)?;
    let required_count = hash_to_object_count(&spec.block_ids_hash)?;
    if existing_count < required_count {
        info!("{desc}: existing partition lacks source data: creating a new partition");
        return Ok(false);
    }
    info!("{desc}: partition up to date");
    Ok(true)
}

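/// Returns the insert-time range spanned by the blocks of `spec`, relying on
/// the blocks being ordered by insert_time.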
fn get_part_insert_time_range(
    spec: &SourceDataBlocksInMemory,
) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
    if spec.blocks.is_empty() {
        anyhow::bail!("empty partition should not exist");
    }
    let min_insert_time = spec.blocks[0].block.insert_time;
    let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
    Ok((min_insert_time, max_insert_time))
}

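/// Writes a partition from the given source blocks using `block_processor`,
/// attaching a null `ResponseWriter` so no progress messages are emitted.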
#[span_fn]
pub async fn write_partition_from_blocks(
    lake: Arc<DataLakeConnection>,
    view_metadata: ViewMetadata,
    schema: Arc<Schema>,
    source_data: SourceDataBlocksInMemory,
    block_processor: Arc<dyn BlockProcessor>,
) -> Result<()> {
    if source_data.blocks.is_empty() {
        anyhow::bail!("empty partition spec");
    }
    let min_insert_time = source_data.blocks[0].block.insert_time;
    let max_insert_time = source_data.blocks[source_data.blocks.len() - 1]
        .block
        .insert_time;
    let block_spec = BlockPartitionSpec {
        view_metadata,
        schema,
        insert_range: TimeRange::new(min_insert_time, max_insert_time),
        source_data: Arc::new(source_data),
        block_processor,
    };
    let null_response_writer = Arc::new(ResponseWriter::new(None));
    block_spec
        .write(lake, null_response_writer)
        .await
        .with_context(|| "block_spec.write")?;
    Ok(())
}