1use super::{
2 block_partition_spec::{BlockPartitionSpec, BlockProcessorMap},
3 blocks_view::BlocksView,
4 lakehouse_context::LakehouseContext,
5 partition_cache::{LivePartitionProvider, QueryPartitionProvider},
6 partition_source_data::{PartitionSourceBlock, SourceDataBlocksInMemory},
7 view::{View, ViewMetadata},
8};
9use crate::{
10 dfext::{
11 string_column_accessor::string_column_by_name,
12 typed_column::{get_single_row_primitive_value, typed_column_by_name},
13 },
14 lakehouse::{
15 partition_cache::PartitionCache, partition_source_data::hash_to_object_count,
16 query::query_partitions, view::PartitionSpec,
17 },
18 metadata::{ProcessMetadata, StreamMetadata, block_from_batch_row},
19 properties::properties_column_accessor::properties_column_by_name,
20 response_writer::ResponseWriter,
21 time::TimeRange,
22};
23use anyhow::{Context, Result};
24use chrono::DurationRound;
25use chrono::{DateTime, TimeDelta, Utc};
26use datafusion::arrow::array::{BinaryArray, GenericListArray, StringArray};
27use datafusion::arrow::datatypes::{Schema, TimestampNanosecondType};
28use micromegas_ingestion::data_lake_connection::DataLakeConnection;
29use micromegas_tracing::prelude::*;
30use sqlx::Row;
31use std::sync::Arc;
32use uuid::Uuid;
33
34pub struct JitPartitionConfig {
36 pub max_nb_objects: i64,
37 pub max_insert_time_slice: TimeDelta,
38}
39
40impl Default for JitPartitionConfig {
41 fn default() -> Self {
42 JitPartitionConfig {
43 max_nb_objects: 20 * 1024 * 1024,
44 max_insert_time_slice: TimeDelta::hours(1),
45 }
46 }
47}
48
49async fn get_insert_time_range(
50 lakehouse: Arc<LakehouseContext>,
51 blocks_view: &BlocksView,
52 query_time_range: &TimeRange,
53 stream: Arc<StreamMetadata>,
54) -> Result<Option<TimeRange>> {
55 let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
57 let partitions = part_provider
58 .fetch(
59 &blocks_view.get_view_set_name(),
60 &blocks_view.get_view_instance_id(),
61 Some(*query_time_range),
62 blocks_view.get_file_schema_hash(),
63 )
64 .await?;
65 let stream_id = &stream.stream_id;
66 let begin_range_iso = query_time_range.begin.to_rfc3339();
67 let end_range_iso = query_time_range.end.to_rfc3339();
68 let sql = format!(
69 r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
70 FROM source
71 WHERE stream_id = '{stream_id}'
72 AND begin_time <= '{end_range_iso}'
73 AND end_time >= '{begin_range_iso}';"#
74 );
75 let reader_factory = lakehouse.reader_factory().clone();
76 let rbs = query_partitions(
77 lakehouse.runtime().clone(),
78 reader_factory,
79 lakehouse.lake().blob_storage.inner(),
80 blocks_view.get_file_schema(),
81 Arc::new(partitions),
82 &sql,
83 )
84 .await?
85 .collect()
86 .await?;
87 if rbs.is_empty() {
88 return Ok(None);
89 }
90 if rbs[0].num_rows() == 0 {
91 return Ok(None);
92 }
93 let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
94 let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
95 Ok(Some(TimeRange::new(
96 DateTime::from_timestamp_nanos(min_insert_time),
97 DateTime::from_timestamp_nanos(max_insert_time),
98 )))
99}
100
101pub async fn generate_stream_jit_partitions_segment(
103 config: &JitPartitionConfig,
104 lakehouse: Arc<LakehouseContext>,
105 blocks_view: &BlocksView,
106 insert_time_range: &TimeRange,
107 stream: Arc<StreamMetadata>,
108 process: Arc<ProcessMetadata>,
109) -> Result<Vec<SourceDataBlocksInMemory>> {
110 let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
111 &lakehouse.lake().db_pool,
112 blocks_view.get_view_set_name(),
113 blocks_view.get_view_instance_id(),
114 *insert_time_range,
115 )
116 .await?;
117 let partitions = cache.partitions;
118
119 let stream_id = &stream.stream_id;
120 let begin_range_iso = insert_time_range.begin.to_rfc3339();
121 let end_range_iso = insert_time_range.end.to_rfc3339();
122 let sql = format!(
123 r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time, "streams.format"
124 FROM source
125 WHERE stream_id = '{stream_id}'
126 AND insert_time >= '{begin_range_iso}'
127 AND insert_time < '{end_range_iso}'
128 ORDER BY insert_time, block_id;"#
129 );
130
131 let reader_factory = lakehouse.reader_factory().clone();
132 let rbs = query_partitions(
133 lakehouse.runtime().clone(),
134 reader_factory,
135 lakehouse.lake().blob_storage.inner(),
136 blocks_view.get_file_schema(),
137 Arc::new(partitions),
138 &sql,
139 )
140 .await?
141 .collect()
142 .await?;
143
144 let mut partitions = vec![];
145 let mut partition_blocks = vec![];
146 let mut partition_nb_objects: i64 = 0;
147 for rb in rbs {
148 let format_column = string_column_by_name(&rb, "streams.format")?;
149 for ir in 0..rb.num_rows() {
150 let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
151 let block_nb_objects = block.nb_objects as i64;
152 let format = format_column.value(ir)?.to_string();
153
154 if partition_nb_objects + block_nb_objects > config.max_nb_objects
156 && !partition_blocks.is_empty()
157 {
158 partitions.push(SourceDataBlocksInMemory {
160 blocks: partition_blocks,
161 block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
162 });
163 partition_blocks = vec![Arc::new(PartitionSourceBlock {
165 block,
166 stream: stream.clone(),
167 process: process.clone(),
168 format,
169 })];
170 partition_nb_objects = block_nb_objects;
171 } else {
172 partition_nb_objects += block_nb_objects;
174 partition_blocks.push(Arc::new(PartitionSourceBlock {
175 block,
176 stream: stream.clone(),
177 process: process.clone(),
178 format,
179 }));
180 }
181 }
182 }
183 if partition_nb_objects != 0 {
184 partitions.push(SourceDataBlocksInMemory {
185 blocks: partition_blocks,
186 block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
187 });
188 }
189
190 Ok(partitions)
191}
192
193pub async fn generate_stream_jit_partitions(
197 config: &JitPartitionConfig,
198 lakehouse: Arc<LakehouseContext>,
199 blocks_view: &BlocksView,
200 query_time_range: &TimeRange,
201 stream: Arc<StreamMetadata>,
202 process: Arc<ProcessMetadata>,
203) -> Result<Vec<SourceDataBlocksInMemory>> {
204 let insert_time_range = get_insert_time_range(
205 lakehouse.clone(),
206 blocks_view,
207 query_time_range,
208 stream.clone(),
209 )
210 .await?;
211 if insert_time_range.is_none() {
212 return Ok(vec![]);
213 }
214 let insert_time_range = insert_time_range.with_context(|| "missing insert_time_range")?;
215 let insert_time_range = TimeRange::new(
216 insert_time_range
217 .begin
218 .duration_trunc(config.max_insert_time_slice)?,
219 insert_time_range
220 .end
221 .duration_trunc(config.max_insert_time_slice)?
222 + config.max_insert_time_slice,
223 );
224 let mut begin_segment = insert_time_range.begin;
225 let mut end_segment = begin_segment + config.max_insert_time_slice;
226 let mut partitions = vec![];
227 while end_segment <= insert_time_range.end {
228 let insert_time_range = TimeRange::new(begin_segment, end_segment);
229 let mut segment_partitions = generate_stream_jit_partitions_segment(
230 config,
231 lakehouse.clone(),
232 blocks_view,
233 &insert_time_range,
234 stream.clone(),
235 process.clone(),
236 )
237 .await?;
238 partitions.append(&mut segment_partitions);
239 begin_segment = end_segment;
240 end_segment = begin_segment + config.max_insert_time_slice;
241 }
242 Ok(partitions)
243}
244
245pub async fn generate_process_jit_partitions_segment(
247 config: &JitPartitionConfig,
248 lakehouse: Arc<LakehouseContext>,
249 blocks_view: &BlocksView,
250 insert_time_range: &TimeRange,
251 process: Arc<ProcessMetadata>,
252 stream_tag: &str,
253) -> Result<Vec<SourceDataBlocksInMemory>> {
254 let cache = PartitionCache::fetch_overlapping_insert_range_for_view(
255 &lakehouse.lake().db_pool,
256 blocks_view.get_view_set_name(),
257 blocks_view.get_view_instance_id(),
258 *insert_time_range,
259 )
260 .await?;
261 let partitions = cache.partitions;
262
263 let process_id = &process.process_id;
264 let begin_range_iso = insert_time_range.begin.to_rfc3339();
265 let end_range_iso = insert_time_range.end.to_rfc3339();
266 let sql = format!(
267 r#"SELECT block_id, stream_id, process_id, begin_time, end_time, begin_ticks, end_ticks, nb_objects, object_offset, payload_size, insert_time,
268 "streams.dependencies_metadata", "streams.objects_metadata", "streams.tags", "streams.properties", "streams.format"
269 FROM source
270 WHERE process_id = '{process_id}'
271 AND array_has( "streams.tags", '{stream_tag}' )
272 AND insert_time >= '{begin_range_iso}'
273 AND insert_time < '{end_range_iso}'
274 ORDER BY insert_time, block_id;"#
275 );
276
277 let reader_factory = lakehouse.reader_factory().clone();
278 let rbs = query_partitions(
279 lakehouse.runtime().clone(),
280 reader_factory,
281 lakehouse.lake().blob_storage.inner(),
282 blocks_view.get_file_schema(),
283 Arc::new(partitions),
284 &sql,
285 )
286 .await?
287 .collect()
288 .await?;
289
290 let mut partitions = vec![];
291 let mut partition_blocks = vec![];
292 let mut partition_nb_objects: i64 = 0;
293
294 for rb in rbs {
295 for ir in 0..rb.num_rows() {
296 let block = block_from_batch_row(&rb, ir).with_context(|| "block_from_batch_row")?;
297 let block_nb_objects = block.nb_objects as i64;
298
299 let stream_id_column = string_column_by_name(&rb, "stream_id")?;
301 let stream_process_id_column = string_column_by_name(&rb, "process_id")?;
302 let dependencies_metadata_column: &BinaryArray =
303 typed_column_by_name(&rb, "streams.dependencies_metadata")?;
304 let objects_metadata_column: &BinaryArray =
305 typed_column_by_name(&rb, "streams.objects_metadata")?;
306 let stream_tags_column: &GenericListArray<i32> =
307 typed_column_by_name(&rb, "streams.tags")?;
308 let stream_properties_accessor = properties_column_by_name(&rb, "streams.properties")?;
309 let stream_format_column = string_column_by_name(&rb, "streams.format")?;
310
311 let stream_id = Uuid::parse_str(stream_id_column.value(ir)?)
312 .with_context(|| "parsing stream_id")?;
313 let stream_process_id = Uuid::parse_str(stream_process_id_column.value(ir)?)
314 .with_context(|| "parsing stream process_id")?;
315
316 let dependencies_metadata = dependencies_metadata_column.value(ir);
317 let objects_metadata = objects_metadata_column.value(ir);
318 let stream_tags = stream_tags_column
319 .value(ir)
320 .as_any()
321 .downcast_ref::<StringArray>()
322 .with_context(|| "casting stream_tags")?
323 .iter()
324 .map(|item| String::from(item.unwrap_or_default()))
325 .collect();
326
327 let stream_properties_jsonb = stream_properties_accessor.jsonb_value(ir)?;
329
330 let stream = Arc::new(StreamMetadata {
331 stream_id,
332 process_id: stream_process_id,
333 dependencies_metadata: ciborium::from_reader(dependencies_metadata)
334 .with_context(|| "decoding dependencies_metadata")?,
335 objects_metadata: ciborium::from_reader(objects_metadata)
336 .with_context(|| "decoding objects_metadata")?,
337 tags: stream_tags,
338 properties: Arc::new(stream_properties_jsonb),
339 });
340
341 let format = stream_format_column.value(ir)?.to_string();
342
343 if partition_nb_objects + block_nb_objects > config.max_nb_objects
345 && !partition_blocks.is_empty()
346 {
347 partitions.push(SourceDataBlocksInMemory {
349 blocks: partition_blocks,
350 block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
351 });
352 partition_blocks = vec![Arc::new(PartitionSourceBlock {
354 block,
355 stream: stream.clone(),
356 process: process.clone(),
357 format,
358 })];
359 partition_nb_objects = block_nb_objects;
360 } else {
361 partition_nb_objects += block_nb_objects;
363 partition_blocks.push(Arc::new(PartitionSourceBlock {
364 block,
365 stream: stream.clone(),
366 process: process.clone(),
367 format,
368 }));
369 }
370 }
371 }
372 if partition_nb_objects != 0 {
373 partitions.push(SourceDataBlocksInMemory {
374 blocks: partition_blocks,
375 block_ids_hash: partition_nb_objects.to_le_bytes().to_vec(),
376 });
377 }
378 Ok(partitions)
379}
380
381pub async fn generate_process_jit_partitions(
385 config: &JitPartitionConfig,
386 lakehouse: Arc<LakehouseContext>,
387 blocks_view: &BlocksView,
388 query_time_range: &TimeRange,
389 process: Arc<ProcessMetadata>,
390 stream_tag: &str,
391) -> Result<Vec<SourceDataBlocksInMemory>> {
392 let part_provider = LivePartitionProvider::new(lakehouse.lake().db_pool.clone());
394 let partitions = part_provider
395 .fetch(
396 &blocks_view.get_view_set_name(),
397 &blocks_view.get_view_instance_id(),
398 Some(*query_time_range),
399 blocks_view.get_file_schema_hash(),
400 )
401 .await?;
402
403 let process_id = &process.process_id;
404 let begin_range_iso = query_time_range.begin.to_rfc3339();
405 let end_range_iso = query_time_range.end.to_rfc3339();
406 let sql = format!(
407 r#"SELECT MIN(insert_time) as min_insert_time, MAX(insert_time) as max_insert_time
408 FROM source
409 WHERE process_id = '{process_id}'
410 AND array_has( "streams.tags", '{stream_tag}' )
411 AND begin_time <= '{end_range_iso}'
412 AND end_time >= '{begin_range_iso}';"#
413 );
414
415 let reader_factory = lakehouse.reader_factory().clone();
416 let rbs = query_partitions(
417 lakehouse.runtime().clone(),
418 reader_factory,
419 lakehouse.lake().blob_storage.inner(),
420 blocks_view.get_file_schema(),
421 Arc::new(partitions),
422 &sql,
423 )
424 .await?
425 .collect()
426 .await?;
427
428 if rbs.is_empty() || rbs[0].num_rows() == 0 {
429 return Ok(vec![]);
430 }
431
432 let min_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 0)?;
433 let max_insert_time = get_single_row_primitive_value::<TimestampNanosecondType>(&rbs, 1)?;
434
435 if min_insert_time == 0 || max_insert_time == 0 {
436 return Ok(vec![]);
437 }
438
439 let insert_time_range = TimeRange::new(
440 DateTime::from_timestamp_nanos(min_insert_time)
441 .duration_trunc(config.max_insert_time_slice)?,
442 DateTime::from_timestamp_nanos(max_insert_time)
443 .duration_trunc(config.max_insert_time_slice)?
444 + config.max_insert_time_slice,
445 );
446
447 let mut begin_segment = insert_time_range.begin;
448 let mut end_segment = begin_segment + config.max_insert_time_slice;
449 let mut partitions = vec![];
450
451 while end_segment <= insert_time_range.end {
452 let insert_time_range = TimeRange::new(begin_segment, end_segment);
453 let mut segment_partitions = generate_process_jit_partitions_segment(
454 config,
455 lakehouse.clone(),
456 blocks_view,
457 &insert_time_range,
458 process.clone(),
459 stream_tag,
460 )
461 .await?;
462 partitions.append(&mut segment_partitions);
463 begin_segment = end_segment;
464 end_segment = begin_segment + config.max_insert_time_slice;
465 }
466 Ok(partitions)
467}
468
469#[span_fn]
472pub async fn is_jit_partition_up_to_date(
473 pool: &sqlx::PgPool,
474 view_meta: ViewMetadata,
475 spec: &SourceDataBlocksInMemory,
476) -> Result<bool> {
477 let (min_insert_time, max_insert_time) =
478 get_part_insert_time_range(spec).with_context(|| "get_event_time_range")?;
479 let desc = format!(
480 "[{}, {}] {} {}",
481 min_insert_time.to_rfc3339(),
482 max_insert_time.to_rfc3339(),
483 &*view_meta.view_set_name,
484 &*view_meta.view_instance_id,
485 );
486
487 let rows = if min_insert_time == max_insert_time {
495 sqlx::query(
497 "SELECT file_schema_hash, source_data_hash
498 FROM lakehouse_partitions
499 WHERE view_set_name = $1
500 AND view_instance_id = $2
501 AND begin_insert_time = $3
502 AND end_insert_time = $3
503 ;",
504 )
505 .bind(&*view_meta.view_set_name)
506 .bind(&*view_meta.view_instance_id)
507 .bind(min_insert_time)
508 } else {
509 sqlx::query(
511 "SELECT file_schema_hash, source_data_hash
512 FROM lakehouse_partitions
513 WHERE view_set_name = $1
514 AND view_instance_id = $2
515 AND begin_insert_time <= $3
516 AND end_insert_time >= $4
517 ;",
518 )
519 .bind(&*view_meta.view_set_name)
520 .bind(&*view_meta.view_instance_id)
521 .bind(max_insert_time)
522 .bind(min_insert_time)
523 }
524 .fetch_all(pool)
525 .await
526 .with_context(|| "fetching matching partitions")?;
527 if rows.len() != 1 {
528 debug!("{desc}: found {} partitions (expected 1)", rows.len());
529 for (i, row) in rows.iter().enumerate() {
530 let part_file_schema: Vec<u8> = row.try_get("file_schema_hash")?;
531 let part_source_data: Vec<u8> = row.try_get("source_data_hash")?;
532 let source_row_count = hash_to_object_count(&part_source_data)?;
533 debug!(
534 "{desc}: partition {}: file_schema_hash={:?}, source_rows={}",
535 i, part_file_schema, source_row_count
536 );
537 }
538 info!("{desc}: found {} partitions", rows.len());
539 return Ok(false);
540 }
541 let r = &rows[0];
542 let part_file_schema: Vec<u8> = r.try_get("file_schema_hash")?;
543 if part_file_schema != view_meta.file_schema_hash {
544 warn!("{desc}: found matching partition with different file schema");
547 return Ok(false);
548 }
549 let part_source_data: Vec<u8> = r.try_get("source_data_hash")?;
550 let existing_count = hash_to_object_count(&part_source_data)?;
551 let required_count = hash_to_object_count(&spec.block_ids_hash)?;
552 if existing_count < required_count {
553 info!("{desc}: existing partition lacks source data: creating a new partition");
554 return Ok(false);
555 }
556 info!("{desc}: partition up to date");
557 Ok(true)
558}
559
560fn get_part_insert_time_range(
563 spec: &SourceDataBlocksInMemory,
564) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
565 if spec.blocks.is_empty() {
566 anyhow::bail!("empty partition should not exist");
567 }
568 let min_insert_time = spec.blocks[0].block.insert_time;
570 let max_insert_time = spec.blocks[spec.blocks.len() - 1].block.insert_time;
571 Ok((min_insert_time, max_insert_time))
572}
573
574#[span_fn]
579pub async fn write_partition_from_blocks(
580 lake: Arc<DataLakeConnection>,
581 view_metadata: ViewMetadata,
582 schema: Arc<Schema>,
583 source_data: SourceDataBlocksInMemory,
584 block_processors: Arc<BlockProcessorMap>,
585) -> Result<()> {
586 if source_data.blocks.is_empty() {
587 anyhow::bail!("empty partition spec");
588 }
589 let min_insert_time = source_data.blocks[0].block.insert_time;
591 let max_insert_time = source_data.blocks[source_data.blocks.len() - 1]
592 .block
593 .insert_time;
594 let block_spec = BlockPartitionSpec {
595 view_metadata,
596 schema,
597 insert_range: TimeRange::new(min_insert_time, max_insert_time),
598 source_data: Arc::new(source_data),
599 block_processors,
600 };
601 let null_response_writer = Arc::new(ResponseWriter::new(None));
602 block_spec
603 .write(lake, null_response_writer)
604 .await
605 .with_context(|| "block_spec.write")?;
606 Ok(())
607}