diff --git a/src/lib.rs b/src/lib.rs index ad18041..b0e5680 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ use actix_web::{ web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, }; use details::{ApiDetails, HumanDate}; +use future::WithTimeout; use futures_core::Stream; use metrics_exporter_prometheus::PrometheusBuilder; use middleware::Metrics; @@ -432,7 +433,11 @@ async fn claim_upload( ) -> Result { let upload_id = Serde::into_inner(query.into_inner().upload_id); - match actix_rt::time::timeout(Duration::from_secs(10), repo.wait(upload_id)).await { + match repo + .wait(upload_id) + .with_timeout(Duration::from_secs(10)) + .await + { Ok(wait_res) => { let upload_result = wait_res?; repo.claim(upload_id).await?; diff --git a/src/middleware.rs b/src/middleware.rs index bbc2bdf..95c8846 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -12,6 +12,8 @@ use std::{ task::{Context, Poll}, }; +use crate::future::WithTimeout; + pub(crate) use self::metrics::Metrics; pub(crate) struct Deadline; @@ -149,8 +151,12 @@ impl actix_web::error::ResponseError for DeadlineExceeded { HttpResponse::build(self.status_code()) .content_type("application/json") .body( - serde_json::to_string(&serde_json::json!({ "msg": self.to_string() })) - .unwrap_or_else(|_| r#"{"msg":"request timeout"}"#.to_string()), + serde_json::to_string( + &serde_json::json!({ "msg": self.to_string(), "code": "request-timeout" }), + ) + .unwrap_or_else(|_| { + r#"{"msg":"request timeout","code":"request-timeout"}"#.to_string() + }), ) } } @@ -163,7 +169,7 @@ where DeadlineFuture { inner: match timeout { Some(duration) => DeadlineFutureInner::Timed { - timeout: actix_rt::time::timeout(duration, future), + timeout: future.with_timeout(duration), }, None => DeadlineFutureInner::Untimed { future }, }, diff --git a/src/process.rs b/src/process.rs index 1cb58d5..61cd538 100644 --- a/src/process.rs +++ b/src/process.rs @@ -14,7 +14,7 @@ use tokio::{ }; use tracing::{Instrument, Span}; -use crate::error_code::ErrorCode; +use crate::{error_code::ErrorCode, future::WithTimeout}; struct MetricsGuard { start: Instant, @@ -159,7 +159,7 @@ impl Process { timeout, } = self; - let res = actix_rt::time::timeout(timeout, child.wait()).await; + let res = child.wait().with_timeout(timeout).await; match res { Ok(Ok(status)) if status.success() => { @@ -220,7 +220,7 @@ impl Process { child.wait().await }; - let error = match actix_rt::time::timeout(timeout, child_fut).await { + let error = match child_fut.with_timeout(timeout).await { Ok(Ok(status)) if status.success() => { guard.disarm(); return; diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 51b18a5..3117ecd 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -266,11 +266,12 @@ impl Inner { impl UploadInterest { async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> { - actix_rt::time::timeout( - timeout, - self.interest.as_ref().expect("interest exists").notified(), - ) - .await + self.interest + .as_ref() + .expect("interest exists") + .notified() + .with_timeout(timeout) + .await } } @@ -1214,7 +1215,9 @@ impl QueueRepo for PostgresRepo { } drop(conn); - if actix_rt::time::timeout(Duration::from_secs(5), notifier.notified()) + if notifier + .notified() + .with_timeout(Duration::from_secs(5)) .await .is_ok() {