micromegas_telemetry_sink/
oidc_client_credentials_decorator.rs

1//! OIDC Client Credentials request decorator for service authentication
2//!
3//! Implements OAuth 2.0 client credentials flow for service-to-service authentication.
4//! Fetches access tokens from OIDC provider and caches them until expiration.
5
6use crate::request_decorator::{RequestDecorator, RequestDecoratorError, Result};
7use async_trait::async_trait;
8use serde::Deserialize;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use tokio::sync::Mutex;
12
13/// OIDC token response from client credentials flow
14#[derive(Debug, Clone, Deserialize)]
15struct TokenResponse {
16    access_token: String,
17    #[serde(default)]
18    expires_in: u64, // seconds, defaults to 0 if not present
19}
20
21/// Cached token with expiration
22#[derive(Debug, Clone)]
23struct CachedToken {
24    access_token: String,
25    expires_at: u64, // Unix timestamp
26}
27
28/// Request decorator that uses OIDC client credentials flow
29///
30/// Fetches access tokens from OIDC provider using client_id + client_secret,
31/// caches tokens until expiration, and adds them as Bearer tokens.
32pub struct OidcClientCredentialsDecorator {
33    token_endpoint: String,
34    client_id: String,
35    client_secret: String,
36    audience: Option<String>,
37    buffer_seconds: u64, // Token expiration buffer in seconds
38    client: reqwest::Client,
39    cached_token: Arc<Mutex<Option<CachedToken>>>,
40}
41
42impl OidcClientCredentialsDecorator {
43    /// Create from environment variables
44    ///
45    /// Reads:
46    /// - `MICROMEGAS_OIDC_TOKEN_ENDPOINT` - Token endpoint URL
47    /// - `MICROMEGAS_OIDC_CLIENT_ID` - Client ID
48    /// - `MICROMEGAS_OIDC_CLIENT_SECRET` - Client secret
49    /// - `MICROMEGAS_OIDC_AUDIENCE` - Audience (optional, required for Auth0/Azure AD)
50    /// - `MICROMEGAS_OIDC_TOKEN_BUFFER_SECONDS` - Token expiration buffer in seconds (optional, default: 180)
51    pub fn from_env() -> Result<Self> {
52        let token_endpoint = std::env::var("MICROMEGAS_OIDC_TOKEN_ENDPOINT").map_err(|_| {
53            RequestDecoratorError::Permanent("MICROMEGAS_OIDC_TOKEN_ENDPOINT not set".to_string())
54        })?;
55
56        let client_id = std::env::var("MICROMEGAS_OIDC_CLIENT_ID").map_err(|_| {
57            RequestDecoratorError::Permanent("MICROMEGAS_OIDC_CLIENT_ID not set".to_string())
58        })?;
59
60        let client_secret = std::env::var("MICROMEGAS_OIDC_CLIENT_SECRET").map_err(|_| {
61            RequestDecoratorError::Permanent("MICROMEGAS_OIDC_CLIENT_SECRET not set".to_string())
62        })?;
63
64        let audience = std::env::var("MICROMEGAS_OIDC_AUDIENCE").ok();
65
66        let buffer_seconds = std::env::var("MICROMEGAS_OIDC_TOKEN_BUFFER_SECONDS")
67            .ok()
68            .and_then(|s| s.parse().ok())
69            .unwrap_or(180); // Default: 3 minutes
70
71        Ok(Self::new(
72            token_endpoint,
73            client_id,
74            client_secret,
75            audience,
76            buffer_seconds,
77        ))
78    }
79
80    /// Create with explicit credentials
81    pub fn new(
82        token_endpoint: String,
83        client_id: String,
84        client_secret: String,
85        audience: Option<String>,
86        buffer_seconds: u64,
87    ) -> Self {
88        Self {
89            token_endpoint,
90            client_id,
91            client_secret,
92            audience,
93            buffer_seconds,
94            client: reqwest::Client::new(),
95            cached_token: Arc::new(Mutex::new(None)),
96        }
97    }
98
99    /// Fetch fresh token from OIDC provider
100    async fn fetch_token(&self) -> Result<CachedToken> {
101        let mut params = vec![
102            ("grant_type", "client_credentials"),
103            ("client_id", self.client_id.as_str()),
104            ("client_secret", self.client_secret.as_str()),
105        ];
106
107        // Add audience if provided (required for Auth0/Azure AD)
108        let audience_str;
109        if let Some(ref audience) = self.audience {
110            audience_str = audience.clone();
111            params.push(("audience", audience_str.as_str()));
112        }
113
114        let response = self
115            .client
116            .post(&self.token_endpoint)
117            .form(&params)
118            .send()
119            .await
120            .map_err(|e| {
121                RequestDecoratorError::Transient(format!("Failed to fetch token: {}", e))
122            })?;
123
124        if !response.status().is_success() {
125            let status = response.status();
126            let body = response.text().await.unwrap_or_default();
127            return Err(RequestDecoratorError::Permanent(format!(
128                "Token request failed with status {}: {}",
129                status, body
130            )));
131        }
132
133        let token_response: TokenResponse = response.json().await.map_err(|e| {
134            RequestDecoratorError::Permanent(format!("Failed to parse token response: {}", e))
135        })?;
136
137        // Calculate expiration time (with buffer)
138        let now = SystemTime::now()
139            .duration_since(UNIX_EPOCH)
140            .expect("time")
141            .as_secs();
142
143        // Apply buffer to avoid using tokens near expiration
144        let expires_in = token_response
145            .expires_in
146            .saturating_sub(self.buffer_seconds);
147        let expires_at = now + expires_in;
148
149        Ok(CachedToken {
150            access_token: token_response.access_token,
151            expires_at,
152        })
153    }
154
155    /// Get valid token (from cache or fetch new)
156    async fn get_token(&self) -> Result<String> {
157        // Check cache first
158        {
159            let cached = self.cached_token.lock().await;
160            if let Some(token) = &*cached {
161                let now = SystemTime::now()
162                    .duration_since(UNIX_EPOCH)
163                    .expect("time")
164                    .as_secs();
165                if token.expires_at > now {
166                    // Token still valid
167                    return Ok(token.access_token.clone());
168                }
169            }
170        }
171
172        // Token expired or not cached - fetch new one
173        let new_token = self.fetch_token().await?;
174        let access_token = new_token.access_token.clone();
175
176        // Update cache
177        {
178            let mut cached = self.cached_token.lock().await;
179            *cached = Some(new_token);
180        }
181
182        Ok(access_token)
183    }
184}
185
186#[async_trait]
187impl RequestDecorator for OidcClientCredentialsDecorator {
188    async fn decorate(&self, request: &mut reqwest::Request) -> Result<()> {
189        let token = self.get_token().await?;
190        let auth_value = format!("Bearer {}", token);
191
192        request.headers_mut().insert(
193            reqwest::header::AUTHORIZATION,
194            reqwest::header::HeaderValue::from_str(&auth_value).map_err(|e| {
195                RequestDecoratorError::Permanent(format!("Invalid token format: {}", e))
196            })?,
197        );
198
199        Ok(())
200    }
201}