use crate::{Error, OptionExt, State}; use activitystreams::base::AnyBase; use actix_rt::Arbiter; use actix_web::client::Client; use background_jobs::{ create_server, memory_storage::Storage, ActixJob, Backoff, MaxRetries, QueueHandle, WorkerConfig, }; use futures::future::LocalBoxFuture; use http_signature_normalization_actix::prelude::*; use hyaenidae_profiles::{MissingImage, OnBehalfOf, Spawner}; use rsa::RSAPrivateKey; use url::Url; use uuid::Uuid; #[derive(Clone)] struct JobState { state: State, client: Client, } pub(super) fn build( base_url: Url, pict_rs_upstream: Url, content_config: hyaenidae_profiles::ContentConfig, db: sled::Db, ) -> Result { let storage = Storage::new(); let queue_handle = create_server(storage); let state = State::new( Spawn(queue_handle.clone()), base_url, pict_rs_upstream, content_config, Arbiter::current(), &db, )?; let state2 = state.clone(); WorkerConfig::new(move || JobState { state: state2.clone(), client: crate::build_client(), }) .register::() .register::() .register::() .register::() .register::() .register::() .register::() .set_worker_count("ActivityPub", 16) .start(queue_handle.clone()); Ok(state) } #[derive(Clone)] pub(super) struct Spawn(QueueHandle); impl Spawn { pub(crate) fn ingest(&self, any_base: AnyBase, key_id: Url) { if let Err(e) = self.0.queue(Ingest { any_base, key_id: Some(key_id), stack: vec![], }) { log::error!("Failed to queue ingest: {}", e); } } pub(crate) fn download_apub_anonymous(&self, url: Url) { if let Err(e) = self.0.queue(DownloadApubAnonymous { url }) { log::error!("Failed to queue download job: {}", e); } } } impl Spawner for Spawn { fn download_apub(&self, on_behalf_of: OnBehalfOf, url: Url, stack: Vec) { if let Err(e) = self.0.queue(DownloadApub { on_behalf_of, url, stack, }) { log::error!("Failed to queue download job: {}", e); } } fn download_images(&self, urls: Vec, stack: Vec) { if let Err(e) = self.0.queue(DownloadImages { urls, stack }) { log::error!("Failed to queue image download job: {}", e); } } fn purge_file(&self, file_id: Uuid) { if let Err(e) = self.0.queue(RemoveFile { file_id }) { log::error!("Failed to queue file delete job: {}", e); } } fn process(&self, any_base: AnyBase, stack: Vec) { if let Err(e) = self.0.queue(Ingest { any_base, key_id: None, stack, }) { log::error!("Failed to queue process job: {}", e); } } fn deliver(&self, on_behalf_of: OnBehalfOf, any_base: AnyBase, inboxes: Vec) { if let Err(e) = self.0.queue(DeliverMany { on_behalf_of, any_base, inboxes, }) { log::error!("Failed to queue deliver job: {}", e); } } } const MAX_INGEST_DEPTH: usize = 30; #[derive(Clone, serde::Deserialize, serde::Serialize)] struct RemoveFile { file_id: Uuid, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct Deliver { any_base: AnyBase, url: Url, key_id: Url, private_key: String, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DeliverMany { on_behalf_of: OnBehalfOf, any_base: AnyBase, inboxes: Vec, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DownloadImages { urls: Vec, stack: Vec, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DownloadApubAnonymous { url: Url, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct DownloadApub { on_behalf_of: OnBehalfOf, url: Url, stack: Vec, } #[derive(Clone, serde::Deserialize, serde::Serialize)] struct Ingest { any_base: AnyBase, key_id: Option, stack: Vec, } impl ActixJob for RemoveFile { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "RemoveFile"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { state .state .profiles .delete_file(self.file_id, &state.client) .await?; Ok(()) }) } } impl ActixJob for Deliver { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "Deliver"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let signer = Signer::new(&self.private_key)?; let item_string = serde_json::to_string(&self.any_base)?; let res = state .client .post(self.url.as_str()) .header("Accept", "application/activity+json") .content_type("application/activity+json") .signature_with_digest( signature_config(), self.key_id.to_string(), signer.hasher(), item_string, move |signing_string| signer.sign(signing_string), ) .await .map_err(|e| anyhow::anyhow!("Signature: {}", e))? .send() .await .map_err(|e| anyhow::anyhow!("Request: {}", e))?; if !res.status().is_success() { return Err(anyhow::anyhow!("Status: {}", res.status())); } Ok(()) }) } } impl ActixJob for DeliverMany { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DeliverMany"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let apub_store = state.state.profiles.apub.clone(); let server_store = state.state.profiles.store.servers.clone(); let behalf = self.on_behalf_of; let (private_key, key_id): (String, Url) = actix_web::web::block(move || { let key_id = match behalf { OnBehalfOf::Server => { let server_id = server_store.get_self()?.req()?; apub_store.key_for_server(server_id)?.req()? } OnBehalfOf::Profile(profile_id) => { apub_store.key_for_profile(profile_id)?.req()? } }; let private_key = apub_store.private_key_for_id(&key_id)?.req()?; Ok((private_key, key_id)) as Result<_, Error> }) .await?; for url in self.inboxes { let res = state.state.spawn.0.queue(Deliver { url, any_base: self.any_base.clone(), private_key: private_key.clone(), key_id: key_id.clone(), }); if let Err(e) = res { log::error!("Error spawning deliver job: {}", e); } } Ok(()) }) } } impl ActixJob for DownloadImages { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DownloadImages"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(mut self, state: Self::State) -> Self::Future { Box::pin(async move { let mut failed_urls = vec![]; for url in self.urls { if let Err(e) = state .state .profiles .download_image(&url, &state.client) .await { log::warn!("Failed to download image: {}", e); failed_urls.push(url); } } if !failed_urls.is_empty() { return Err(anyhow::anyhow!( "Failed to download all images, {} remain", failed_urls.len() )); } if let Some(any_base) = self.stack.pop() { state.state.spawn.process(any_base, self.stack); } Ok(()) }) } } impl ActixJob for DownloadApubAnonymous { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DownloadApubAnonymous"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let mut res = state .client .get(self.url.as_str()) .header("Accept", "application/activity+json") .send() .await .map_err(|e| anyhow::anyhow!("Request: {}", e))?; let any_base: AnyBase = res .json() .await .map_err(|e| anyhow::anyhow!("Json: {}", e))?; state.state.spawn.process(any_base, vec![]); Ok(()) }) } } impl ActixJob for DownloadApub { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "DownloadApub"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { let apub_store = state.state.profiles.apub.clone(); let server_store = state.state.profiles.store.servers.clone(); let behalf = self.on_behalf_of; let (private_key, key_id): (String, Url) = actix_web::web::block(move || { let key_id = match behalf { OnBehalfOf::Server => { let server_id = server_store.get_self()?.req()?; apub_store.key_for_server(server_id)?.req()? } OnBehalfOf::Profile(profile_id) => { apub_store.key_for_profile(profile_id)?.req()? } }; let private_key = apub_store.private_key_for_id(&key_id)?.req()?; Ok((private_key, key_id)) as Result<_, Error> }) .await?; let signer = Signer::new(&private_key)?; let mut res = state .client .get(self.url.as_str()) .header("Accept", "application/activity+json") .signature( signature_config(), key_id.to_string(), move |signing_string| signer.sign(signing_string), ) .await .map_err(|e| anyhow::anyhow!("Signature: {}", e))? .send() .await .map_err(|e| anyhow::anyhow!("Request: {}", e))?; let any_base: AnyBase = res .json() .await .map_err(|e| anyhow::anyhow!("Json: {}", e))?; state.state.spawn.process(any_base, self.stack); Ok(()) }) } } impl ActixJob for Ingest { type State = JobState; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "Ingest"; const QUEUE: &'static str = "ActivityPub"; const MAX_RETRIES: MaxRetries = MaxRetries::Count(10); const BACKOFF: Backoff = Backoff::Exponential(3); const TIMEOUT: i64 = 30; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { if self.stack.len() > MAX_INGEST_DEPTH { return Err(anyhow::anyhow!("Max recursion depth exceded")); } state .state .profiles .ingest(self.any_base, self.key_id, self.stack) .await?; Ok(()) }) } } fn signature_config() -> http_signature_normalization_actix::Config { http_signature_normalization_actix::Config::new().set_host_header() } struct Signer(RSAPrivateKey); impl Signer { fn new(private_key: &str) -> Result { use rsa_pem::KeyExt; Ok(Signer(RSAPrivateKey::from_pem_pkcs8(private_key)?)) } fn hasher(&self) -> sha2::Sha256 { sha2::Digest::new() } fn sign(&self, signing_string: &str) -> Result { use sha2::{Digest, Sha256}; let hashed = Sha256::digest(signing_string.as_bytes()); let bytes = self.0.sign( rsa::PaddingScheme::PKCS1v15Sign { hash: Some(rsa::Hash::SHA2_256), }, &hashed, )?; Ok(base64::encode(bytes)) } }