background-jobs: make it work

This commit is contained in:
Aode (lion) 2021-11-18 16:10:12 -06:00
parent abb8e2b7ac
commit 86a8b6f181
5 changed files with 303 additions and 0 deletions

View file

@ -10,6 +10,7 @@ edition = "2021"
[workspace]
members = [
"apub-awc",
"apub-background-jobs",
"apub-breaker-session",
"apub-core",
"apub-deref-client",
@ -17,6 +18,7 @@ members = [
"apub-reqwest",
"apub-rustcrypto",
"examples/awc-example",
"examples/background-jobs-example",
"examples/example-types",
"examples/reqwest-example"
]

View file

@ -0,0 +1,14 @@
[package]
name = "apub-background-jobs"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
apub-core = { version = "0.1.0", path = "../apub-core/" }
background-jobs = "0.9.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
url = { version = "2", features = ["serde"] }

View file

@ -0,0 +1,173 @@
use apub_core::{deliver::Client, digest::DigestFactory, signature::SignFactory};
use background_jobs::{ActixJob, QueueHandle};
use std::{
future::{Future, Ready},
marker::PhantomData,
pin::Pin,
};
use url::Url;
pub trait ClientFactoryBuilder {
type Crypto: SignFactory + DigestFactory;
type ClientFactory: for<'a> ClientFactory<'a, Crypto = Self::Crypto>;
fn build(&self) -> Self::ClientFactory;
}
pub trait ClientFactory<'a> {
type Crypto: SignFactory + DigestFactory;
type Client: Client<'a>;
fn client(&'a self, crypto: &'a Self::Crypto) -> Self::Client;
}
pub type DeliverJob<State> = DeliverJobType<State, <State as ClientFactoryBuilder>::Crypto>;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct DeliverJobType<State, Crypto> {
inbox: Url,
activity: serde_json::Value,
crypto: Crypto,
#[serde(skip)]
_state: PhantomData<fn(State)>,
}
pub struct JobClient<State: ClientFactoryBuilder> {
handle: QueueHandle,
crypto: <State as ClientFactoryBuilder>::Crypto,
_state: PhantomData<fn(State)>,
}
#[derive(Debug)]
pub struct EnqueueError(pub anyhow::Error);
pub fn client<State>(
crypto: <State as ClientFactoryBuilder>::Crypto,
handle: QueueHandle,
) -> JobClient<State>
where
State: ClientFactoryBuilder + Clone + 'static,
State: ClientFactoryBuilder + Clone,
<State as ClientFactoryBuilder>::Crypto: for<'de> serde::de::Deserialize<'de>
+ serde::ser::Serialize
+ DigestFactory
+ SignFactory
+ Clone
+ 'static,
{
JobClient {
handle,
crypto,
_state: PhantomData,
}
}
impl<State> JobClient<State>
where
State: ClientFactoryBuilder + Clone + 'static,
State: ClientFactoryBuilder + Clone,
<State as ClientFactoryBuilder>::Crypto: for<'de> serde::de::Deserialize<'de>
+ serde::ser::Serialize
+ DigestFactory
+ SignFactory
+ Clone
+ 'static,
{
pub fn enqueue<T>(self, inbox: Url, activity: &T) -> anyhow::Result<()>
where
T: serde::ser::Serialize,
{
queue_deliver::<T, State>(inbox, activity, self.crypto, &self.handle)
}
}
impl std::fmt::Display for EnqueueError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl std::error::Error for EnqueueError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.0.as_ref())
}
}
impl<'a, State> Client<'a> for JobClient<State>
where
State: ClientFactoryBuilder + Clone + 'static,
<State as ClientFactoryBuilder>::Crypto: for<'de> serde::de::Deserialize<'de>
+ serde::ser::Serialize
+ DigestFactory
+ SignFactory
+ Clone
+ 'static,
{
type Error = EnqueueError;
type Future = Ready<Result<(), Self::Error>>;
fn deliver<'b, T: serde::ser::Serialize>(
&'b self,
inbox: &'a Url,
activity: &'a T,
) -> Self::Future {
std::future::ready(
queue_deliver::<T, State>(inbox.clone(), activity, self.crypto.clone(), &self.handle)
.map_err(EnqueueError),
)
}
}
pub fn queue_deliver<T, State>(
inbox: Url,
activity: &T,
crypto: <State as ClientFactoryBuilder>::Crypto,
handle: &QueueHandle,
) -> anyhow::Result<()>
where
T: serde::ser::Serialize,
State: ClientFactoryBuilder + Clone + 'static,
<State as ClientFactoryBuilder>::Crypto: for<'de> serde::de::Deserialize<'de>
+ serde::ser::Serialize
+ DigestFactory
+ SignFactory
+ 'static,
{
let job: DeliverJob<State> = DeliverJob {
inbox,
activity: serde_json::to_value(activity)?,
crypto,
_state: PhantomData,
};
handle.queue(job)
}
impl<State> ActixJob for DeliverJob<State>
where
State: ClientFactoryBuilder + Clone + 'static,
<State as ClientFactoryBuilder>::Crypto: for<'de> serde::de::Deserialize<'de>
+ serde::ser::Serialize
+ SignFactory
+ DigestFactory
+ 'static,
{
const NAME: &'static str = "apub-background-jobs::DeliverJob";
type State = State;
type Future = Pin<Box<dyn Future<Output = anyhow::Result<()>>>>;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move {
let factory = state.build();
let client = factory.client(&self.crypto);
client
.deliver(&self.inbox, &self.activity)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
Ok(())
})
}
}

View file

@ -0,0 +1,22 @@
[package]
name = "background-jobs-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-rt = "2.4.0"
apub-awc = { version = "0.1.0", path = "../../apub-awc/" }
apub-background-jobs = { version = "0.1.0", path = "../../apub-background-jobs/" }
apub-breaker-session = { version = "0.1.0", path = "../../apub-breaker-session/" }
apub-core = { version = "0.1.0", path = "../../apub-core/" }
apub-rustcrypto = { version = "0.1.0", path = "../../apub-rustcrypto/" }
anyhow = "1"
awc = { version = "3.0.0-beta.10", default-features = false, features = ["rustls"] }
background-jobs = "0.9.1"
env_logger = "0.9"
example-types = { version = "0.1.0", path = "../example-types/" }
log = "0.4"
rand = "0.8.4"
rsa = "0.5.0"

View file

@ -0,0 +1,92 @@
use actix_rt::Arbiter;
use apub_awc::{AwcClient, SignatureConfig};
use apub_background_jobs::{client, ClientFactoryBuilder, DeliverJob};
use apub_breaker_session::BreakerSession;
use apub_core::session::RequestCountSession;
use apub_rustcrypto::Rustcrypto;
use background_jobs::{create_server_in_arbiter, memory_storage::Storage, WorkerConfig};
use example_types::{object_id, Note, NoteType};
use rsa::RsaPrivateKey;
use std::time::Duration;
#[derive(Clone)]
struct State {
config: SignatureConfig,
breakers: BreakerSession,
client: awc::Client,
}
struct ClientFactory {
config: SignatureConfig,
session: (RequestCountSession, BreakerSession),
client: awc::Client,
}
impl ClientFactoryBuilder for State {
type Crypto = Rustcrypto;
type ClientFactory = ClientFactory;
fn build(&self) -> Self::ClientFactory {
ClientFactory {
config: self.config.clone(),
session: (RequestCountSession::max(30), self.breakers.clone()),
client: self.client.clone(),
}
}
}
impl<'a> apub_background_jobs::ClientFactory<'a> for ClientFactory {
type Crypto = Rustcrypto;
type Client = AwcClient<'a, (RequestCountSession, BreakerSession), Self::Crypto>;
fn client(&'a self, crypto: &'a Self::Crypto) -> Self::Client {
AwcClient::new(
&self.client,
&self.session,
self.config.clone(),
crypto.clone(),
)
}
}
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var("RUST_LOG").is_ok() {
env_logger::builder().init()
} else {
env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.init();
};
let config = SignatureConfig::default();
let breakers = BreakerSession::limit(10, Duration::from_secs(60 * 60));
let arbiter = Arbiter::new();
let storage = Storage::new();
let queue_handle = create_server_in_arbiter(&arbiter, storage);
WorkerConfig::new(move || State {
config: config.clone(),
breakers: breakers.clone(),
client: awc::Client::new(),
})
.register::<DeliverJob<State>>()
.start_in_arbiter(&arbiter, queue_handle.clone());
let private_key = RsaPrivateKey::new(&mut rand::thread_rng(), 1024)?;
let crypto = Rustcrypto::new("key-id".to_string(), private_key);
let inbox = "https://masto.asonix.dog/inbox".parse()?;
let note = Note {
id: object_id("https://masto.asonix.dog/blah/blah".parse()?),
kind: NoteType::Note,
content: String::from("hi"),
};
note.deliver(&inbox, &client::<State>(crypto, queue_handle.clone()))
.await?;
actix_rt::signal::ctrl_c().await?;
Ok(())
}