cat_gateway/service/common/auth/rbac/
scheme.rs

1//! Catalyst RBAC Security Scheme
2use std::{env, error::Error, sync::LazyLock, time::Duration};
3
4use catalyst_types::id_uri::IdUri;
5use futures::{TryFutureExt, TryStreamExt};
6use moka::future::Cache;
7use poem::{error::ResponseError, http::StatusCode, IntoResponse, Request};
8use poem_openapi::{auth::Bearer, payload::Json, SecurityScheme};
9use tracing::error;
10
11use super::token::CatalystRBACTokenV1;
12use crate::{
13    db::index::{
14        queries::rbac::get_rbac_registrations::{Query, QueryParams},
15        session::CassandraSession,
16    },
17    service::common::{
18        responses::{code_503_service_unavailable::ServiceUnavailable, ErrorResponses},
19        types::headers::retry_after::RetryAfterHeader,
20    },
21};
22
23/// Auth token in the form of catv1..
24pub type EncodedAuthToken = String;
25
26/// The header name that holds the authorization RBAC token
27pub(crate) const AUTHORIZATION_HEADER: &str = "Authorization";
28
29/// Cached auth tokens
30// TODO: Caching is currently disabled because we want to measure the performance without it.
31#[allow(dead_code)]
32static CACHE: LazyLock<Cache<EncodedAuthToken, CatalystRBACTokenV1>> = LazyLock::new(|| {
33    Cache::builder()
34        // Time to live (TTL): 30 minutes
35        .time_to_live(Duration::from_secs(30 * 60))
36        // Time to idle (TTI):  5 minutes
37        .time_to_idle(Duration::from_secs(5 * 60))
38        // Create the cache.
39        .build()
40});
41
42/// Catalyst RBAC Access Token
43#[derive(SecurityScheme)]
44#[oai(
45    ty = "bearer",
46    key_name = "Authorization", // MUST match the `AUTHORIZATION_HEADER` constant.
47    bearer_format = "catalyst-rbac-token",
48    checker = "checker_api_catalyst_auth"
49)]
50#[allow(dead_code, clippy::module_name_repetitions)]
51pub struct CatalystRBACSecurityScheme(pub CatalystRBACTokenV1);
52
53/// Error with the Authorization Token
54///
55/// We can not parse it, so its a 401 response.
56#[derive(Debug, thiserror::Error)]
57#[error("Invalid Catalyst RBAC Auth Token")]
58pub struct AuthTokenError;
59
60impl ResponseError for AuthTokenError {
61    fn status(&self) -> StatusCode {
62        StatusCode::UNAUTHORIZED
63    }
64
65    /// Convert this error to a HTTP response.
66    fn as_response(&self) -> poem::Response
67    where Self: Error + Send + Sync + 'static {
68        ErrorResponses::unauthorized().into_response()
69    }
70}
71
72/// Token does not have required access rights
73///
74/// Not enough access rights, so its a 403 response.
75#[derive(Debug, thiserror::Error)]
76#[error("Insufficient Permission for Catalyst RBAC Token")]
77pub struct AuthTokenAccessViolation(Vec<String>);
78
79impl ResponseError for AuthTokenAccessViolation {
80    fn status(&self) -> StatusCode {
81        StatusCode::FORBIDDEN
82    }
83
84    /// Convert this error to a HTTP response.
85    fn as_response(&self) -> poem::Response
86    where Self: Error + Send + Sync + 'static {
87        // TODO: Actually check permissions needed for an endpoint.
88        ErrorResponses::forbidden(Some(self.0.clone())).into_response()
89    }
90}
91
92/// Time in the past the Token can be valid for.
93const MAX_TOKEN_AGE: Duration = Duration::from_secs(60 * 60); // 1 hour.
94
95/// Time in the future the Token can be valid for.
96const MAX_TOKEN_SKEW: Duration = Duration::from_secs(5 * 60); // 5 minutes
97
98/// When added to an endpoint, this hook is called per request to verify the bearer token
99/// is valid.
100async fn checker_api_catalyst_auth(
101    _req: &Request, bearer: Bearer,
102) -> poem::Result<CatalystRBACTokenV1> {
103    /// Temporary: Conditional RBAC for testing
104    const RBAC_OFF: &str = "RBAC_OFF";
105
106    // First check the token can be deserialized.
107    let token = CatalystRBACTokenV1::parse(&bearer.token).map_err(|e| {
108        error!("Corrupt auth token: {e:?}");
109        AuthTokenError
110    })?;
111
112    // If env var explicitly set by SRE, switch off full verification
113    if env::var(RBAC_OFF).is_ok() {
114        return Ok(token);
115    };
116
117    let registrations = registrations(token.catalyst_id()).await?;
118    if registrations.is_empty() {
119        error!(
120            "Unable to find registrations for {} Catalyst ID",
121            token.catalyst_id()
122        );
123        return Err(AuthTokenError.into());
124    }
125
126    if !token.is_young(MAX_TOKEN_AGE, MAX_TOKEN_SKEW) {
127        // Token is too old or too far in the future.
128        error!("Auth token expired: {:?}", token);
129        Err(AuthTokenAccessViolation(vec!["EXPIRED".to_string()]))?;
130    }
131
132    // TODO: Caching is currently disabled because we want to measure the performance without
133    // it.
134    // // Its valid and young enough, check if its in the auth cache.
135    // // This get() will extend the entry life for another 5 minutes.
136    // // Even though we keep calling get(), the entry will expire
137    // // after 30 minutes (TTL) from the origin insert().
138    // // This is an optimization which saves us constantly looking up registrations we have
139    // // already validated.
140    // if let Some(token) = CACHE.get(&bearer.token).await {
141    //     return Ok(token);
142    // }
143
144    // TODO: These steps must be implemented.
145    // - Get the latest stable signing certificate registered for Role 0.
146    // - Verify the signature against the Role 0 Public Key and Algorithm identified by the
147    //   certificate. Check signature length is correct for the defined algorithm, before
148    //   checking if the signature is valid. If this fails, return 403.
149    // - OPTIONAL IF authorization against latest unstable is supported:
150    //     1. Get the latest unstable signing certificate registered for Role 0.
151    //     2. Verify the signature against the Role 0 Public Key and Algorithm identified by
152    //        the certificate. If this fails, return 403.
153
154    // TODO: The following is incorrect because while a Catalyst ID is strictly identifies the
155    // initial Role 0 public key. However, the token is signed with the latest ACTIVE Role 0
156    // Public Key.
157
158    // Verify the token signature using the public key.
159    let public_key = token.catalyst_id().role0_pk();
160    if let Err(error) = token.verify(&public_key) {
161        error!(error=%error, "Token Invalidly Signed");
162        Err(AuthTokenAccessViolation(vec![
163            "INVALID SIGNATURE".to_string()
164        ]))?;
165    }
166
167    // TODO: Caching is currently disabled because we want to measure the performance without
168    // it.
169    // // This entry will expire after 5 minutes (TTI) if there is no more ().
170    // CACHE.insert(bearer.token, token.clone()).await;
171
172    Ok(token)
173}
174
175/// Returns a list of all registrations for the given Catalyst ID.
176async fn registrations(catalyst_id: &IdUri) -> poem::Result<Vec<Query>> {
177    let session = CassandraSession::get(true).ok_or_else(|| {
178        error!("Failed to acquire db session");
179        let error = ServiceUnavailable::new(None);
180        ErrorResponses::ServiceUnavailable(Json(error), Some(RetryAfterHeader::default()))
181    })?;
182    Query::execute(&session, QueryParams {
183        catalyst_id: catalyst_id.clone().into(),
184    })
185    .and_then(|r| r.try_collect().map_err(Into::into))
186    .await
187    .map_err(|e| {
188        error!("Failed to get RBAC registrations for {catalyst_id} Catalyst ID: {e:?}",);
189        AuthTokenError.into()
190    })
191}