cat_gateway/service/common/auth/rbac/
scheme.rs1use std::{env, error::Error, sync::LazyLock, time::Duration};
3
4use anyhow::{anyhow, Context};
5use cardano_blockchain_types::{Network, Point, Slot, TxnIndex};
6use cardano_chain_follower::ChainFollower;
7use catalyst_types::id_uri::IdUri;
8use ed25519_dalek::VerifyingKey;
9use futures::{TryFutureExt, TryStreamExt};
10use moka::future::Cache;
11use poem::{error::ResponseError, http::StatusCode, IntoResponse, Request};
12use poem_openapi::{auth::Bearer, payload::Json, SecurityScheme};
13use rbac_registration::{
14 cardano::cip509::{Cip509, RoleNumber},
15 registration::cardano::RegistrationChain,
16};
17use tracing::{error, warn};
18
19use super::token::CatalystRBACTokenV1;
20use crate::{
21 db::index::{
22 queries::rbac::get_rbac_registrations::{Query, QueryParams},
23 session::CassandraSession,
24 },
25 service::common::{
26 responses::{
27 code_500_internal_server_error::InternalServerError,
28 code_503_service_unavailable::ServiceUnavailable, ErrorResponses,
29 },
30 types::headers::retry_after::RetryAfterHeader,
31 },
32};
33
34pub type EncodedAuthToken = String;
36
37pub(crate) const AUTHORIZATION_HEADER: &str = "Authorization";
39
40#[allow(dead_code)]
44static CACHE: LazyLock<Cache<EncodedAuthToken, CatalystRBACTokenV1>> = LazyLock::new(|| {
45 Cache::builder()
46 .time_to_live(Duration::from_secs(30 * 60))
48 .time_to_idle(Duration::from_secs(5 * 60))
50 .build()
52});
53
54#[derive(SecurityScheme)]
56#[oai(
57 ty = "bearer",
58 key_name = "Authorization", bearer_format = "catalyst-rbac-token",
60 checker = "checker_api_catalyst_auth"
61)]
62#[allow(dead_code, clippy::module_name_repetitions)]
63pub struct CatalystRBACSecurityScheme(CatalystRBACTokenV1);
64
65impl From<CatalystRBACSecurityScheme> for IdUri {
66 fn from(value: CatalystRBACSecurityScheme) -> Self {
67 value.0.into()
68 }
69}
70
71#[derive(Debug, thiserror::Error)]
75#[error("Invalid Catalyst RBAC Auth Token")]
76pub struct AuthTokenError;
77
78impl ResponseError for AuthTokenError {
79 fn status(&self) -> StatusCode {
80 StatusCode::UNAUTHORIZED
81 }
82
83 fn as_response(&self) -> poem::Response
85 where Self: Error + Send + Sync + 'static {
86 ErrorResponses::unauthorized().into_response()
87 }
88}
89
90#[derive(Debug, thiserror::Error)]
94#[error("Insufficient Permission for Catalyst RBAC Token")]
95pub struct AuthTokenAccessViolation(Vec<String>);
96
97impl ResponseError for AuthTokenAccessViolation {
98 fn status(&self) -> StatusCode {
99 StatusCode::FORBIDDEN
100 }
101
102 fn as_response(&self) -> poem::Response
104 where Self: Error + Send + Sync + 'static {
105 ErrorResponses::forbidden(Some(self.0.clone())).into_response()
107 }
108}
109
110const MAX_TOKEN_AGE: Duration = Duration::from_secs(60 * 60); const MAX_TOKEN_SKEW: Duration = Duration::from_secs(5 * 60); async fn checker_api_catalyst_auth(
121 _req: &Request, bearer: Bearer,
122) -> poem::Result<CatalystRBACTokenV1> {
123 const RBAC_OFF: &str = "RBAC_OFF";
125
126 let token = CatalystRBACTokenV1::parse(&bearer.token).map_err(|e| {
128 error!("Corrupt auth token: {e:?}");
129 AuthTokenError
130 })?;
131
132 if env::var(RBAC_OFF).is_ok() {
134 return Ok(token);
135 };
136
137 let registrations = indexed_registrations(token.catalyst_id()).await?;
138 if registrations.is_empty() {
140 error!(
141 "Unable to find registrations for {} Catalyst ID",
142 token.catalyst_id()
143 );
144 return Err(AuthTokenError.into());
145 }
146
147 if !token.is_young(MAX_TOKEN_AGE, MAX_TOKEN_SKEW) {
149 error!("Auth token expired: {token}");
151 Err(AuthTokenAccessViolation(vec!["EXPIRED".to_string()]))?;
152 }
153
154 let public_key = last_signing_key(token.network(), ®istrations)
169 .await
170 .map_err(|e| {
171 error!(
172 "Unable to get last signing key for {} Catalyst ID: {e:?}",
173 token.catalyst_id()
174 );
175 AuthTokenError
176 })?;
177
178 if let Err(error) = token.verify(&public_key) {
180 error!(error=%error, "Invalid signature for token: {token}");
181 Err(AuthTokenAccessViolation(vec![
182 "INVALID SIGNATURE".to_string()
183 ]))?;
184 }
185
186 Ok(token)
197}
198
199async fn indexed_registrations(catalyst_id: &IdUri) -> poem::Result<Vec<Query>> {
202 let session = CassandraSession::get(true).ok_or_else(|| {
203 error!("Failed to acquire db session");
204 service_unavailable()
205 })?;
206
207 let mut result: Vec<_> = Query::execute(&session, QueryParams {
208 catalyst_id: catalyst_id.clone().into(),
209 })
210 .and_then(|r| r.try_collect().map_err(Into::into))
211 .await
212 .map_err(|e| {
213 error!("Failed to get RBAC registrations for {catalyst_id} Catalyst ID: {e:?}");
214 if e.is::<bb8::RunError<tokio_postgres::Error>>() {
215 service_unavailable()
216 } else {
217 let error = InternalServerError::new(None);
218 error!(id=%error.id(), error=?e);
219 ErrorResponses::ServerError(Json(error)).into()
220 }
221 })?;
222
223 result.sort_by_key(|r| r.slot_no);
224 Ok(result)
225}
226
227fn service_unavailable() -> poem::Error {
229 let error = ServiceUnavailable::new(None);
230 ErrorResponses::ServiceUnavailable(Json(error), Some(RetryAfterHeader::default())).into()
231}
232
233async fn last_signing_key(
235 network: Network, indexed_registrations: &[Query],
236) -> anyhow::Result<VerifyingKey> {
237 let chain = registration_chain(network, indexed_registrations)
238 .await
239 .context("Failed to build registration chain")?;
240 chain
241 .get_latest_signing_pk_for_role(&RoleNumber::ROLE_0)
242 .ok_or(anyhow!("Cannot find latest role 0 public key"))
243 .map(|(pk, _)| pk)
244}
245
246async fn registration_chain(
248 network: Network, indexed_registrations: &[Query],
249) -> anyhow::Result<RegistrationChain> {
250 let mut indexed_registrations = indexed_registrations.iter();
251 let Some(root) = indexed_registrations.next() else {
252 return Err(anyhow!("Empty registrations list"));
254 };
255
256 let root = registration(network, root.slot_no.into(), root.txn_index.into())
257 .await
258 .context("Failed to get root registration")?;
259 let mut chain = RegistrationChain::new(root).context("Invalid root registration")?;
260
261 for reg in indexed_registrations {
262 let cip509 = registration(network, reg.slot_no.into(), reg.txn_index.into())
265 .await
266 .with_context(|| {
267 format!(
268 "Invalid or missing registration at {:?} block {:?} transaction",
269 reg.slot_no, reg.txn_index,
270 )
271 })?;
272 match chain.update(cip509) {
273 Ok(c) => chain = c,
274 Err(e) => {
275 warn!(
278 "Unable to apply registration from {:?} block {:?} txn index: {e:?}",
279 reg.slot_no, reg.txn_index
280 );
281 },
282 }
283 }
284
285 Ok(chain)
286}
287
288async fn registration(network: Network, slot: Slot, txn_index: TxnIndex) -> anyhow::Result<Cip509> {
290 let point = Point::fuzzy(slot);
291 let block = ChainFollower::get_block(network, point)
292 .await
293 .context("Unable to get block")?
294 .data;
295 if block.point().slot_or_default() != slot {
296 return Err(anyhow!("Unable to find exact block"));
299 }
300 Cip509::new(&block, txn_index, &[])
301 .context("Invalid RBAC registration")?
302 .context("No RBAC registration at this block and txn index")
303}