use crate::State; use activitystreams::base::AnyBase; use background_jobs::{ create_server, memory_storage::Storage, ActixJob, Backoff, MaxRetries, QueueHandle, WorkerConfig, }; use futures_core::future::LocalBoxFuture; use hyaenidae_profiles::Spawner; use url::Url; use uuid::Uuid; pub(super) fn build( base_url: Url, pict_rs_upstream: Url, db: sled::Db, ) -> Result { let storage = Storage::new(); let queue_handle = create_server(storage); let inner_queue_handle = queue_handle.clone(); WorkerConfig::new(move || { crate::State::new( Spawn(inner_queue_handle.clone()), base_url.clone(), pict_rs_upstream.clone(), &db.clone(), ) .unwrap() }) .register::() .register::() .register::() .register::() .register::() .register::() .set_worker_count("ActivityPub", 16) .start(queue_handle.clone()); Ok(Spawn(queue_handle)) } #[derive(Clone)] pub(super) struct Spawn(QueueHandle); impl Spawner for Spawn { fn download_apub(&self, url: Url, stack: Vec) { if let Err(e) = self.0.queue(DownloadApub { url, stack }) { log::error!("Failed to queue download job: {}", e); } } fn download_images(&self, urls: Vec, stack: Vec) { if let Err(e) = self.0.queue(DownloadImages { urls, stack }) { log::error!("Failed to queue image download job: {}", e); } } fn purge_file(&self, file_id: Uuid) { if let Err(e) = self.0.queue(RemoveFile { file_id }) { log::error!("Failed to queue file delete job: {}", e); } } fn process(&self, any_base: AnyBase, stack: Vec) { if let Err(e) = self.0.queue(Ingest { any_base, stack }) { log::error!("Failed to queue process job: {}", e); } } fn deliver(&self, any_base: AnyBase, inboxes: Vec) { if let Err(e) = self.0.queue(DeliverMany { any_base, inboxes }) { log::error!("Failed to queue deliver job: {}", e); } } } const MAX_INGEST_DEPTH: usize = 30; #[derive(Clone, serde::Deserialize, serde::Serialize)] struct RemoveFile { file_id: Uuid, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct Deliver { any_base: AnyBase, url: Url, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DeliverMany { any_base: AnyBase, inboxes: Vec, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DownloadImages { urls: Vec, stack: Vec, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DownloadApub { url: Url, stack: Vec, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct Ingest { any_base: AnyBase, stack: Vec, } impl ActixJob for RemoveFile { type State = State; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "RemoveFile"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { state.profiles.delete_file(self.file_id).await?; Ok(()) }) } } impl ActixJob for Deliver { type State = State; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "Deliver"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let res = state .client .post(self.url.as_str()) .content_type("application/activity+json") .send_json(&self.any_base) .await .map_err(|e| anyhow::anyhow!("Request: {}", e))?; if !res.status().is_success() { return Err(anyhow::anyhow!("Status: {}", res.status())); } Ok(()) }) } } impl ActixJob for DeliverMany { type State = State; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DeliverMany"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { for url in self.inboxes { let res = state.spawn.0.queue(Deliver { url, any_base: self.any_base.clone(), }); if let Err(e) = res { log::error!("Error spawning deliver job: {}", e); } } Ok(()) }) } } impl ActixJob for DownloadImages { type State = State; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DownloadImages"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let mut failed_urls = vec![]; for url in self.urls { if let Err(e) = state.profiles.download_image(&url).await { log::warn!("Failed to download image: {}", e); failed_urls.push(url); } } if !failed_urls.is_empty() { return Err(anyhow::anyhow!( "Failed to download all images, {} remain", failed_urls.len() )); } Ok(()) }) } } impl ActixJob for DownloadApub { type State = State; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DownloadApub"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let mut res = state .client .get(self.url.as_str()) .header("Accept", "application/activity+json") .send() .await .map_err(|e| anyhow::anyhow!("Request: {}", e))?; let any_base: AnyBase = res .json() .await .map_err(|e| anyhow::anyhow!("Json: {}", e))?; state.spawn.process(any_base, self.stack); Ok(()) }) } } impl ActixJob for Ingest { type State = State; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "Ingest"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { if self.stack.len() > MAX_INGEST_DEPTH { return Err(anyhow::anyhow!("Max recursion depth exceded")); } state.profiles.ingest(self.any_base, self.stack).await?; Ok(()) }) } }