cat_gateway/service/common/auth/rbac/
scheme.rs

1//! Catalyst RBAC Security Scheme
2use std::{env, error::Error, sync::LazyLock, time::Duration};
3
4use anyhow::{anyhow, Context};
5use cardano_blockchain_types::{Network, Point, Slot, TxnIndex};
6use cardano_chain_follower::ChainFollower;
7use catalyst_types::id_uri::IdUri;
8use ed25519_dalek::VerifyingKey;
9use futures::{TryFutureExt, TryStreamExt};
10use moka::future::Cache;
11use poem::{error::ResponseError, http::StatusCode, IntoResponse, Request};
12use poem_openapi::{auth::Bearer, payload::Json, SecurityScheme};
13use rbac_registration::{
14    cardano::cip509::{Cip509, RoleNumber},
15    registration::cardano::RegistrationChain,
16};
17use tracing::{error, warn};
18
19use super::token::CatalystRBACTokenV1;
20use crate::{
21    db::index::{
22        queries::rbac::get_rbac_registrations::{Query, QueryParams},
23        session::CassandraSession,
24    },
25    service::common::{
26        responses::{
27            code_500_internal_server_error::InternalServerError,
28            code_503_service_unavailable::ServiceUnavailable, ErrorResponses,
29        },
30        types::headers::retry_after::RetryAfterHeader,
31    },
32};
33
/// Encoded (textual) form of an auth token, e.g. `catv1..`.
///
/// This is the key type of the token cache.
pub type EncodedAuthToken = String;
36
/// Name of the HTTP header that carries the authorization RBAC token.
///
/// The `key_name` of the RBAC security scheme must stay equal to this value.
pub(crate) const AUTHORIZATION_HEADER: &str = "Authorization";
39
40/// Cached auth tokens
41// TODO: Caching is currently disabled because we want to measure the performance without it. See
42// https://github.com/input-output-hk/catalyst-voices/issues/1940 for more details.
43#[allow(dead_code)]
44static CACHE: LazyLock<Cache<EncodedAuthToken, CatalystRBACTokenV1>> = LazyLock::new(|| {
45    Cache::builder()
46        // Time to live (TTL): 30 minutes
47        .time_to_live(Duration::from_secs(30 * 60))
48        // Time to idle (TTI):  5 minutes
49        .time_to_idle(Duration::from_secs(5 * 60))
50        // Create the cache.
51        .build()
52});
53
54/// Catalyst RBAC Access Token
55#[derive(SecurityScheme)]
56#[oai(
57    ty = "bearer",
58    key_name = "Authorization", // MUST match the `AUTHORIZATION_HEADER` constant.
59    bearer_format = "catalyst-rbac-token",
60    checker = "checker_api_catalyst_auth"
61)]
62#[allow(dead_code, clippy::module_name_repetitions)]
63pub struct CatalystRBACSecurityScheme(CatalystRBACTokenV1);
64
65impl From<CatalystRBACSecurityScheme> for IdUri {
66    fn from(value: CatalystRBACSecurityScheme) -> Self {
67        value.0.into()
68    }
69}
70
71/// Error with the Authorization Token
72///
73/// We can not parse it, so its a 401 response.
74#[derive(Debug, thiserror::Error)]
75#[error("Invalid Catalyst RBAC Auth Token")]
76pub struct AuthTokenError;
77
78impl ResponseError for AuthTokenError {
79    fn status(&self) -> StatusCode {
80        StatusCode::UNAUTHORIZED
81    }
82
83    /// Convert this error to a HTTP response.
84    fn as_response(&self) -> poem::Response
85    where Self: Error + Send + Sync + 'static {
86        ErrorResponses::unauthorized().into_response()
87    }
88}
89
90/// Token does not have required access rights
91///
92/// Not enough access rights, so its a 403 response.
93#[derive(Debug, thiserror::Error)]
94#[error("Insufficient Permission for Catalyst RBAC Token")]
95pub struct AuthTokenAccessViolation(Vec<String>);
96
97impl ResponseError for AuthTokenAccessViolation {
98    fn status(&self) -> StatusCode {
99        StatusCode::FORBIDDEN
100    }
101
102    /// Convert this error to a HTTP response.
103    fn as_response(&self) -> poem::Response
104    where Self: Error + Send + Sync + 'static {
105        // TODO: Actually check permissions needed for an endpoint.
106        ErrorResponses::forbidden(Some(self.0.clone())).into_response()
107    }
108}
109
/// Maximum age of a token: how far in the past it may lie and still be accepted
/// (1 hour).
const MAX_TOKEN_AGE: Duration = Duration::from_secs(60 * 60);

/// Maximum skew of a token: how far in the future it may lie and still be accepted
/// (5 minutes).
const MAX_TOKEN_SKEW: Duration = Duration::from_secs(5 * 60);
115
116/// When added to an endpoint, this hook is called per request to verify the bearer token
117/// is valid. The performed validation is described [here].
118///
119/// [here]: https://github.com/input-output-hk/catalyst-voices/blob/main/docs/src/catalyst-standards/permissionless-auth/auth-header.md#backend-processing-of-the-token
120async fn checker_api_catalyst_auth(
121    _req: &Request, bearer: Bearer,
122) -> poem::Result<CatalystRBACTokenV1> {
123    /// Temporary: Conditional RBAC for testing
124    const RBAC_OFF: &str = "RBAC_OFF";
125
126    // Deserialize the token: this performs the 1-5 steps of the validation.
127    let token = CatalystRBACTokenV1::parse(&bearer.token).map_err(|e| {
128        error!("Corrupt auth token: {e:?}");
129        AuthTokenError
130    })?;
131
132    // If env var explicitly set by SRE, switch off full verification
133    if env::var(RBAC_OFF).is_ok() {
134        return Ok(token);
135    };
136
137    let registrations = indexed_registrations(token.catalyst_id()).await?;
138    // Step 6: return 401 if the token isn't known.
139    if registrations.is_empty() {
140        error!(
141            "Unable to find registrations for {} Catalyst ID",
142            token.catalyst_id()
143        );
144        return Err(AuthTokenError.into());
145    }
146
147    // Step 7: Verify that the nonce is in the acceptable range.
148    if !token.is_young(MAX_TOKEN_AGE, MAX_TOKEN_SKEW) {
149        // Token is too old or too far in the future.
150        error!("Auth token expired: {token}");
151        Err(AuthTokenAccessViolation(vec!["EXPIRED".to_string()]))?;
152    }
153
154    // TODO: Caching is currently disabled because we want to measure the performance without
155    // it.
156    // // Its valid and young enough, check if its in the auth cache.
157    // // This get() will extend the entry life for another 5 minutes.
158    // // Even though we keep calling get(), the entry will expire
159    // // after 30 minutes (TTL) from the origin insert().
160    // // This is an optimization which saves us constantly looking up registrations we have
161    // // already validated.
162    // if let Some(token) = CACHE.get(&bearer.token).await {
163    //     return Ok(token);
164    // }
165
166    // Step 8: get the latest stable signing certificate registered for Role 0.
167
168    let public_key = last_signing_key(token.network(), &registrations)
169        .await
170        .map_err(|e| {
171            error!(
172                "Unable to get last signing key for {} Catalyst ID: {e:?}",
173                token.catalyst_id()
174            );
175            AuthTokenError
176        })?;
177
178    // Step 9: Verify the signature.
179    if let Err(error) = token.verify(&public_key) {
180        error!(error=%error, "Invalid signature for token: {token}");
181        Err(AuthTokenAccessViolation(vec![
182            "INVALID SIGNATURE".to_string()
183        ]))?;
184    }
185
186    // Step 10 is optional and isn't currently implemented.
187    //   - Get the latest unstable signing certificate registered for Role 0.
188    //   - Verify the signature against the Role 0 Public Key and Algorithm identified by the
189    //     certificate. If this fails, return 403.
190
191    // TODO: Caching is currently disabled because we want to measure the performance without
192    // it.
193    // // This entry will expire after 5 minutes (TTI) if there is no more ().
194    // CACHE.insert(bearer.token, token.clone()).await;
195
196    Ok(token)
197}
198
199/// Returns a sorted list of all registrations for the given Catalyst ID from the
200/// database.
201async fn indexed_registrations(catalyst_id: &IdUri) -> poem::Result<Vec<Query>> {
202    let session = CassandraSession::get(true).ok_or_else(|| {
203        error!("Failed to acquire db session");
204        service_unavailable()
205    })?;
206
207    let mut result: Vec<_> = Query::execute(&session, QueryParams {
208        catalyst_id: catalyst_id.clone().into(),
209    })
210    .and_then(|r| r.try_collect().map_err(Into::into))
211    .await
212    .map_err(|e| {
213        error!("Failed to get RBAC registrations for {catalyst_id} Catalyst ID: {e:?}");
214        if e.is::<bb8::RunError<tokio_postgres::Error>>() {
215            service_unavailable()
216        } else {
217            let error = InternalServerError::new(None);
218            error!(id=%error.id(), error=?e);
219            ErrorResponses::ServerError(Json(error)).into()
220        }
221    })?;
222
223    result.sort_by_key(|r| r.slot_no);
224    Ok(result)
225}
226
227/// Returns a 503 error instance.
228fn service_unavailable() -> poem::Error {
229    let error = ServiceUnavailable::new(None);
230    ErrorResponses::ServiceUnavailable(Json(error), Some(RetryAfterHeader::default())).into()
231}
232
233/// Returns the last signing key from the registration chain.
234async fn last_signing_key(
235    network: Network, indexed_registrations: &[Query],
236) -> anyhow::Result<VerifyingKey> {
237    let chain = registration_chain(network, indexed_registrations)
238        .await
239        .context("Failed to build registration chain")?;
240    chain
241        .get_latest_signing_pk_for_role(&RoleNumber::ROLE_0)
242        .ok_or(anyhow!("Cannot find latest role 0 public key"))
243        .map(|(pk, _)| pk)
244}
245
246/// Build a registration chain from the given indexed data.
247async fn registration_chain(
248    network: Network, indexed_registrations: &[Query],
249) -> anyhow::Result<RegistrationChain> {
250    let mut indexed_registrations = indexed_registrations.iter();
251    let Some(root) = indexed_registrations.next() else {
252        // We already checked that the registrations aren't empty, so we shouldn't get there.
253        return Err(anyhow!("Empty registrations list"));
254    };
255
256    let root = registration(network, root.slot_no.into(), root.txn_index.into())
257        .await
258        .context("Failed to get root registration")?;
259    let mut chain = RegistrationChain::new(root).context("Invalid root registration")?;
260
261    for reg in indexed_registrations {
262        // We only store valid registrations in this table, so an error here indicates a bug in
263        // our indexing logic.
264        let cip509 = registration(network, reg.slot_no.into(), reg.txn_index.into())
265            .await
266            .with_context(|| {
267                format!(
268                    "Invalid or missing registration at {:?} block {:?} transaction",
269                    reg.slot_no, reg.txn_index,
270                )
271            })?;
272        match chain.update(cip509) {
273            Ok(c) => chain = c,
274            Err(e) => {
275                // This isn't a hard error because while the individual registration can be valid it
276                // can be invalid in the context of the whole registration chain.
277                warn!(
278                    "Unable to apply registration from {:?} block {:?} txn index: {e:?}",
279                    reg.slot_no, reg.txn_index
280                );
281            },
282        }
283    }
284
285    Ok(chain)
286}
287
288/// Returns a RBAC registration from the given block and slot.
289async fn registration(network: Network, slot: Slot, txn_index: TxnIndex) -> anyhow::Result<Cip509> {
290    let point = Point::fuzzy(slot);
291    let block = ChainFollower::get_block(network, point)
292        .await
293        .context("Unable to get block")?
294        .data;
295    if block.point().slot_or_default() != slot {
296        // The `ChainFollower::get_block` function can return the next consecutive block if it
297        // cannot find the exact one. This shouldn't happen, but we need to check anyway.
298        return Err(anyhow!("Unable to find exact block"));
299    }
300    Cip509::new(&block, txn_index, &[])
301        .context("Invalid RBAC registration")?
302        .context("No RBAC registration at this block and txn index")
303}