hyaenidae/server/src/jobs.rs

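//! Background job definitions for the server.
//!
//! Jobs are queued through the [`Spawn`] handle and executed by
//! `background_jobs` workers on the "ActivityPub" queue, backed by in-memory
//! storage.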
use crate::State;
use activitystreams::base::AnyBase;
use background_jobs::{
    create_server, memory_storage::Storage, ActixJob, Backoff, MaxRetries, QueueHandle,
    WorkerConfig,
};
use futures::future::LocalBoxFuture;
use hyaenidae_profiles::Spawner;
use url::Url;
use uuid::Uuid;
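
/// Build the in-memory job server and its worker pool, returning a [`Spawn`]
/// handle for queueing jobs.
///
/// Registers every job type on the "ActivityPub" queue with 16 workers.
/// Presumably called once at startup (the caller is not shown in this file),
/// roughly `let spawner = jobs::build(base_url, pict_rs_upstream, db)?;`.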
pub(super) fn build(
    base_url: Url,
    pict_rs_upstream: Url,
    db: sled::Db,
) -> Result<Spawn, anyhow::Error> {
    let storage = Storage::new();
    let queue_handle = create_server(storage);
    let inner_queue_handle = queue_handle.clone();

    WorkerConfig::new(move || {
        crate::State::new(
            Spawn(inner_queue_handle.clone()),
            base_url.clone(),
            pict_rs_upstream.clone(),
            &db.clone(),
        )
        .unwrap()
    })
    .register::<Ingest>()
    .register::<DownloadApub>()
    .register::<DownloadImages>()
    .register::<DeliverMany>()
    .register::<Deliver>()
    .register::<RemoveFile>()
    .set_worker_count("ActivityPub", 16)
    .start(queue_handle.clone());

    Ok(Spawn(queue_handle))
}
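
/// Cloneable handle to the job queue, passed into [`crate::State`] so that
/// request handlers and other jobs can enqueue work.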
#[derive(Clone)]
pub(super) struct Spawn(QueueHandle);
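
// Each `Spawner` method maps to one job type below; queueing failures are
// logged and otherwise dropped.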
impl Spawner for Spawn {
    fn download_apub(&self, url: Url, stack: Vec<AnyBase>) {
        if let Err(e) = self.0.queue(DownloadApub { url, stack }) {
            log::error!("Failed to queue download job: {}", e);
        }
    }

    fn download_images(&self, urls: Vec<Url>, stack: Vec<AnyBase>) {
        if let Err(e) = self.0.queue(DownloadImages { urls, stack }) {
            log::error!("Failed to queue image download job: {}", e);
        }
    }

    fn purge_file(&self, file_id: Uuid) {
        if let Err(e) = self.0.queue(RemoveFile { file_id }) {
            log::error!("Failed to queue file delete job: {}", e);
        }
    }

    fn process(&self, any_base: AnyBase, stack: Vec<AnyBase>) {
        if let Err(e) = self.0.queue(Ingest { any_base, stack }) {
            log::error!("Failed to queue process job: {}", e);
        }
    }

    fn deliver(&self, any_base: AnyBase, inboxes: Vec<Url>) {
        if let Err(e) = self.0.queue(DeliverMany { any_base, inboxes }) {
            log::error!("Failed to queue deliver job: {}", e);
        }
    }
}
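
/// Upper bound on the `stack` depth an [`Ingest`] job will accept; deeper
/// recursion is rejected.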
const MAX_INGEST_DEPTH: usize = 30;
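
/// Job that deletes the file identified by `file_id` through the profiles
/// service.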
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct RemoveFile {
    file_id: Uuid,
}
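
/// Job that delivers a single activity to one inbox URL.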
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct Deliver {
    any_base: AnyBase,
    url: Url,
}
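
/// Job that fans an activity out to many inboxes by queueing one [`Deliver`]
/// job per URL.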
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DeliverMany {
    any_base: AnyBase,
    inboxes: Vec<Url>,
}
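
/// Job that downloads a batch of remote images, failing (and so retrying) if
/// any of them cannot be fetched.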
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadImages {
    urls: Vec<Url>,
    stack: Vec<AnyBase>,
}
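
/// Job that fetches a remote ActivityPub object and queues it for ingestion
/// with the current `stack`.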
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadApub {
    url: Url,
    stack: Vec<AnyBase>,
}
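
/// Job that hands a fetched object to the profiles service for ingestion,
/// bounded by [`MAX_INGEST_DEPTH`].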
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct Ingest {
    any_base: AnyBase,
    stack: Vec<AnyBase>,
}
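
// Every job below shares the same configuration: the "ActivityPub" queue,
// up to 10 retries, and exponential backoff.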
impl ActixJob for RemoveFile {
    type State = State;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "RemoveFile";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            state.profiles.delete_file(self.file_id).await?;

            Ok(())
        })
    }
}

impl ActixJob for Deliver {
    type State = State;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "Deliver";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let res = state
                .client
                .post(self.url.as_str())
                .content_type("application/activity+json")
                .send_json(&self.any_base)
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;

            if !res.status().is_success() {
                return Err(anyhow::anyhow!("Status: {}", res.status()));
            }

            Ok(())
        })
    }
}

impl ActixJob for DeliverMany {
    type State = State;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DeliverMany";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            for url in self.inboxes {
                let res = state.spawn.0.queue(Deliver {
                    url,
                    any_base: self.any_base.clone(),
                });

                if let Err(e) = res {
                    log::error!("Error spawning deliver job: {}", e);
                }
            }

            Ok(())
        })
    }
}

impl ActixJob for DownloadImages {
    type State = State;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DownloadImages";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let mut failed_urls = vec![];

            for url in self.urls {
                if let Err(e) = state.profiles.download_image(&url).await {
                    log::warn!("Failed to download image: {}", e);
                    failed_urls.push(url);
                }
            }

            if !failed_urls.is_empty() {
                return Err(anyhow::anyhow!(
                    "Failed to download all images, {} remain",
                    failed_urls.len()
                ));
            }

            Ok(())
        })
    }
}

impl ActixJob for DownloadApub {
    type State = State;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DownloadApub";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let mut res = state
                .client
                .get(self.url.as_str())
                .header("Accept", "application/activity+json")
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;

            let any_base: AnyBase = res
                .json()
                .await
                .map_err(|e| anyhow::anyhow!("Json: {}", e))?;

            state.spawn.process(any_base, self.stack);

            Ok(())
        })
    }
}

impl ActixJob for Ingest {
    type State = State;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "Ingest";
    const QUEUE: &'static str = "ActivityPub";
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            if self.stack.len() > MAX_INGEST_DEPTH {
                return Err(anyhow::anyhow!("Max recursion depth exceeded"));
            }

            state.profiles.ingest(self.any_base, self.stack).await?;

            Ok(())
        })
    }
}