diff --git a/apub-background-jobs/Cargo.toml b/apub-background-jobs/Cargo.toml index 6042f16..7972816 100644 --- a/apub-background-jobs/Cargo.toml +++ b/apub-background-jobs/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] anyhow = "1" apub-core = { version = "0.1.0", path = "../apub-core/" } -background-jobs = "0.9.1" +background-jobs = "0.11.0" serde = { version = "1", features = ["derive"] } serde_json = "1" url = { version = "2", features = ["serde"] } @@ -17,6 +17,5 @@ url = { version = "2", features = ["serde"] } actix-rt = "2.5.0" apub-reqwest = { version = "0.1.0", path = "../apub-reqwest/" } apub-openssl = { version = "0.1.0", path = "../apub-openssl/" } -background-jobs = "0.9.1" openssl = "0.10" reqwest = { version = "0.11", default-features = false } diff --git a/apub-background-jobs/src/lib.rs b/apub-background-jobs/src/lib.rs index 44dd339..157b744 100644 --- a/apub-background-jobs/src/lib.rs +++ b/apub-background-jobs/src/lib.rs @@ -55,11 +55,7 @@ use apub_core::{deliver::Client, digest::DigestFactory, signature::SignFactory}; use background_jobs::{ActixJob, QueueHandle}; -use std::{ - future::{Future, Ready}, - marker::PhantomData, - pin::Pin, -}; +use std::{future::Future, marker::PhantomData, pin::Pin}; use url::Url; /// A trait describing acquiring an HTTP Client type from Job State and crypto @@ -126,14 +122,16 @@ where + DigestFactory + SignFactory + Clone + + std::panic::UnwindSafe + 'static, { /// Enqueue the activity to be delivered, consuming the client - pub fn enqueue(self, inbox: Url, activity: &T) -> Result<(), EnqueueError> + pub async fn enqueue(self, inbox: Url, activity: &T) -> Result<(), EnqueueError> where T: serde::ser::Serialize, { queue_deliver::(inbox, activity, self.crypto, &self.handle) + .await .map_err(EnqueueError) } } @@ -158,29 +156,31 @@ where + DigestFactory + SignFactory + Clone + + std::panic::UnwindSafe + 'static, { type Error = EnqueueError; - type Future = Ready>; + type Future = Pin> + 'a>>; fn deliver( &'a self, inbox: &'a Url, activity: &'a T, ) -> Self::Future { - std::future::ready( + Box::pin(async move { queue_deliver::( inbox.clone(), activity, self.crypto.clone(), &self.handle, ) - .map_err(EnqueueError), - ) + .await + .map_err(EnqueueError) + }) } } -fn queue_deliver( +async fn queue_deliver( inbox: Url, activity: &T, crypto: Crypto, @@ -193,6 +193,7 @@ where + serde::ser::Serialize + DigestFactory + SignFactory + + std::panic::UnwindSafe + 'static, { let job: DeliverJob = DeliverJob { @@ -202,7 +203,7 @@ where _state: PhantomData, }; - handle.queue(job) + handle.queue(job).await } impl ActixJob for DeliverJob