asonix
e0858b7b3d
Currently internationalized (i18n): the login page, register page, cookie page, delete-account confirmation page, 404 page, 500 page, and part of the account settings.
474 lines
14 KiB
Rust
474 lines
14 KiB
Rust
use crate::{Error, OptionExt, State};
|
|
use activitystreams::base::AnyBase;
|
|
use actix_rt::Arbiter;
|
|
use actix_web::client::Client;
|
|
use background_jobs::{
|
|
create_server, memory_storage::Storage, ActixJob, Backoff, MaxRetries, QueueHandle,
|
|
WorkerConfig,
|
|
};
|
|
use futures::future::LocalBoxFuture;
|
|
use http_signature_normalization_actix::prelude::*;
|
|
use hyaenidae_profiles::{MissingImage, OnBehalfOf, Spawner};
|
|
use rsa::RSAPrivateKey;
|
|
use url::Url;
|
|
use uuid::Uuid;
|
|
|
|
/// Per-worker state handed to every background job: a clone of the shared
/// application `State` plus an HTTP client for outgoing federation requests.
#[derive(Clone)]
struct JobState {
    // Shared application state (profiles service, stores, spawner, ...).
    state: State,
    // actix-web client built by `crate::build_client()`; used for both signed
    // and unsigned outgoing requests.
    client: Client,
}
|
|
|
|
pub(super) fn build(
|
|
base_url: Url,
|
|
pict_rs_upstream: Url,
|
|
db: sled::Db,
|
|
) -> Result<State, anyhow::Error> {
|
|
let storage = Storage::new();
|
|
|
|
let queue_handle = create_server(storage);
|
|
|
|
let state = State::new(
|
|
Spawn(queue_handle.clone()),
|
|
base_url,
|
|
pict_rs_upstream,
|
|
Arbiter::current(),
|
|
&db,
|
|
)?;
|
|
|
|
let state2 = state.clone();
|
|
WorkerConfig::new(move || JobState {
|
|
state: state2.clone(),
|
|
client: crate::build_client(),
|
|
})
|
|
.register::<Ingest>()
|
|
.register::<DownloadApub>()
|
|
.register::<DownloadApubAnonymous>()
|
|
.register::<DownloadImages>()
|
|
.register::<DeliverMany>()
|
|
.register::<Deliver>()
|
|
.register::<RemoveFile>()
|
|
.set_worker_count("ActivityPub", 16)
|
|
.start(queue_handle.clone());
|
|
|
|
Ok(state)
|
|
}
|
|
|
|
/// Cheaply-cloneable handle around the job-queue, used both directly (see
/// `impl Spawn`) and through the `hyaenidae_profiles::Spawner` trait to
/// enqueue background work.
#[derive(Clone)]
pub(super) struct Spawn(QueueHandle);
|
|
|
|
impl Spawn {
|
|
pub(crate) fn ingest(&self, any_base: AnyBase, key_id: Url) {
|
|
if let Err(e) = self.0.queue(Ingest {
|
|
any_base,
|
|
key_id: Some(key_id),
|
|
stack: vec![],
|
|
}) {
|
|
log::error!("Failed to queue ingest: {}", e);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn download_apub_anonymous(&self, url: Url) {
|
|
if let Err(e) = self.0.queue(DownloadApubAnonymous { url }) {
|
|
log::error!("Failed to queue download job: {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Spawner for Spawn {
|
|
fn download_apub(&self, on_behalf_of: OnBehalfOf, url: Url, stack: Vec<AnyBase>) {
|
|
if let Err(e) = self.0.queue(DownloadApub {
|
|
on_behalf_of,
|
|
url,
|
|
stack,
|
|
}) {
|
|
log::error!("Failed to queue download job: {}", e);
|
|
}
|
|
}
|
|
|
|
fn download_images(&self, urls: Vec<MissingImage>, stack: Vec<AnyBase>) {
|
|
if let Err(e) = self.0.queue(DownloadImages { urls, stack }) {
|
|
log::error!("Failed to queue image download job: {}", e);
|
|
}
|
|
}
|
|
|
|
fn purge_file(&self, file_id: Uuid) {
|
|
if let Err(e) = self.0.queue(RemoveFile { file_id }) {
|
|
log::error!("Failed to queue file delete job: {}", e);
|
|
}
|
|
}
|
|
|
|
fn process(&self, any_base: AnyBase, stack: Vec<AnyBase>) {
|
|
if let Err(e) = self.0.queue(Ingest {
|
|
any_base,
|
|
key_id: None,
|
|
stack,
|
|
}) {
|
|
log::error!("Failed to queue process job: {}", e);
|
|
}
|
|
}
|
|
|
|
fn deliver(&self, on_behalf_of: OnBehalfOf, any_base: AnyBase, inboxes: Vec<Url>) {
|
|
if let Err(e) = self.0.queue(DeliverMany {
|
|
on_behalf_of,
|
|
any_base,
|
|
inboxes,
|
|
}) {
|
|
log::error!("Failed to queue deliver job: {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Maximum number of pending activities allowed on an `Ingest` job's stack;
/// `Ingest::run` rejects deeper stacks to guard against unbounded recursion.
const MAX_INGEST_DEPTH: usize = 30;
|
|
|
|
/// Job payload: delete the stored file identified by `file_id`.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct RemoveFile {
    file_id: Uuid,
}
|
|
|
|
/// Job payload: POST one activity to a single inbox `url`, signed with the
/// PEM-encoded `private_key` identified by `key_id`. Fanned out from
/// `DeliverMany`.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct Deliver {
    any_base: AnyBase,
    url: Url,
    key_id: Url,
    // NOTE: the private key is serialized into the (in-memory) job store.
    private_key: String,
}
|
|
|
|
/// Job payload: look up the signing key for `on_behalf_of` once, then spawn
/// one `Deliver` job per inbox in `inboxes`.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DeliverMany {
    on_behalf_of: OnBehalfOf,
    any_base: AnyBase,
    inboxes: Vec<Url>,
}
|
|
|
|
/// Job payload: download each missing image in `urls`; once all succeed,
/// resume processing the pending activities in `stack`.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadImages {
    urls: Vec<MissingImage>,
    stack: Vec<AnyBase>,
}
|
|
|
|
/// Job payload: fetch the ActivityPub document at `url` without signing the
/// request, then queue it for ingestion.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadApubAnonymous {
    url: Url,
}
|
|
|
|
/// Job payload: fetch the ActivityPub document at `url` with an HTTP
/// signature on behalf of `on_behalf_of`, then queue it for ingestion along
/// with the pending activities in `stack`.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct DownloadApub {
    on_behalf_of: OnBehalfOf,
    url: Url,
    stack: Vec<AnyBase>,
}
|
|
|
|
/// Job payload: ingest one activity. `key_id` is `Some` when the activity
/// arrived with a verified signing key, `None` when processed internally.
/// `stack` holds activities still pending after this one.
#[derive(Clone, serde::Deserialize, serde::Serialize)]
struct Ingest {
    any_base: AnyBase,
    key_id: Option<Url>,
    stack: Vec<AnyBase>,
}
|
|
|
|
/// Background job: delete a stored file via the profiles service (presumably
/// through the pict-rs upstream — confirm in `delete_file`).
impl ActixJob for RemoveFile {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "RemoveFile";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff (base 3); 30s timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // Delegate the deletion to the profiles service; an Err here
            // triggers the retry policy above.
            state
                .state
                .profiles
                .delete_file(self.file_id, &state.client)
                .await?;
            Ok(())
        })
    }
}
|
|
|
|
/// Background job: POST a single activity to one inbox with an HTTP
/// signature and digest header.
impl ActixJob for Deliver {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "Deliver";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff (base 3); 30s timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // Reconstruct the RSA signer from the PEM key carried in the job.
            let signer = Signer::new(&self.private_key)?;

            // The serialized body is both sent and hashed for the Digest header.
            let item_string = serde_json::to_string(&self.any_base)?;

            let res = state
                .client
                .post(self.url.as_str())
                .header("Accept", "application/activity+json")
                .content_type("application/activity+json")
                // Sign the request and attach a SHA-256 Digest of the body.
                .signature_with_digest(
                    signature_config(),
                    self.key_id.to_string(),
                    signer.hasher(),
                    item_string,
                    move |signing_string| signer.sign(signing_string),
                )
                .await
                .map_err(|e| anyhow::anyhow!("Signature: {}", e))?
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;

            // Treat any non-2xx response as a failure so the job is retried.
            if !res.status().is_success() {
                return Err(anyhow::anyhow!("Status: {}", res.status()));
            }

            Ok(())
        })
    }
}
|
|
|
|
/// Background job: resolve the signing key for the sender once, then fan out
/// one `Deliver` job per target inbox.
impl ActixJob for DeliverMany {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DeliverMany";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff (base 3); 30s timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // Clone the store handles so they can move into the blocking closure.
            let apub_store = state.state.profiles.apub.clone();
            let server_store = state.state.profiles.store.servers.clone();
            let behalf = self.on_behalf_of;

            // Key lookup hits sled-backed stores, so run it on the blocking
            // thread pool rather than the async executor.
            let (private_key, key_id): (String, Url) = actix_web::web::block(move || {
                // Pick the key id for the sender: the server key or a
                // per-profile key.
                let key_id = match behalf {
                    OnBehalfOf::Server => {
                        let server_id = server_store.get_self()?.req()?;
                        apub_store.key_for_server(server_id)?.req()?
                    }
                    OnBehalfOf::Profile(profile_id) => {
                        apub_store.key_for_profile(profile_id)?.req()?
                    }
                };

                let private_key = apub_store.private_key_for_id(&key_id)?.req()?;
                Ok((private_key, key_id)) as Result<_, Error>
            })
            .await?;

            // Spawn one Deliver job per inbox; a failed enqueue is logged and
            // skipped so the remaining inboxes still get their jobs.
            for url in self.inboxes {
                let res = state.state.spawn.0.queue(Deliver {
                    url,
                    any_base: self.any_base.clone(),
                    private_key: private_key.clone(),
                    key_id: key_id.clone(),
                });

                if let Err(e) = res {
                    log::error!("Error spawning deliver job: {}", e);
                }
            }

            Ok(())
        })
    }
}
|
|
|
|
/// Background job: download a batch of missing images, then resume
/// processing the pending activity stack.
impl ActixJob for DownloadImages {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DownloadImages";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff (base 3); 30s timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    // `mut self` so the stack can be popped after the downloads.
    fn run(mut self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            let mut failed_urls = vec![];

            // Attempt every download even if some fail, collecting failures.
            for url in self.urls {
                if let Err(e) = state
                    .state
                    .profiles
                    .download_image(&url, &state.client)
                    .await
                {
                    log::warn!("Failed to download image: {}", e);
                    failed_urls.push(url);
                }
            }

            // Any failure fails the whole job so it is retried.
            // NOTE(review): a retry re-runs with the original url list, so
            // already-downloaded images are fetched again — presumably
            // `download_image` tolerates that; confirm.
            if !failed_urls.is_empty() {
                return Err(anyhow::anyhow!(
                    "Failed to download all images, {} remain",
                    failed_urls.len()
                ));
            }

            // All images present: resume processing the pending activities.
            if let Some(any_base) = self.stack.pop() {
                state.state.spawn.process(any_base, self.stack);
            }

            Ok(())
        })
    }
}
|
|
|
|
/// Background job: fetch an ActivityPub document with an unsigned GET and
/// queue the result for ingestion (with an empty pending stack).
impl ActixJob for DownloadApubAnonymous {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DownloadApubAnonymous";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff (base 3); 30s timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // `mut` because reading the JSON body needs `&mut` on the response.
            let mut res = state
                .client
                .get(self.url.as_str())
                .header("Accept", "application/activity+json")
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;

            let any_base: AnyBase = res
                .json()
                .await
                .map_err(|e| anyhow::anyhow!("Json: {}", e))?;

            // Hand the fetched document off to ingestion.
            state.state.spawn.process(any_base, vec![]);

            Ok(())
        })
    }
}
|
|
|
|
/// Background job: fetch an ActivityPub document with a signed GET on behalf
/// of the server or a profile, then queue it for ingestion along with the
/// pending activity stack.
impl ActixJob for DownloadApub {
    type State = JobState;
    type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;

    const NAME: &'static str = "DownloadApub";
    const QUEUE: &'static str = "ActivityPub";
    // Retry up to 10 times with exponential backoff (base 3); 30s timeout.
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
    const BACKOFF: Backoff = Backoff::Exponential(3);
    const TIMEOUT: i64 = 30;

    fn run(self, state: Self::State) -> Self::Future {
        Box::pin(async move {
            // Clone the store handles so they can move into the blocking closure.
            let apub_store = state.state.profiles.apub.clone();
            let server_store = state.state.profiles.store.servers.clone();
            let behalf = self.on_behalf_of;

            // Key lookup hits sled-backed stores, so run it on the blocking
            // thread pool. (Same lookup as in DeliverMany::run.)
            let (private_key, key_id): (String, Url) = actix_web::web::block(move || {
                let key_id = match behalf {
                    OnBehalfOf::Server => {
                        let server_id = server_store.get_self()?.req()?;
                        apub_store.key_for_server(server_id)?.req()?
                    }
                    OnBehalfOf::Profile(profile_id) => {
                        apub_store.key_for_profile(profile_id)?.req()?
                    }
                };

                let private_key = apub_store.private_key_for_id(&key_id)?.req()?;
                Ok((private_key, key_id)) as Result<_, Error>
            })
            .await?;

            let signer = Signer::new(&private_key)?;

            // Signed GET (no body, so no Digest header — cf. Deliver::run).
            // `mut` because reading the JSON body needs `&mut` on the response.
            let mut res = state
                .client
                .get(self.url.as_str())
                .header("Accept", "application/activity+json")
                .signature(
                    signature_config(),
                    key_id.to_string(),
                    move |signing_string| signer.sign(signing_string),
                )
                .await
                .map_err(|e| anyhow::anyhow!("Signature: {}", e))?
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Request: {}", e))?;

            let any_base: AnyBase = res
                .json()
                .await
                .map_err(|e| anyhow::anyhow!("Json: {}", e))?;

            // Hand the fetched document off to ingestion, preserving the
            // pending stack.
            state.state.spawn.process(any_base, self.stack);

            Ok(())
        })
    }
}
|
|
|
|
impl ActixJob for Ingest {
|
|
type State = JobState;
|
|
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
|
|
|
|
const NAME: &'static str = "Ingest";
|
|
const QUEUE: &'static str = "ActivityPub";
|
|
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
|
|
const BACKOFF: Backoff = Backoff::Exponential(3);
|
|
const TIMEOUT: i64 = 30;
|
|
|
|
fn run(self, state: Self::State) -> Self::Future {
|
|
Box::pin(async move {
|
|
if self.stack.len() > MAX_INGEST_DEPTH {
|
|
return Err(anyhow::anyhow!("Max recursion depth exceded"));
|
|
}
|
|
state
|
|
.state
|
|
.profiles
|
|
.ingest(self.any_base, self.key_id, self.stack)
|
|
.await?;
|
|
Ok(())
|
|
})
|
|
}
|
|
}
|
|
|
|
/// HTTP-signature configuration shared by all signed requests; includes the
/// Host header in the signature string.
fn signature_config() -> http_signature_normalization_actix::Config {
    http_signature_normalization_actix::Config::new().set_host_header()
}
|
|
|
|
/// Wraps an RSA private key for producing HTTP-signature values.
struct Signer(RSAPrivateKey);
|
|
|
|
impl Signer {
    /// Parses a PEM-encoded PKCS#8 RSA private key.
    ///
    /// # Errors
    ///
    /// Returns an error if the PEM string cannot be parsed.
    fn new(private_key: &str) -> Result<Self, anyhow::Error> {
        use rsa_pem::KeyExt;

        Ok(Signer(RSAPrivateKey::from_pem_pkcs8(private_key)?))
    }

    /// Fresh SHA-256 hasher, used for the request Digest header.
    fn hasher(&self) -> sha2::Sha256 {
        // `Digest::new()` resolves to Sha256 via the return type.
        sha2::Digest::new()
    }

    /// Signs `signing_string`: SHA-256 digest, then RSA PKCS#1 v1.5
    /// signature, returned base64-encoded.
    ///
    /// # Errors
    ///
    /// Returns an error if the RSA signing operation fails.
    fn sign(&self, signing_string: &str) -> Result<String, anyhow::Error> {
        use sha2::{Digest, Sha256};

        let hashed = Sha256::digest(signing_string.as_bytes());
        // The padding scheme must declare the hash that produced `hashed`.
        let bytes = self.0.sign(
            rsa::PaddingScheme::PKCS1v15Sign {
                hash: Some(rsa::Hash::SHA2_256),
            },
            &hashed,
        )?;
        Ok(base64::encode(bytes))
    }
}
|