diff --git a/apub-background-jobs/Cargo.toml b/apub-background-jobs/Cargo.toml index b761e5d..6042f16 100644 --- a/apub-background-jobs/Cargo.toml +++ b/apub-background-jobs/Cargo.toml @@ -12,3 +12,11 @@ background-jobs = "0.9.1" serde = { version = "1", features = ["derive"] } serde_json = "1" url = { version = "2", features = ["serde"] } + +[dev-dependencies] +actix-rt = "2.5.0" +apub-reqwest = { version = "0.1.0", path = "../apub-reqwest/" } +apub-openssl = { version = "0.1.0", path = "../apub-openssl/" } +background-jobs = "0.9.1" +openssl = "0.10" +reqwest = { version = "0.11", default-features = false } diff --git a/apub-background-jobs/src/lib.rs b/apub-background-jobs/src/lib.rs index 60e8eb3..7539181 100644 --- a/apub-background-jobs/src/lib.rs +++ b/apub-background-jobs/src/lib.rs @@ -1,3 +1,7 @@ +//! An implementation of Client based on background_jobs + +#![deny(missing_docs)] + use apub_core::{deliver::Client, digest::DigestFactory, signature::SignFactory}; use background_jobs::{ActixJob, QueueHandle}; use std::{ @@ -7,13 +11,71 @@ use std::{ }; use url::Url; +/// A trait describing acquiring an HTTP Client type from Job State and crypto +/// +/// ```rust +/// use actix_rt::Arbiter; +/// use apub_background_jobs::{client, ClientFactory, DeliverJob}; +/// use apub_core::deliver::Client; +/// use apub_openssl::OpenSsl; +/// use apub_reqwest::{ReqwestClient, SignatureConfig}; +/// use background_jobs::{create_server_in_arbiter, memory_storage::Storage, WorkerConfig}; +/// use openssl::{pkey::PKey, rsa::Rsa}; +/// use url::Url; +/// +/// #[derive(Clone)] +/// struct State { +/// config: SignatureConfig, +/// client: reqwest::Client, +/// } +/// +/// impl<'a> ClientFactory<'a> for State { +/// type Crypto = OpenSsl; +/// type Client = ReqwestClient<'a, (), &'a OpenSsl>; +/// +/// fn client(&'a self, crypto: &'a Self::Crypto) -> Self::Client { +/// ReqwestClient::new(&self.client, (), &self.config, crypto) +/// } +/// } +/// +/// #[actix_rt::main] +/// async fn main() -> Result<(), Box> { +/// let config = SignatureConfig::default(); +/// +/// let arbiter = Arbiter::new(); +/// let storage = Storage::new(); +/// let queue_handle = create_server_in_arbiter(&arbiter, storage); +/// let client = reqwest::Client::new(); +/// +/// WorkerConfig::new(move || State { +/// config: config.clone(), +/// client: client.clone(), +/// }) +/// .register::>() +/// .start_in_arbiter(&arbiter, queue_handle.clone()); +/// +/// let private_key = PKey::from_rsa(Rsa::generate(1024)?)?; +/// let crypto = OpenSsl::new("key-id".to_string(), private_key); +/// +/// let inbox: Url = "https://masto.asonix.dog/inbox".parse()?; +/// // let activity = /* ... */; +/// // client::(crypto, queue_handle.clone()).enqueue(inbox, activity)?; +/// +/// Ok(()) +/// } +/// ``` pub trait ClientFactory<'a> { + /// The cryptography implementation associated with this client type Crypto; + + /// The client type produced by this factory type Client: for<'b> Client<'b> + 'a; + /// Produce the client fn client(&'a self, crypto: &'a Self::Crypto) -> Self::Client; } +/// The actual Job type that gets queued #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct DeliverJob { inbox: Url, @@ -25,15 +87,21 @@ pub struct DeliverJob { _state: PhantomData, } +/// The "Client" implementation, generic over State and Crypto pub struct JobClient { handle: QueueHandle, crypto: Crypto, _state: PhantomData, } +/// Errors produced when queueing jobs #[derive(Debug)] pub struct EnqueueError(pub anyhow::Error); +/// produce a Client given Cryptography and a QueueHandle +/// +/// This client should be used only to deliver on behalf of the actor associated with the provided +/// crypto. In most cases, this means the client is 1-time-use pub fn client(crypto: Crypto, handle: QueueHandle) -> JobClient where State: for<'b> ClientFactory<'b, Crypto = Crypto> + Clone + 'static, @@ -61,11 +129,13 @@ where + Clone + 'static, { - pub fn enqueue(self, inbox: Url, activity: &T) -> anyhow::Result<()> + /// Enqueue the activity to be delivered, consuming the client + pub fn enqueue(self, inbox: Url, activity: &T) -> Result<(), EnqueueError> where T: serde::ser::Serialize, { queue_deliver::(inbox, activity, self.crypto, &self.handle) + .map_err(EnqueueError) } } @@ -111,7 +181,7 @@ where } } -pub fn queue_deliver( +fn queue_deliver( inbox: Url, activity: &T, crypto: Crypto,