hyaenidae/src/jobs.rs
2021-04-02 12:07:19 -05:00

476 lines
14 KiB
Rust

use crate::{Error, OptionExt, State};
use activitystreams::base::AnyBase;
use actix_rt::Arbiter;
use awc::Client;
use background_jobs::{
create_server, memory_storage::Storage, ActixJob, Backoff, MaxRetries, QueueHandle,
WorkerConfig,
};
use futures::future::LocalBoxFuture;
use http_signature_normalization_actix::prelude::*;
use hyaenidae_profiles::{MissingImage, OnBehalfOf, Spawner};
use rsa::RSAPrivateKey;
use url::Url;
use uuid::Uuid;
/// Per-worker state handed to every job: the shared application state plus a
/// worker-local HTTP client for outgoing requests.
#[derive(Clone)]
struct JobState {
    // Shared application state (built in `build` below).
    state: State,
    // HTTP client used for deliveries and remote fetches.
    client: Client,
}
/// Set up the in-memory background-job server, construct the application
/// `State` (wired to a `Spawn` handle for queueing jobs), and start the
/// worker pool that processes every job type in this module.
///
/// Returns the shared `State`, or an error if state construction fails.
pub(super) fn build(
    base_url: Url,
    pict_rs_upstream: Url,
    content_config: hyaenidae_profiles::ContentConfig,
    db: sled::Db,
) -> Result<State, anyhow::Error> {
    // In-memory storage: queued jobs do not survive a process restart.
    let storage = Storage::new();
    let queue_handle = create_server(storage);

    let state = State::new(
        Spawn(queue_handle.clone()),
        base_url,
        pict_rs_upstream,
        content_config,
        Arbiter::current(),
        &db,
    )?;

    // Each worker closure captures its own clone of the state and builds a
    // fresh HTTP client.
    let worker_state = state.clone();
    WorkerConfig::new(move || JobState {
        state: worker_state.clone(),
        client: crate::build_client(),
    })
    .register::<Ingest>()
    .register::<DownloadApub>()
    .register::<DownloadApubAnonymous>()
    .register::<DownloadImages>()
    .register::<DeliverMany>()
    .register::<Deliver>()
    .register::<RemoveFile>()
    .set_worker_count("ActivityPub", 16)
    // Last use of the handle — no need to clone it here.
    .start(queue_handle);

    Ok(state)
}
/// Cloneable handle for enqueueing background jobs; wraps the job server's
/// `QueueHandle`.
#[derive(Clone)]
pub(super) struct Spawn(QueueHandle);
impl Spawn {
    /// Queue processing of a signed activity, recording the key id it was
    /// signed with so the processor can verify provenance.
    pub(crate) fn ingest(&self, any_base: AnyBase, key_id: Url) {
        let job = Ingest {
            any_base,
            key_id: Some(key_id),
            stack: vec![],
        };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue ingest: {}", e);
        }
    }

    /// Queue an unsigned fetch of a remote ActivityPub document.
    pub(crate) fn download_apub_anonymous(&self, url: Url) {
        let job = DownloadApubAnonymous { url };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue download job: {}", e);
        }
    }
}
impl Spawner for Spawn {
    /// Queue a signed fetch of a remote ActivityPub document, deferring
    /// `stack` until the fetch completes.
    fn download_apub(&self, on_behalf_of: OnBehalfOf, url: Url, stack: Vec<AnyBase>) {
        let job = DownloadApub {
            on_behalf_of,
            url,
            stack,
        };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue download job: {}", e);
        }
    }

    /// Queue a download of missing images, deferring `stack` until they arrive.
    fn download_images(&self, urls: Vec<MissingImage>, stack: Vec<AnyBase>) {
        let job = DownloadImages { urls, stack };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue image download job: {}", e);
        }
    }

    /// Queue deletion of a stored file.
    fn purge_file(&self, file_id: Uuid) {
        let job = RemoveFile { file_id };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue file delete job: {}", e);
        }
    }

    /// Queue processing of an activity with no associated signing key
    /// (internally produced work).
    fn process(&self, any_base: AnyBase, stack: Vec<AnyBase>) {
        let job = Ingest {
            any_base,
            key_id: None,
            stack,
        };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue process job: {}", e);
        }
    }

    /// Queue fan-out delivery of an activity to a set of inboxes.
    fn deliver(&self, on_behalf_of: OnBehalfOf, any_base: AnyBase, inboxes: Vec<Url>) {
        let job = DeliverMany {
            on_behalf_of,
            any_base,
            inboxes,
        };
        if let Err(e) = self.0.queue(job) {
            log::error!("Failed to queue deliver job: {}", e);
        }
    }
}
/// Maximum depth of the deferred-activity stack an `Ingest` job will accept;
/// guards against unbounded recursive fetching of related objects.
const MAX_INGEST_DEPTH: usize = 30;

/// Job payload: delete a stored file by id.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct RemoveFile {
    file_id: Uuid,
}

/// Job payload: deliver one activity to one inbox with a signed request.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct Deliver {
    any_base: AnyBase,
    // Inbox URL to POST to.
    url: Url,
    // Key id advertised in the HTTP signature.
    key_id: Url,
    // PEM (PKCS#8) encoded RSA private key used for signing.
    private_key: String,
}

/// Job payload: fan one activity out to many inboxes on behalf of an actor.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DeliverMany {
    on_behalf_of: OnBehalfOf,
    any_base: AnyBase,
    inboxes: Vec<Url>,
}

/// Job payload: fetch missing images, then resume the deferred activity stack.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadImages {
    urls: Vec<MissingImage>,
    stack: Vec<AnyBase>,
}

/// Job payload: fetch a remote ActivityPub document without signing the request.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadApubAnonymous {
    url: Url,
}

/// Job payload: fetch a remote ActivityPub document with a signed request.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadApub {
    on_behalf_of: OnBehalfOf,
    url: Url,
    // Activities whose processing is deferred until this fetch completes.
    stack: Vec<AnyBase>,
}

/// Job payload: process one activity.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct Ingest {
    any_base: AnyBase,
    // Some(key_id) when the activity arrived signed (see Spawn::ingest);
    // None for internally queued work (see Spawner::process).
    key_id: Option<Url>,
    stack: Vec<AnyBase>,
}
impl ActixJob for RemoveFile {
type State = JobState;
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
const NAME: &'static str = "RemoveFile";
const QUEUE: &'static str = "ActivityPub";
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
const BACKOFF: Backoff = Backoff::Exponential(3);
const TIMEOUT: i64 = 30;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move {
state
.state
.profiles
.delete_file(self.file_id, &state.client)
.await?;
Ok(())
})
}
}
impl ActixJob for Deliver {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
    const NAME: &'static str = "Deliver";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff; 30s per-attempt timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;
    /// POST the serialized activity to a single inbox with an HTTP signature
    /// covering a digest of the body. A non-success status fails the job so
    /// it gets retried.
    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let signer = Signer::new(&self.private_key)?;
            let item_string = serde_json::to_string(&self.any_base)?;
            let res = state
                .client
                .post(self.url.as_str())
                .insert_header(("Accept", "application/activity+json"))
                .content_type("application/activity+json")
                // Signs the request and a SHA-256 digest of `item_string`,
                // identifying the signing key by `key_id`.
                .signature_with_digest(
                    signature_config(),
                    self.key_id.to_string(),
                    signer.hasher(),
                    item_string,
                    move |signing_string| signer.sign(signing_string),
                )
                .await
                .map_err(|e| anyhow::anyhow!("Signature: {}", e))?
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;
            if !res.status().is_success() {
                // Non-2xx response: fail the job so the retry policy kicks in.
                return Err(anyhow::anyhow!("Status: {}", res.status()));
            }
            Ok(())
        })
    }
}
impl ActixJob for DeliverMany {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
    const NAME: &'static str = "DeliverMany";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;
    /// Fan an activity out to many inboxes: resolve the signing key for the
    /// actor we deliver on behalf of, then queue one `Deliver` job per inbox.
    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let apub_store = state.state.profiles.apub.clone();
            let server_store = state.state.profiles.store.servers.clone();
            let behalf = self.on_behalf_of;
            // Key resolution is synchronous store access; run it on the
            // blocking thread pool via web::block.
            let (private_key, key_id): (String, Url) = actix_web::web::block(move || {
                let key_id = match behalf {
                    OnBehalfOf::Server => {
                        // Deliveries from the server itself use the instance's key.
                        let server_id = server_store.get_self()?.req()?;
                        apub_store.key_for_server(server_id)?.req()?
                    }
                    OnBehalfOf::Profile(profile_id) => {
                        apub_store.key_for_profile(profile_id)?.req()?
                    }
                };
                let private_key = apub_store.private_key_for_id(&key_id)?.req()?;
                Ok((private_key, key_id)) as Result<_, Error>
            })
            .await??;
            // One independent Deliver job per inbox, so a slow or failing
            // inbox doesn't block the rest; queue errors are logged, not fatal.
            for url in self.inboxes {
                let res = state.state.spawn.0.queue(Deliver {
                    url,
                    any_base: self.any_base.clone(),
                    private_key: private_key.clone(),
                    key_id: key_id.clone(),
                });
                if let Err(e) = res {
                    log::error!("Error spawning deliver job: {}", e);
                }
            }
            Ok(())
        })
    }
}
impl ActixJob for DownloadImages {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DownloadImages";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    /// Download every missing image; if any fail, error out so the job is
    /// retried. Once all are present, pop the next deferred activity off the
    /// stack and queue it for processing.
    fn run(mut self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // Attempt each download, keeping track of the ones that failed.
            let mut failures = Vec::new();
            for missing in self.urls {
                let outcome = state
                    .state
                    .profiles
                    .download_image(&missing, &state.client)
                    .await;
                if let Err(e) = outcome {
                    log::warn!("Failed to download image: {}", e);
                    failures.push(missing);
                }
            }

            if !failures.is_empty() {
                return Err(anyhow::anyhow!(
                    "Failed to download all images, {} remain",
                    failures.len()
                ));
            }

            // Everything downloaded: resume the deferred activity stack.
            if let Some(any_base) = self.stack.pop() {
                state.state.spawn.process(any_base, self.stack);
            }
            Ok(())
        })
    }
}
impl ActixJob for DownloadApubAnonymous {
type State = JobState;
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
const NAME: &'static str = "DownloadApubAnonymous";
const QUEUE: &'static str = "ActivityPub";
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
const BACKOFF: Backoff = Backoff::Exponential(3);
const TIMEOUT: i64 = 30;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move {
let mut res = state
.client
.get(self.url.as_str())
.insert_header(("Accept", "application/activity+json"))
.send()
.await
.map_err(|e| anyhow::anyhow!("Request: {}", e))?;
let any_base: AnyBase = res
.json()
.await
.map_err(|e| anyhow::anyhow!("Json: {}", e))?;
state.state.spawn.process(any_base, vec![]);
Ok(())
})
}
}
impl ActixJob for DownloadApub {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
    const NAME: &'static str = "DownloadApub";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;
    /// Fetch a remote ActivityPub document with a signed GET request, then
    /// queue the parsed result for processing along with the deferred `stack`.
    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let apub_store = state.state.profiles.apub.clone();
            let server_store = state.state.profiles.store.servers.clone();
            let behalf = self.on_behalf_of;
            // Key resolution is synchronous store access; run it on the
            // blocking thread pool via web::block.
            let (private_key, key_id): (String, Url) = actix_web::web::block(move || {
                let key_id = match behalf {
                    OnBehalfOf::Server => {
                        // Requests made by the server itself use the instance's key.
                        let server_id = server_store.get_self()?.req()?;
                        apub_store.key_for_server(server_id)?.req()?
                    }
                    OnBehalfOf::Profile(profile_id) => {
                        apub_store.key_for_profile(profile_id)?.req()?
                    }
                };
                let private_key = apub_store.private_key_for_id(&key_id)?.req()?;
                Ok((private_key, key_id)) as Result<_, Error>
            })
            .await??;
            let signer = Signer::new(&private_key)?;
            // Signed GET: headers only, no body digest needed.
            let mut res = state
                .client
                .get(self.url.as_str())
                .insert_header(("Accept", "application/activity+json"))
                .signature(
                    signature_config(),
                    key_id.to_string(),
                    move |signing_string| signer.sign(signing_string),
                )
                .await
                .map_err(|e| anyhow::anyhow!("Signature: {}", e))?
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;
            let any_base: AnyBase = res
                .json()
                .await
                .map_err(|e| anyhow::anyhow!("Json: {}", e))?;
            state.state.spawn.process(any_base, self.stack);
            Ok(())
        })
    }
}
impl ActixJob for Ingest {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
    const NAME: &'static str = "Ingest";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    /// Process one activity via the profiles service. `key_id` is the key the
    /// activity was signed with (`None` for internally queued work); `stack`
    /// carries deferred activities and is bounded by `MAX_INGEST_DEPTH`.
    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // Refuse pathologically deep chains of deferred activities.
            if self.stack.len() > MAX_INGEST_DEPTH {
                return Err(anyhow::anyhow!("Max recursion depth exceeded"));
            }
            state
                .state
                .profiles
                .ingest(self.any_base, self.key_id, self.stack)
                .await?;
            Ok(())
        })
    }
}
/// Shared HTTP-signature configuration: includes the Host header in the
/// signed headers.
fn signature_config() -> http_signature_normalization_actix::Config {
    let config = http_signature_normalization_actix::Config::new();
    config.set_host_header()
}
struct Signer(RSAPrivateKey);
impl Signer {
    /// Parse a PKCS#8 PEM-encoded RSA private key.
    fn new(private_key: &str) -> Result<Self, anyhow::Error> {
        use rsa_pem::KeyExt;

        let key = RSAPrivateKey::from_pem_pkcs8(private_key)?;
        Ok(Signer(key))
    }

    /// Fresh SHA-256 hasher, used for the request body digest.
    fn hasher(&self) -> sha2::Sha256 {
        use sha2::Digest;

        sha2::Sha256::new()
    }

    /// Return the base64-encoded PKCS#1 v1.5 RSA signature over the SHA-256
    /// hash of `signing_string`.
    fn sign(&self, signing_string: &str) -> Result<String, anyhow::Error> {
        use sha2::{Digest, Sha256};

        let hashed = Sha256::digest(signing_string.as_bytes());
        let padding = rsa::PaddingScheme::PKCS1v15Sign {
            hash: Some(rsa::Hash::SHA2_256),
        };
        let bytes = self.0.sign(padding, &hashed)?;
        Ok(base64::encode(bytes))
    }
}