Improve Timings middleware

This commit is contained in:
asonix 2022-11-23 10:44:01 -06:00
parent e987149757
commit ed0ea6521e
3 changed files with 82 additions and 25 deletions

1
Cargo.lock generated
View file

@ -308,6 +308,7 @@ dependencies = [
"mime", "mime",
"opentelemetry", "opentelemetry",
"opentelemetry-otlp", "opentelemetry-otlp",
"pin-project-lite",
"quanta", "quanta",
"rand", "rand",
"rsa", "rsa",

View file

@ -46,6 +46,7 @@ metrics-util = "0.14.0"
mime = "0.3.16" mime = "0.3.16"
opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry = { version = "0.18", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11" opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.9"
quanta = "0.10.1" quanta = "0.10.1"
rand = "0.8" rand = "0.8"
rsa = "0.7" rsa = "0.7"

View file

@ -1,10 +1,10 @@
use actix_web::{ use actix_web::{
body::MessageBody,
dev::{Service, ServiceRequest, ServiceResponse, Transform}, dev::{Service, ServiceRequest, ServiceResponse, Transform},
http::StatusCode, http::StatusCode,
}; };
use futures_util::future::LocalBoxFuture;
use std::{ use std::{
future::{ready, Ready}, future::{ready, Future, Ready},
time::Instant, time::Instant,
}; };
@ -15,12 +15,30 @@ struct LogOnDrop {
begin: Instant, begin: Instant,
path: String, path: String,
method: String, method: String,
disarm: bool, arm: bool,
}
pin_project_lite::pin_project! {
pub(crate) struct TimingsFuture<F> {
#[pin]
future: F,
log_on_drop: Option<LogOnDrop>,
}
}
pin_project_lite::pin_project! {
pub(crate) struct TimingsBody<B> {
#[pin]
body: B,
log_on_drop: LogOnDrop,
}
} }
impl Drop for LogOnDrop { impl Drop for LogOnDrop {
fn drop(&mut self) { fn drop(&mut self) {
if !self.disarm { if self.arm {
let duration = self.begin.elapsed(); let duration = self.begin.elapsed();
metrics::histogram!("relay.request.complete", duration, "path" => self.path.clone(), "method" => self.method.clone()); metrics::histogram!("relay.request.complete", duration, "path" => self.path.clone(), "method" => self.method.clone());
} }
@ -32,7 +50,7 @@ where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>, S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static, S::Future: 'static,
{ {
type Response = S::Response; type Response = ServiceResponse<TimingsBody<B>>;
type Error = S::Error; type Error = S::Error;
type InitError = (); type InitError = ();
type Transform = TimingsMiddleware<S>; type Transform = TimingsMiddleware<S>;
@ -48,9 +66,9 @@ where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>, S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static, S::Future: 'static,
{ {
type Response = S::Response; type Response = ServiceResponse<TimingsBody<B>>;
type Error = S::Error; type Error = S::Error;
type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>; type Future = TimingsFuture<S::Future>;
fn poll_ready( fn poll_ready(
&self, &self,
@ -60,29 +78,66 @@ where
} }
fn call(&self, req: ServiceRequest) -> Self::Future { fn call(&self, req: ServiceRequest) -> Self::Future {
let mut logger = LogOnDrop { let log_on_drop = LogOnDrop {
begin: Instant::now(), begin: Instant::now(),
path: req.path().to_string(), path: req.path().to_string(),
method: req.method().to_string(), method: req.method().to_string(),
disarm: false, arm: false,
}; };
let fut = self.0.call(req);
Box::pin(async move { let future = self.0.call(req);
let res = fut.await;
let status = match &res { TimingsFuture {
Ok(res) => res.status(), future,
Err(e) => e.as_response_error().status_code(), log_on_drop: Some(log_on_drop),
}; }
if status == StatusCode::NOT_FOUND || status == StatusCode::METHOD_NOT_ALLOWED { }
logger.disarm = true; }
}
impl<F, B> Future for TimingsFuture<F>
// TODO: Drop after body write where
drop(logger); F: Future<Output = Result<ServiceResponse<B>, actix_web::Error>>,
{
res type Output = Result<ServiceResponse<TimingsBody<B>>, actix_web::Error>;
})
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
let res = std::task::ready!(this.future.poll(cx));
let mut log_on_drop = this
.log_on_drop
.take()
.expect("TimingsFuture polled after completion");
let status = match &res {
Ok(res) => res.status(),
Err(e) => e.as_response_error().status_code(),
};
log_on_drop.arm =
status != StatusCode::NOT_FOUND && status != StatusCode::METHOD_NOT_ALLOWED;
let res = res.map(|r| r.map_body(|_, body| TimingsBody { body, log_on_drop }));
std::task::Poll::Ready(res)
}
}
impl<B: MessageBody> MessageBody for TimingsBody<B> {
type Error = B::Error;
fn size(&self) -> actix_web::body::BodySize {
self.body.size()
}
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<actix_web::web::Bytes, Self::Error>>> {
self.project().body.poll_next(cx)
} }
} }