1use crate::time::TimeRange;
2
3use super::{
4 partition::Partition, partition_metadata::load_partition_metadata, view::ViewMetadata,
5};
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use datafusion::parquet::file::metadata::ParquetMetaData;
10use micromegas_tracing::prelude::*;
11use sqlx::{PgPool, Row};
12use std::{fmt, sync::Arc};
13
/// A catalog [`Partition`] paired with the parquet footer metadata of its data file.
#[derive(Clone, Debug)]
pub struct PartitionWithMetadata {
    /// The partition record loaded from the lakehouse catalog.
    pub partition: Partition,
    /// Parquet footer metadata for the partition's backing file.
    pub file_metadata: Arc<ParquetMetaData>,
}
21
22#[span_fn]
25pub async fn partition_with_metadata(
26 partition: Partition,
27 pool: &PgPool,
28) -> Result<PartitionWithMetadata> {
29 let file_path = partition
30 .file_path
31 .as_ref()
32 .ok_or_else(|| anyhow::anyhow!("cannot load metadata for empty partition"))?;
33 let file_metadata = load_partition_metadata(pool, file_path, None)
34 .await
35 .with_context(|| format!("loading metadata for partition: {}", file_path))?;
36 Ok(PartitionWithMetadata {
37 partition,
38 file_metadata,
39 })
40}
41
/// Source of partitions for query execution.
#[async_trait]
pub trait QueryPartitionProvider: std::fmt::Display + Send + Sync + std::fmt::Debug {
    /// Lists the partitions of a view instance whose file schema matches
    /// `file_schema_hash`, optionally restricted to a time range.
    async fn fetch(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        query_range: Option<TimeRange>,
        file_schema_hash: Vec<u8>,
    ) -> Result<Vec<Partition>>;
}
54
/// In-memory snapshot of partitions covering a given insert-time range.
#[derive(Debug)]
pub struct PartitionCache {
    pub partitions: Vec<Partition>,
    // Insert-time range this snapshot covers; `fetch` rejects query ranges
    // that extend beyond it.
    insert_range: TimeRange,
}
61
62impl fmt::Display for PartitionCache {
63 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
64 write!(f, "{self:?}")
65 }
66}
67
68impl PartitionCache {
69 pub fn len(&self) -> usize {
70 self.partitions.len()
71 }
72
73 pub fn is_empty(&self) -> bool {
74 self.partitions.is_empty()
75 }
76
77 #[span_fn]
81 pub async fn fetch_overlapping_insert_range(
82 pool: &sqlx::PgPool,
83 insert_range: TimeRange,
84 ) -> Result<Self> {
85 let rows = sqlx::query(
86 "SELECT view_set_name,
87 view_instance_id,
88 begin_insert_time,
89 end_insert_time,
90 min_event_time,
91 max_event_time,
92 updated,
93 file_path,
94 file_size,
95 file_schema_hash,
96 source_data_hash,
97 num_rows
98 FROM lakehouse_partitions
99 WHERE begin_insert_time < $1
100 AND end_insert_time > $2
101 ORDER BY begin_insert_time, file_path
102 ;",
103 )
104 .bind(insert_range.end)
105 .bind(insert_range.begin)
106 .fetch_all(pool)
107 .await
108 .with_context(|| "fetching partitions")?;
109 let mut partitions = vec![];
110 for r in rows {
111 let view_metadata = ViewMetadata {
112 view_set_name: Arc::new(r.try_get("view_set_name")?),
113 view_instance_id: Arc::new(r.try_get("view_instance_id")?),
114 file_schema_hash: r.try_get("file_schema_hash")?,
115 };
116 let insert_time_range = TimeRange {
118 begin: r.try_get("begin_insert_time")?,
119 end: r.try_get("end_insert_time")?,
120 };
121 let event_time_range = match (
122 r.try_get::<DateTime<Utc>, _>("min_event_time").ok(),
123 r.try_get::<DateTime<Utc>, _>("max_event_time").ok(),
124 ) {
125 (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
126 (None, None) => None, (Some(_), None) | (None, Some(_)) => {
128 anyhow::bail!(
129 "Corrupt partition record: only one of min/max_event_time is NULL"
130 );
131 }
132 };
133 let partition = Partition {
134 view_metadata,
135 insert_time_range,
136 event_time_range,
137 updated: r.try_get("updated")?,
138 file_path: r.try_get::<String, _>("file_path").ok(),
139 file_size: r.try_get("file_size")?,
140 source_data_hash: r.try_get("source_data_hash")?,
141 num_rows: r.try_get("num_rows")?,
142 };
143 partition
144 .validate()
145 .with_context(|| "validating partition from database")?;
146 partitions.push(partition);
147 }
148 Ok(Self {
149 partitions,
150 insert_range,
151 })
152 }
153
154 #[span_fn]
156 pub async fn fetch_overlapping_insert_range_for_view(
157 pool: &sqlx::PgPool,
158 view_set_name: Arc<String>,
159 view_instance_id: Arc<String>,
160 insert_range: TimeRange,
161 ) -> Result<Self> {
162 let rows = sqlx::query(
163 "SELECT begin_insert_time,
164 end_insert_time,
165 min_event_time,
166 max_event_time,
167 updated,
168 file_path,
169 file_size,
170 file_schema_hash,
171 source_data_hash,
172 num_rows
173 FROM lakehouse_partitions
174 WHERE begin_insert_time < $1
175 AND end_insert_time > $2
176 AND view_set_name = $3
177 AND view_instance_id = $4
178 ORDER BY begin_insert_time, file_path
179 ;",
180 )
181 .bind(insert_range.end)
182 .bind(insert_range.begin)
183 .bind(&*view_set_name)
184 .bind(&*view_instance_id)
185 .fetch_all(pool)
186 .await
187 .with_context(|| "fetching partitions")?;
188 let mut partitions = vec![];
189 for r in rows {
190 let view_metadata = ViewMetadata {
191 view_set_name: view_set_name.clone(),
192 view_instance_id: view_instance_id.clone(),
193 file_schema_hash: r.try_get("file_schema_hash")?,
194 };
195 let insert_time_range = TimeRange {
197 begin: r.try_get("begin_insert_time")?,
198 end: r.try_get("end_insert_time")?,
199 };
200 let event_time_range = match (
201 r.try_get::<DateTime<Utc>, _>("min_event_time").ok(),
202 r.try_get::<DateTime<Utc>, _>("max_event_time").ok(),
203 ) {
204 (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
205 (None, None) => None, (Some(_), None) | (None, Some(_)) => {
207 anyhow::bail!(
208 "Corrupt partition record: only one of min/max_event_time is NULL"
209 );
210 }
211 };
212 let partition = Partition {
213 view_metadata,
214 insert_time_range,
215 event_time_range,
216 updated: r.try_get("updated")?,
217 file_path: r.try_get::<String, _>("file_path").ok(),
218 file_size: r.try_get("file_size")?,
219 source_data_hash: r.try_get("source_data_hash")?,
220 num_rows: r.try_get("num_rows")?,
221 };
222 partition
223 .validate()
224 .with_context(|| "validating partition from database")?;
225 partitions.push(partition);
226 }
227 Ok(Self {
228 partitions,
229 insert_range,
230 })
231 }
232
233 pub fn filter(
235 &self,
236 view_set_name: &str,
237 view_instance_id: &str,
238 file_schema_hash: &[u8],
239 insert_range: TimeRange,
240 ) -> Self {
241 let mut partitions = vec![];
242 for part in &self.partitions {
243 if *part.view_metadata.view_set_name == view_set_name
244 && *part.view_metadata.view_instance_id == view_instance_id
245 && part.view_metadata.file_schema_hash == file_schema_hash
246 && part.begin_insert_time() < insert_range.end
247 && part.end_insert_time() > insert_range.begin
248 {
249 partitions.push(part.clone());
250 }
251 }
252 Self {
253 partitions,
254 insert_range,
255 }
256 }
257
258 pub fn filter_insert_range(&self, insert_range: TimeRange) -> Self {
260 let mut partitions = vec![];
261 for part in &self.partitions {
262 if part.begin_insert_time() < insert_range.end
263 && part.end_insert_time() > insert_range.begin
264 {
265 partitions.push(part.clone());
266 }
267 }
268 Self {
269 partitions,
270 insert_range,
271 }
272 }
273
274 pub fn filter_inside_range(
276 &self,
277 view_set_name: &str,
278 view_instance_id: &str,
279 insert_range: TimeRange,
280 ) -> Self {
281 let mut partitions = vec![];
282 for part in &self.partitions {
283 if *part.view_metadata.view_set_name == view_set_name
284 && *part.view_metadata.view_instance_id == view_instance_id
285 && part.begin_insert_time() >= insert_range.begin
286 && part.end_insert_time() <= insert_range.end
287 {
288 partitions.push(part.clone());
289 }
290 }
291 Self {
292 partitions,
293 insert_range,
294 }
295 }
296}
297
298#[async_trait]
299impl QueryPartitionProvider for PartitionCache {
300 #[span_fn]
302 async fn fetch(
303 &self,
304 view_set_name: &str,
305 view_instance_id: &str,
306 query_range: Option<TimeRange>,
307 file_schema_hash: Vec<u8>,
308 ) -> Result<Vec<Partition>> {
309 let mut partitions = vec![];
310 if let Some(range) = query_range {
311 if range.begin < self.insert_range.begin || range.end > self.insert_range.end {
312 anyhow::bail!("filtering from a result set that's not large enough");
313 }
314 for part in &self.partitions {
315 if *part.view_metadata.view_set_name == view_set_name
316 && *part.view_metadata.view_instance_id == view_instance_id
317 && part.begin_insert_time() < range.end
318 && part.end_insert_time() > range.begin
319 && part.view_metadata.file_schema_hash == file_schema_hash
320 {
321 partitions.push(part.clone());
322 }
323 }
324 } else {
325 for part in &self.partitions {
326 if *part.view_metadata.view_set_name == view_set_name
327 && *part.view_metadata.view_instance_id == view_instance_id
328 && part.view_metadata.file_schema_hash == file_schema_hash
329 {
330 partitions.push(part.clone());
331 }
332 }
333 }
334 Ok(partitions)
335 }
336}
337
/// Partition provider that queries the database on every call.
#[derive(Debug)]
pub struct LivePartitionProvider {
    // Connection pool used to query `lakehouse_partitions`.
    db_pool: PgPool,
}
343
344impl fmt::Display for LivePartitionProvider {
345 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
346 write!(f, "{self:?}")
347 }
348}
349
impl LivePartitionProvider {
    /// Creates a provider that fetches partitions from the database on demand.
    pub fn new(db_pool: PgPool) -> Self {
        Self { db_pool }
    }
}
355
356#[async_trait]
357impl QueryPartitionProvider for LivePartitionProvider {
358 #[span_fn]
359 async fn fetch(
360 &self,
361 view_set_name: &str,
362 view_instance_id: &str,
363 query_range: Option<TimeRange>,
364 file_schema_hash: Vec<u8>,
365 ) -> Result<Vec<Partition>> {
366 let mut partitions = vec![];
367 let rows = if let Some(range) = query_range {
368 sqlx::query(
369 "SELECT view_set_name,
370 view_instance_id,
371 begin_insert_time,
372 end_insert_time,
373 min_event_time,
374 max_event_time,
375 updated,
376 file_path,
377 file_size,
378 file_schema_hash,
379 source_data_hash,
380 num_rows
381 FROM lakehouse_partitions
382 WHERE view_set_name = $1
383 AND view_instance_id = $2
384 AND min_event_time <= $3
385 AND max_event_time >= $4
386 AND file_schema_hash = $5
387 ORDER BY begin_insert_time, file_path
388 ;",
389 )
390 .bind(view_set_name)
391 .bind(view_instance_id)
392 .bind(range.end)
393 .bind(range.begin)
394 .bind(file_schema_hash)
395 .fetch_all(&self.db_pool)
396 .await
397 .with_context(|| "listing lakehouse partitions")?
398 } else {
399 sqlx::query(
400 "SELECT view_set_name,
401 view_instance_id,
402 begin_insert_time,
403 end_insert_time,
404 min_event_time,
405 max_event_time,
406 updated,
407 file_path,
408 file_size,
409 file_schema_hash,
410 source_data_hash,
411 num_rows
412 FROM lakehouse_partitions
413 WHERE view_set_name = $1
414 AND view_instance_id = $2
415 AND file_schema_hash = $3
416 ORDER BY begin_insert_time, file_path
417 ;",
418 )
419 .bind(view_set_name)
420 .bind(view_instance_id)
421 .bind(file_schema_hash)
422 .fetch_all(&self.db_pool)
423 .await
424 .with_context(|| "listing lakehouse partitions")?
425 };
426 for r in rows {
427 let view_metadata = ViewMetadata {
428 view_set_name: Arc::new(r.try_get("view_set_name")?),
429 view_instance_id: Arc::new(r.try_get("view_instance_id")?),
430 file_schema_hash: r.try_get("file_schema_hash")?,
431 };
432 let insert_time_range = TimeRange {
434 begin: r.try_get("begin_insert_time")?,
435 end: r.try_get("end_insert_time")?,
436 };
437 let event_time_range = match (
438 r.try_get::<DateTime<Utc>, _>("min_event_time").ok(),
439 r.try_get::<DateTime<Utc>, _>("max_event_time").ok(),
440 ) {
441 (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
442 (None, None) => None, (Some(_), None) | (None, Some(_)) => {
444 anyhow::bail!(
445 "Corrupt partition record: only one of min/max_event_time is NULL"
446 );
447 }
448 };
449 let partition = Partition {
450 view_metadata,
451 insert_time_range,
452 event_time_range,
453 updated: r.try_get("updated")?,
454 file_path: r.try_get::<String, _>("file_path").ok(),
455 file_size: r.try_get("file_size")?,
456 source_data_hash: r.try_get("source_data_hash")?,
457 num_rows: r.try_get("num_rows")?,
458 };
459 partition
460 .validate()
461 .with_context(|| "validating partition from database")?;
462 partitions.push(partition);
463 }
464 Ok(partitions)
465 }
466}
467
/// Partition provider that always returns an empty partition list.
#[derive(Debug)]
pub struct NullPartitionProvider {}
471
472impl fmt::Display for NullPartitionProvider {
473 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
474 write!(f, "{self:?}")
475 }
476}
477
478#[async_trait]
479impl QueryPartitionProvider for NullPartitionProvider {
480 async fn fetch(
481 &self,
482 _view_set_name: &str,
483 _view_instance_id: &str,
484 _query_range: Option<TimeRange>,
485 _file_schema_hash: Vec<u8>,
486 ) -> Result<Vec<Partition>> {
487 Ok(vec![])
488 }
489}