cat_gateway/service/utilities/middleware/
tracing_mw.rs1use std::time::Instant;
4
5use cpu_time::ProcessTime; use poem::{
7 http::{header, HeaderMap, StatusCode},
8 web::RealIp,
9 Endpoint, Error, FromRequest, IntoResponse, Middleware, PathPattern, Request, Response, Result,
10};
11use poem_openapi::OperationId;
12use tracing::{error, field, warn, Instrument, Level, Span};
13use ulid::Ulid;
14use uuid::Uuid;
15
16use crate::{
17 metrics::endpoint::{
18 CLIENT_REQUEST_COUNT, HTTP_REQUEST_COUNT, HTTP_REQ_CPU_TIME_MS, HTTP_REQ_DURATION_MS,
19 NOT_FOUND_COUNT,
20 },
21 settings::Settings,
22 utils::blake2b_hash::generate_uuid_string_from_data,
23};
24
25#[derive(Default)]
53pub(crate) struct Tracing;
54
55impl<E: Endpoint> Middleware<E> for Tracing {
56 type Output = TracingEndpoint<E>;
57
58 fn transform(&self, ep: E) -> Self::Output {
59 TracingEndpoint { inner: ep }
60 }
61}
62
63pub(crate) struct TracingEndpoint<E> {
65 inner: E,
67}
68
69fn anonymize_ip_address(remote_addr: &str) -> String {
71 let addr: Vec<String> = vec![remote_addr.to_string()];
72 generate_uuid_string_from_data(Settings::client_id_key(), &addr)
73}
74
75async fn anonymous_client_id(req: &Request) -> String {
83 let remote_addr = RealIp::from_request_without_body(req)
84 .await
85 .ok()
86 .and_then(|real_ip| real_ip.0)
87 .map_or_else(|| req.remote_addr().to_string(), |addr| addr.to_string());
88
89 anonymize_ip_address(&remote_addr)
90}
91
92struct ResponseData {
94 duration: f64,
96 cpu_time: f64,
98 status_code: u16,
100 endpoint: String,
102 }
104
105impl ResponseData {
106 fn new(
109 duration: f64, cpu_time: f64, resp: &Response, panic: Option<Uuid>, span: &Span,
110 ) -> Self {
111 let oid = resp
113 .data::<OperationId>()
114 .map_or("Unknown".to_string(), std::string::ToString::to_string);
115
116 let status = resp.status().as_u16();
117
118 let endpoint = resp.data::<PathPattern>();
120 let endpoint = endpoint.map_or("Unknown".to_string(), |endpoint| {
121 endpoint.0.trim_end_matches('/').to_string()
123 });
124
125 span.record("duration_ms", duration);
128 span.record("cpu_time_ms", cpu_time);
129 span.record("oid", &oid);
130 span.record("status", status);
131 span.record("endpoint", &endpoint);
132
133 if let Some(panic) = panic {
135 span.record("panic", panic.to_string());
136 }
137
138 add_interesting_headers_to_span(span, "resp", resp.headers());
139
140 Self {
141 duration,
142 cpu_time,
143 status_code: status,
144 endpoint,
145 }
147 }
148}
149
150fn add_interesting_headers_to_span(span: &Span, prefix: &str, headers: &HeaderMap) {
153 let size_field = prefix.to_string() + "_size";
154 let content_type_field = prefix.to_string() + "_content_type";
155 let encoding_field = prefix.to_string() + "_encoding";
156
157 if let Some(size) = headers.get(header::CONTENT_LENGTH) {
159 if let Ok(size) = size.to_str() {
160 span.record(size_field.as_str(), size);
161 }
162 }
163
164 if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
166 if let Ok(content_type) = content_type.to_str() {
167 span.record(content_type_field.as_str(), content_type);
168 }
169 }
170
171 if let Some(encoding) = headers.get(header::CONTENT_ENCODING) {
173 if let Ok(encoding) = encoding.to_str() {
174 span.record(encoding_field.as_str(), encoding);
175 }
176 }
177}
178
179async fn mk_request_span(req: &Request) -> (Span, String, String, String) {
181 let client_id = anonymous_client_id(req).await;
182 let conn_id = Ulid::new();
183
184 let uri_path = req.uri().path().to_string();
185 let uri_query = req.uri().query().unwrap_or("").to_string();
186
187 let method = req.method().to_string();
188
189 let span = tracing::span!(
190 target: "Endpoint",
191 Level::INFO,
192 "request",
193 client = %client_id,
194 conn = %conn_id,
195 version = ?req.version(),
196 method = %method,
197 path = %uri_path,
198 query_size = field::Empty,
199 req_size = field::Empty,
200 req_content_type = field::Empty,
201 req_encoding = field::Empty,
202 resp_size = field::Empty,
203 resp_content_type = field::Empty,
204 resp_encoding = field::Empty,
205 endpoint = field::Empty,
206 duration_ms = field::Empty,
207 cpu_time_ms = field::Empty,
208 oid = field::Empty,
209 status = field::Empty,
210 panic = field::Empty,
211 );
212
213 if !uri_query.is_empty() {
215 span.record("query_size", uri_query.len());
216 }
217
218 add_interesting_headers_to_span(&span, "req", req.headers());
219
220 if let Some(endpoint) = req.data::<PathPattern>() {
222 let endpoint = endpoint.0.trim_end_matches('/').to_string();
223 span.record("endpoint", endpoint);
224 }
225
226 if let Some(oid) = req.data::<OperationId>() {
228 span.record("oid", oid.0.to_string());
229 }
230
231 (span, uri_path, method, client_id)
232}
233
234impl<E: Endpoint> Endpoint for TracingEndpoint<E> {
235 type Output = Response;
236
237 async fn call(&self, req: Request) -> Result<Self::Output> {
238 let (span, uri_path, method, client_id) = mk_request_span(&req).await;
240
241 let inner_span = span.clone();
242
243 let (response, resp_data) = async move {
244 let now = Instant::now();
245 let now_proc = ProcessTime::now();
246
247 let resp = self.inner.call(req).await;
248
249 #[allow(clippy::cast_precision_loss)] let duration_proc = now_proc.elapsed().as_micros() as f64 / 1000.0;
251
252 #[allow(clippy::cast_precision_loss)] let duration = now.elapsed().as_micros() as f64 / 1000.0;
254
255 match resp {
256 Ok(resp) => {
257 let resp = resp.into_response();
265
266 let response_data =
267 ResponseData::new(duration, duration_proc, &resp, None, &inner_span);
268
269 (Ok(resp), response_data)
270 },
271 Err(err) => {
272 let panic: Option<Uuid> = None;
279
280 let error_message = err.to_string();
282 let resp = err.into_response();
283 let status = resp.status();
284
285 if status == StatusCode::NOT_FOUND {
287 if Settings::log_not_found() {
288 warn!(
289 %status);
290 }
291 } else {
293 error!(%error_message, %status, "HTTP Response Error");
294 }
295
296 let response_data =
297 ResponseData::new(duration, duration_proc, &resp, panic, &inner_span);
298
299 let mut error = Error::from_response(resp);
301 if !error_message.is_empty() {
302 error.set_error_message(&error_message);
303 }
304
305 (Err(error), response_data)
306 },
307 }
308 }
309 .instrument(span.clone())
310 .await;
311
312 span.in_scope(|| {
313 if resp_data.status_code == StatusCode::NOT_FOUND {
315 NOT_FOUND_COUNT.inc();
316 } else {
317 let path = if resp_data.endpoint.is_empty() {
320 uri_path
321 } else {
322 resp_data.endpoint
323 };
324
325 HTTP_REQ_DURATION_MS
326 .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
327 .observe(resp_data.duration);
328 HTTP_REQ_CPU_TIME_MS
329 .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
330 .observe(resp_data.cpu_time);
331 HTTP_REQUEST_COUNT
335 .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
336 .inc();
337 CLIENT_REQUEST_COUNT
338 .with_label_values(&[&client_id, &resp_data.status_code.to_string()])
339 .inc();
340 }
344 });
345
346 response
347 }
348}