cat_gateway/service/common/auth/rbac/
scheme.rs1use std::{env, error::Error, sync::LazyLock, time::Duration};
3
4use catalyst_types::id_uri::IdUri;
5use futures::{TryFutureExt, TryStreamExt};
6use moka::future::Cache;
7use poem::{error::ResponseError, http::StatusCode, IntoResponse, Request};
8use poem_openapi::{auth::Bearer, payload::Json, SecurityScheme};
9use tracing::error;
10
11use super::token::CatalystRBACTokenV1;
12use crate::{
13 db::index::{
14 queries::rbac::get_rbac_registrations::{Query, QueryParams},
15 session::CassandraSession,
16 },
17 service::common::{
18 responses::{code_503_service_unavailable::ServiceUnavailable, ErrorResponses},
19 types::headers::retry_after::RetryAfterHeader,
20 },
21};
22
/// An auth token in its encoded, textual form.
///
/// This is a plain `String` alias; it is the key type of the `CACHE` static in this
/// file.
pub type EncodedAuthToken = String;
25
/// Name of the HTTP header that carries the bearer token.
///
/// The `key_name` in the `oai` attribute of `CatalystRBACSecurityScheme` uses the same
/// literal; keep the two in sync.
pub(crate) const AUTHORIZATION_HEADER: &str = "Authorization";
28
29#[allow(dead_code)]
32static CACHE: LazyLock<Cache<EncodedAuthToken, CatalystRBACTokenV1>> = LazyLock::new(|| {
33 Cache::builder()
34 .time_to_live(Duration::from_secs(30 * 60))
36 .time_to_idle(Duration::from_secs(5 * 60))
38 .build()
40});
41
42#[derive(SecurityScheme)]
44#[oai(
45 ty = "bearer",
46 key_name = "Authorization", bearer_format = "catalyst-rbac-token",
48 checker = "checker_api_catalyst_auth"
49)]
50#[allow(dead_code, clippy::module_name_repetitions)]
51pub struct CatalystRBACSecurityScheme(pub CatalystRBACTokenV1);
52
53#[derive(Debug, thiserror::Error)]
57#[error("Invalid Catalyst RBAC Auth Token")]
58pub struct AuthTokenError;
59
60impl ResponseError for AuthTokenError {
61 fn status(&self) -> StatusCode {
62 StatusCode::UNAUTHORIZED
63 }
64
65 fn as_response(&self) -> poem::Response
67 where Self: Error + Send + Sync + 'static {
68 ErrorResponses::unauthorized().into_response()
69 }
70}
71
72#[derive(Debug, thiserror::Error)]
76#[error("Insufficient Permission for Catalyst RBAC Token")]
77pub struct AuthTokenAccessViolation(Vec<String>);
78
79impl ResponseError for AuthTokenAccessViolation {
80 fn status(&self) -> StatusCode {
81 StatusCode::FORBIDDEN
82 }
83
84 fn as_response(&self) -> poem::Response
86 where Self: Error + Send + Sync + 'static {
87 ErrorResponses::forbidden(Some(self.0.clone())).into_response()
89 }
90}
91
92const MAX_TOKEN_AGE: Duration = Duration::from_secs(60 * 60); const MAX_TOKEN_SKEW: Duration = Duration::from_secs(5 * 60); async fn checker_api_catalyst_auth(
101 _req: &Request, bearer: Bearer,
102) -> poem::Result<CatalystRBACTokenV1> {
103 const RBAC_OFF: &str = "RBAC_OFF";
105
106 let token = CatalystRBACTokenV1::parse(&bearer.token).map_err(|e| {
108 error!("Corrupt auth token: {e:?}");
109 AuthTokenError
110 })?;
111
112 if env::var(RBAC_OFF).is_ok() {
114 return Ok(token);
115 };
116
117 let registrations = registrations(token.catalyst_id()).await?;
118 if registrations.is_empty() {
119 error!(
120 "Unable to find registrations for {} Catalyst ID",
121 token.catalyst_id()
122 );
123 return Err(AuthTokenError.into());
124 }
125
126 if !token.is_young(MAX_TOKEN_AGE, MAX_TOKEN_SKEW) {
127 error!("Auth token expired: {:?}", token);
129 Err(AuthTokenAccessViolation(vec!["EXPIRED".to_string()]))?;
130 }
131
132 let public_key = token.catalyst_id().role0_pk();
160 if let Err(error) = token.verify(&public_key) {
161 error!(error=%error, "Token Invalidly Signed");
162 Err(AuthTokenAccessViolation(vec![
163 "INVALID SIGNATURE".to_string()
164 ]))?;
165 }
166
167 Ok(token)
173}
174
175async fn registrations(catalyst_id: &IdUri) -> poem::Result<Vec<Query>> {
177 let session = CassandraSession::get(true).ok_or_else(|| {
178 error!("Failed to acquire db session");
179 let error = ServiceUnavailable::new(None);
180 ErrorResponses::ServiceUnavailable(Json(error), Some(RetryAfterHeader::default()))
181 })?;
182 Query::execute(&session, QueryParams {
183 catalyst_id: catalyst_id.clone().into(),
184 })
185 .and_then(|r| r.try_collect().map_err(Into::into))
186 .await
187 .map_err(|e| {
188 error!("Failed to get RBAC registrations for {catalyst_id} Catalyst ID: {e:?}",);
189 AuthTokenError.into()
190 })
191}