From cb198f6922c335d5dc703aaeb438673ec53a34c1 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 26 Jul 2023 17:17:21 -0500 Subject: [PATCH] Add configurable blocking spawner for client --- actix/Cargo.toml | 2 +- actix/src/digest/mod.rs | 14 +++-- actix/src/digest/sign.rs | 38 ++++++++------ actix/src/lib.rs | 109 ++++++++++++++++++++++++++++++++++++--- actix/src/sign.rs | 31 +++++++---- 5 files changed, 157 insertions(+), 37 deletions(-) diff --git a/actix/Cargo.toml b/actix/Cargo.toml index c001082..97b0858 100644 --- a/actix/Cargo.toml +++ b/actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "http-signature-normalization-actix" description = "An HTTP Signatures library that leaves the signing to you" -version = "0.8.0" +version = "0.9.0" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" diff --git a/actix/src/digest/mod.rs b/actix/src/digest/mod.rs index 358ed2e..fdb0432 100644 --- a/actix/src/digest/mod.rs +++ b/actix/src/digest/mod.rs @@ -26,7 +26,7 @@ pub trait DigestName { #[cfg(feature = "client")] mod client { - use crate::{Config, PrepareSignError, Sign}; + use crate::{Config, PrepareSignError, Sign, Spawn}; use actix_http::header::InvalidHeaderValue; use actix_rt::task::JoinError; use awc::{ClientRequest, SendClientRequest}; @@ -47,9 +47,9 @@ mod client { /// a malicious entity pub trait SignExt: Sign { /// Set the Digest and Authorization headers on the request - fn authorization_signature_with_digest( + fn authorization_signature_with_digest( self, - config: Config, + config: Config, key_id: K, digest: D, v: V, @@ -59,19 +59,21 @@ mod client { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, D: DigestCreate + Send + 'static, V: AsRef<[u8]> + Send + 'static, Self: Sized; /// Set the Digest and Signature headers on the request - fn signature_with_digest( + fn signature_with_digest( self, - config: Config, + config: Config, key_id: K, digest: D, v: V, @@ -81,11 +83,13 @@ mod client { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, D: DigestCreate + Send + 'static, V: AsRef<[u8]> + Send + 'static, Self: Sized; diff --git a/actix/src/digest/sign.rs b/actix/src/digest/sign.rs index 67acbea..cca68c9 100644 --- a/actix/src/digest/sign.rs +++ b/actix/src/digest/sign.rs @@ -5,13 +5,13 @@ use std::{fmt::Display, future::Future, pin::Pin}; use crate::{ digest::{DigestClient, DigestCreate, SignExt}, - Config, PrepareSignError, Sign, + Config, PrepareSignError, Sign, Spawn, }; impl SignExt for ClientRequest { - fn authorization_signature_with_digest( + fn authorization_signature_with_digest( self, - config: Config, + config: Config, key_id: K, mut digest: D, v: V, @@ -21,21 +21,25 @@ impl SignExt for ClientRequest { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, D: DigestCreate + Send + 'static, V: AsRef<[u8]> + Send + 'static, Self: Sized, { Box::pin(async move { - let (d, v) = actix_rt::task::spawn_blocking(move || { - let d = digest.compute(v.as_ref()); - Ok((d, v)) as Result<(String, V), E> - }) - .await??; + let (d, v) = config + .spawner + .spawn_blocking(move || { + let d = digest.compute(v.as_ref()); + Ok((d, v)) as Result<(String, V), E> + }) + .await??; let c = self .insert_header(("Digest", format!("{}={}", D::NAME, d))) @@ -46,9 +50,9 @@ impl SignExt for ClientRequest { }) } - fn signature_with_digest( + fn signature_with_digest( self, - config: Config, + config: Config, key_id: K, mut digest: D, v: V, @@ -58,21 +62,25 @@ impl SignExt for ClientRequest { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, D: DigestCreate + Send + 'static, V: AsRef<[u8]> + Send + 'static, Self: Sized, { Box::pin(async move { - let (d, v) = actix_rt::task::spawn_blocking(move || { - let d = digest.compute(v.as_ref()); - Ok((d, v)) as Result<(String, V), E> - }) - .await??; + let (d, v) = config + .spawner + .spawn_blocking(move || { + let d = digest.compute(v.as_ref()); + Ok((d, v)) as Result<(String, V), E> + }) + .await??; let c = self .insert_header(("Digest", format!("{}={}", D::NAME, d))) diff --git a/actix/src/lib.rs b/actix/src/lib.rs index 9f86af1..e6f22dc 100644 --- a/actix/src/lib.rs +++ b/actix/src/lib.rs @@ -257,7 +257,7 @@ pub mod verify { } #[cfg(feature = "client")] -pub use self::client::{PrepareSignError, Sign}; +pub use self::client::{Canceled, PrepareSignError, Sign, Spawn}; #[cfg(feature = "server")] pub use self::server::{PrepareVerifyError, SignatureVerify}; @@ -267,7 +267,7 @@ pub use self::server::{PrepareVerifyError, SignatureVerify}; /// /// By default, the config is set up to create and verify signatures that expire after 10 /// seconds, and use the `(created)` and `(expires)` fields that were introduced in draft 11 -pub struct Config { +pub struct Config { /// The inner config type config: http_signature_normalization::Config, @@ -276,11 +276,18 @@ pub struct Config { /// Whether to set the Date header set_date: bool, + + /// The spawner used to create blocking operations + spawner: Spawner, } +/// A default implementation of Spawner for spawning blocking operations +#[derive(Clone, Copy, Debug, Default)] +pub struct DefaultSpawner; + #[cfg(feature = "client")] mod client { - use super::{Config, RequiredError}; + use super::{Config, DefaultSpawner, RequiredError}; use actix_http::header::{InvalidHeaderValue, ToStrError}; use actix_rt::task::JoinError; use std::{fmt::Display, future::Future, pin::Pin}; @@ -288,9 +295,9 @@ mod client { /// A trait implemented by the awc ClientRequest type to add an HTTP signature to the request pub trait Sign { /// Add an Authorization Signature to the request - fn authorization_signature( + fn authorization_signature( self, - config: Config, + config: Config, key_id: K, f: F, ) -> Pin>>> @@ -298,17 +305,19 @@ mod client { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, Self: Sized; /// Add a Signature to the request - fn signature( + fn signature( self, - config: Config, + config: Config, key_id: K, f: F, ) -> Pin>>> @@ -316,11 +325,13 @@ mod client { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, Self: Sized; } @@ -343,6 +354,65 @@ mod client { /// Invalid Date header InvalidHeader(#[from] actix_http::header::InvalidHeaderValue), } + + /// An error that indicates a blocking operation panicked and cannot return a response + #[derive(Debug)] + pub struct Canceled; + + impl std::fmt::Display for Canceled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Operation was canceled") + } + } + + impl std::error::Error for Canceled {} + + /// A trait dictating how to spawn a future onto a blocking threadpool. By default, + /// http-signature-normalization-actix will use actix_rt's built-in blocking threadpool, but this + /// can be customized + pub trait Spawn { + /// The future type returned by spawn_blocking + type Future: std::future::Future>; + + /// Spawn the blocking function onto the threadpool + fn spawn_blocking(&self, func: Func) -> Self::Future + where + Func: FnOnce() -> Out + Send + 'static, + Out: Send + 'static; + } + + /// The future returned by DefaultSpawner when spawning blocking operations on the actix_rt + /// blocking threadpool + pub struct DefaultSpawnerFuture { + inner: actix_rt::task::JoinHandle, + } + + impl Spawn for DefaultSpawner { + type Future = DefaultSpawnerFuture; + + fn spawn_blocking(&self, func: Func) -> Self::Future + where + Func: FnOnce() -> Out + Send + 'static, + Out: Send + 'static, + { + DefaultSpawnerFuture { + inner: actix_rt::task::spawn_blocking(func), + } + } + } + + impl std::future::Future for DefaultSpawnerFuture { + type Output = Result; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let res = std::task::ready!(std::pin::Pin::new(&mut self.inner).poll(cx)); + + std::task::Poll::Ready(res.map_err(|_| Canceled)) + } + } } #[cfg(feature = "server")] @@ -422,7 +492,9 @@ impl Config { pub fn new() -> Self { Config::default() } +} +impl Config { /// Since manually setting the Host header doesn't work so well in AWC, you can use this method /// to enable setting the Host header for signing requests without breaking client /// functionality @@ -431,6 +503,24 @@ impl Config { config: self.config, set_host: true, set_date: self.set_date, + spawner: self.spawner, + } + } + + #[cfg(client)] + /// Set the spawner for spawning blocking tasks + /// + /// http-signature-normalization-actix offloads signing messages and generating hashes to a + /// blocking threadpool, which can be configured by providing a custom spawner. + pub fn spawner(self, spawner: S) -> Config + where + S: Spawn, + { + Config { + config: self.config, + set_host: self.set_host, + set_date: self.set_date, + spawner, } } @@ -443,6 +533,7 @@ impl Config { config: self.config.mastodon_compat(), set_host: true, set_date: true, + spawner: self.spawner, } } @@ -454,6 +545,7 @@ impl Config { config: self.config.require_digest(), set_host: self.set_host, set_date: self.set_date, + spawner: self.spawner, } } @@ -466,6 +558,7 @@ impl Config { config: self.config.dont_use_created_field(), set_host: self.set_host, set_date: self.set_date, + spawner: self.spawner, } } @@ -475,6 +568,7 @@ impl Config { config: self.config.set_expiration(expires_after), set_host: self.set_host, set_date: self.set_date, + spawner: self.spawner, } } @@ -484,6 +578,7 @@ impl Config { config: self.config.require_header(header), set_host: self.set_host, set_date: self.set_date, + spawner: self.spawner, } } diff --git a/actix/src/sign.rs b/actix/src/sign.rs index 05339b6..f34e94f 100644 --- a/actix/src/sign.rs +++ b/actix/src/sign.rs @@ -1,4 +1,4 @@ -use crate::{create::Signed, Config, PrepareSignError, Sign}; +use crate::{create::Signed, Config, PrepareSignError, Sign, Spawn}; use actix_rt::task::JoinError; use awc::{ http::header::{HttpDate, InvalidHeaderValue, TryIntoHeaderValue}, @@ -7,9 +7,9 @@ use awc::{ use std::{fmt::Display, future::Future, pin::Pin, time::SystemTime}; impl Sign for ClientRequest { - fn authorization_signature( + fn authorization_signature( mut self, - config: Config, + config: Config, key_id: K, f: F, ) -> Pin>>> @@ -17,11 +17,13 @@ impl Sign for ClientRequest { F: FnOnce(&str) -> Result + Send + 'static, E: From + From + + From + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, Self: Sized, { Box::pin(async move { @@ -31,9 +33,9 @@ impl Sign for ClientRequest { }) } - fn signature( + fn signature( mut self, - config: Config, + config: Config, key_id: K, f: F, ) -> Pin>>> @@ -42,10 +44,12 @@ impl Sign for ClientRequest { E: From + From + From + + From + std::fmt::Debug + Send + 'static, K: Display + 'static, + S: Spawn + 'static, Self: Sized, { Box::pin(async move { @@ -56,16 +60,22 @@ impl Sign for ClientRequest { } } -async fn prepare( +async fn prepare( request: &mut ClientRequest, - config: &Config, + config: &Config, key_id: K, f: F, ) -> Result where F: FnOnce(&str) -> Result + Send + 'static, - E: From + From + std::fmt::Debug + Send + 'static, + E: From + + From + + From + + std::fmt::Debug + + Send + + 'static, K: Display, + S: Spawn + 'static, { if config.set_date && !request.headers().contains_key("date") { request.headers_mut().insert( @@ -103,7 +113,10 @@ where let key_id = key_id.to_string(); - let signed = actix_rt::task::spawn_blocking(move || unsigned.sign(key_id, f)).await??; + let signed = config + .spawner + .spawn_blocking(move || unsigned.sign(key_id, f)) + .await??; Ok(signed) }