micromegas_analytics/
time.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, TimeDelta, Utc};
5use datafusion::scalar::ScalarValue;
6use micromegas_telemetry::types::block::BlockMetadata;
7use sqlx::Row;
8
9use crate::metadata::ProcessMetadata;
10
11const NANOS_PER_SEC: f64 = 1000.0 * 1000.0 * 1000.0;
12
13/// A time range, with a beginning and an end.
14#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
15pub struct TimeRange {
16    pub begin: DateTime<Utc>,
17    pub end: DateTime<Utc>,
18}
19
20impl TimeRange {
21    pub fn new(begin: DateTime<Utc>, end: DateTime<Utc>) -> Self {
22        Self { begin, end }
23    }
24}
25
26/// Creates a `ConvertTicks` from a database connection.
27pub async fn make_time_converter_from_db(
28    pool: &sqlx::Pool<sqlx::Postgres>,
29    process: &ProcessMetadata,
30) -> Result<ConvertTicks> {
31    if process.tsc_frequency > 0 {
32        // we have a good tsc freq provided
33        return ConvertTicks::from_meta_data(
34            process.start_ticks,
35            process.start_time.timestamp_nanos_opt().unwrap_or_default(),
36            process.tsc_frequency,
37        );
38    }
39    // we need to estimate the tsc frequency
40    let row = sqlx::query(
41        "SELECT end_time, end_ticks
42         FROM blocks
43         WHERE process_id = $1
44         ORDER BY end_time DESC
45         LIMIT 1",
46    )
47    .bind(process.process_id)
48    .fetch_one(pool)
49    .await
50    .with_context(|| "getting last block end time for tsc estimation")?;
51    let end_time: chrono::DateTime<chrono::Utc> = row.try_get("end_time")?;
52    let relative_end_ticks: i64 = row.try_get("end_ticks")?;
53    let delta_time = end_time - process.start_time;
54    let nb_seconds = delta_time.num_nanoseconds().unwrap_or_default() as f64 / 1_000_000_000.0;
55    let ticks_per_second = relative_end_ticks as f64 / nb_seconds;
56    ConvertTicks::from_meta_data(
57        process.start_ticks,
58        process.start_time.timestamp_nanos_opt().unwrap_or_default(),
59        ticks_per_second.round() as i64,
60    )
61}
62
63/// Creates a `ConvertTicks` from a block's metadata.
64pub fn make_time_converter_from_block_meta(
65    process: &ProcessMetadata,
66    block: &BlockMetadata,
67) -> Result<ConvertTicks> {
68    if process.tsc_frequency > 0 {
69        // we have a good tsc freq provided
70        return ConvertTicks::from_meta_data(
71            process.start_ticks,
72            process.start_time.timestamp_nanos_opt().unwrap_or_default(),
73            process.tsc_frequency,
74        );
75    }
76    let delta_time = block.end_time - process.start_time;
77    let nb_seconds = delta_time.num_nanoseconds().unwrap_or_default() as f64 / 1_000_000_000.0;
78    let ticks_per_second = block.end_ticks as f64 / nb_seconds;
79    ConvertTicks::from_meta_data(
80        process.start_ticks,
81        process.start_time.timestamp_nanos_opt().unwrap_or_default(),
82        ticks_per_second.round() as i64,
83    )
84}
85
86/// Creates a `ConvertTicks` using the latest timing information from the process.
87/// This should be used instead of per-block timing to ensure consistent tick conversion
88/// across all blocks from the same process.
89pub fn make_time_converter_from_latest_timing(
90    process: &ProcessMetadata,
91    last_block_end_ticks: i64,
92    last_block_end_time: chrono::DateTime<chrono::Utc>,
93) -> Result<ConvertTicks> {
94    if process.tsc_frequency > 0 {
95        // we have a good tsc freq provided
96        return ConvertTicks::from_meta_data(
97            process.start_ticks,
98            process.start_time.timestamp_nanos_opt().unwrap_or_default(),
99            process.tsc_frequency,
100        );
101    }
102    // Calculate frequency using the latest timing data from the process
103    let delta_time = last_block_end_time - process.start_time;
104    let nb_seconds = delta_time.num_nanoseconds().unwrap_or_default() as f64 / 1_000_000_000.0;
105    let ticks_per_second = last_block_end_ticks as f64 / nb_seconds;
106    ConvertTicks::from_meta_data(
107        process.start_ticks,
108        process.start_time.timestamp_nanos_opt().unwrap_or_default(),
109        ticks_per_second.round() as i64,
110    )
111}
112
113/// ConvertTicks helps converting between a process's tick count and more convenient date/time representations
114#[derive(Debug, Clone)]
115pub struct ConvertTicks {
116    tick_offset: i64,
117    process_start_ns: i64,
118    frequency: i64, // ticks per second
119    inv_tsc_frequency_ns: f64,
120    inv_tsc_frequency_ms: f64,
121}
122
123impl ConvertTicks {
124    pub fn from_meta_data(start_ticks: i64, process_start_ns: i64, frequency: i64) -> Result<Self> {
125        if frequency <= 0 {
126            anyhow::bail!("invalid frequency")
127        }
128        Ok(Self {
129            tick_offset: start_ticks,
130            process_start_ns,
131            frequency,
132            inv_tsc_frequency_ns: get_tsc_frequency_inverse_ns(frequency),
133            inv_tsc_frequency_ms: get_tsc_frequency_inverse_ms(frequency),
134        })
135    }
136
137    /// Get the frequency used for tick conversion
138    pub fn get_frequency(&self) -> i64 {
139        self.frequency
140    }
141
142    /// from relative time to relative tick count
143    pub fn to_ticks(&self, delta: TimeDelta) -> i64 {
144        let mut seconds = delta.num_seconds() as f64;
145        seconds += delta.subsec_nanos() as f64 / NANOS_PER_SEC;
146        let freq = self.frequency as f64;
147        (seconds * freq).round() as i64
148    }
149
150    /// from absolute ticks to absolute nanoseconds
151    pub fn ticks_to_nanoseconds(&self, ticks: i64) -> i64 {
152        let delta = (ticks - self.tick_offset) as f64;
153        let ns_since_process_start = (delta * self.inv_tsc_frequency_ns).round() as i64;
154        self.process_start_ns + ns_since_process_start
155    }
156
157    /// from relative ticks to absolute date/time
158    pub fn delta_ticks_to_time(&self, delta: i64) -> DateTime<Utc> {
159        let ns_since_process_start = (delta as f64 * self.inv_tsc_frequency_ns).round() as i64;
160        DateTime::from_timestamp_nanos(self.process_start_ns + ns_since_process_start)
161    }
162
163    /// from relative ticks to absolute nanoseconds
164    pub fn delta_ticks_to_ns(&self, delta: i64) -> i64 {
165        let ns_since_process_start = (delta as f64 * self.inv_tsc_frequency_ns).round() as i64;
166        self.process_start_ns + ns_since_process_start
167    }
168
169    /// from relative ticks to relative milliseconds
170    pub fn delta_ticks_to_ms(&self, delta_ticks: i64) -> f64 {
171        let delta = delta_ticks as f64;
172        delta * self.inv_tsc_frequency_ms
173    }
174
175    /// from time to relative ticks
176    pub fn time_to_delta_ticks(&self, time: DateTime<Utc>) -> i64 {
177        self.to_ticks(time - DateTime::from_timestamp_nanos(self.process_start_ns))
178    }
179}
180
181/// Returns the inverse of the TSC frequency in milliseconds.
182#[allow(clippy::cast_precision_loss)]
183pub fn get_tsc_frequency_inverse_ms(tsc_frequency: i64) -> f64 {
184    1000.0 / tsc_frequency as f64
185}
186
187/// Returns the inverse of the TSC frequency in nanoseconds.
188#[allow(clippy::cast_precision_loss)]
189pub fn get_tsc_frequency_inverse_ns(tsc_frequency: i64) -> f64 {
190    NANOS_PER_SEC / tsc_frequency as f64
191}
192
193/// Converts a `DateTime<Utc>` to a `ScalarValue`.
194pub fn datetime_to_scalar(v: DateTime<Utc>) -> ScalarValue {
195    lazy_static::lazy_static! {
196        static ref UTC_OFFSET: Arc<str> = Arc::from("+00:00");
197    }
198    ScalarValue::TimestampNanosecond(v.timestamp_nanos_opt(), Some(UTC_OFFSET.clone()))
199}