use activitystreams::base::AnyBase; use actix_rt::Arbiter; use actix_web::{client::Client, dev::Payload, HttpRequest}; use hyaenidae_content::NodeView; use sled::Db; use std::{borrow::Cow, fmt, sync::Arc}; use url::Url; use uuid::Uuid; #[macro_export] macro_rules! recover { ($url:expr, $expr:expr) => { match $expr { Some(item) => item, None => return Ok(Err(RecoverableError::MissingApub($url.to_owned()))), } }; } pub mod apub; pub mod pictrs; pub mod store; mod viewer; use apub::ApubIds; use pictrs::ImageInfo; use store::{OwnerSource, ValidationError}; use viewer::Viewer; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Missing required item: {0}")] Missing(String), #[error("ActivityPub object is malformed")] Invalid, #[error("Panic in blocking task")] Panic, #[error("Object was deleted")] Deleted, #[error("Blocked")] Blocked, #[error("Validations failed")] Validate(Vec), #[error("Error deleting file: {0}")] DeleteFile(#[from] pictrs::DeleteError), #[error("Error uploading file: {0}")] UploadFile(#[from] pictrs::UploadError), #[error("Failed to serialize or deseiralize data: {0}")] Json(#[from] serde_json::Error), #[error("Error in DB: {0}")] Store(#[from] store::StoreError), #[error("Error in DB: {0}")] Apub(#[from] apub::StoreError), } impl From> for Error { fn from(v: Vec) -> Self { Self::Validate(v) } } impl Error { pub fn validation_errors(&self) -> Option<&[ValidationError]> { if let Self::Validate(errors) = &self { Some(errors.as_slice()) } else { None } } } #[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)] pub enum OnBehalfOf { Profile(Uuid), Server, } pub trait Spawner { fn download_apub(&self, on_behalf_of: OnBehalfOf, url: Url, stack: Vec); fn download_images(&self, images: Vec, stack: Vec); fn purge_file(&self, file_id: Uuid); fn process(&self, any_base: AnyBase, stack: Vec); fn deliver(&self, on_behalf_of: OnBehalfOf, any_base: AnyBase, inboxes: Vec); } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct MissingImage { apub_id: Url, image_url: Url, } enum RecoverableError { MissingApub(Url), MissingImages(Vec), } type Context = Arc; pub trait Action { fn perform(&self, context: &Context) -> Result>, Error>; } pub trait Outbound { fn id(&self) -> Option; fn behalf(&self, context: &Context) -> Result; fn inboxes(&self, context: &Context) -> Result, Error>; fn to_apub(&self, context: &Context) -> Result; fn ready(&self) -> bool { true } } pub trait UrlFor { fn profile(&self, profile: &store::Profile) -> String; fn icon(&self, file: &store::File) -> String; } trait Required { fn req(self, msg: &str) -> Result; } impl Required for Option { fn req(self, msg: &str) -> Result { self.ok_or_else(|| Error::Missing(msg.to_owned())) } } #[derive(Clone, Debug)] pub struct ContentConfig { pub max_bio_length: usize, pub max_display_name_length: usize, pub max_handle_length: usize, pub max_domain_length: usize, pub max_submission_title_length: usize, pub max_submission_body_length: usize, pub max_post_title_length: usize, pub max_post_body_length: usize, pub max_comment_length: usize, pub max_server_title_length: usize, pub max_server_body_length: usize, } #[derive(Clone, Copy, Debug)] enum KeyOwner { Server(Uuid), Profile(Uuid), } #[derive(Clone)] pub struct State { pub store: store::Store, pub apub: apub::Store, pub pictrs: pictrs::State, pub spawner: Arc, pub url_for: Arc, pub content_config: ContentConfig, pub arbiter: Arbiter, _db: Db, } impl State { pub fn build( pictrs_upstream: Url, image_info: impl ImageInfo + Send + Sync + 'static, apub_info: impl ApubIds + Send + Sync + 'static, spawner: impl Spawner + Send + Sync + 'static, url_for: impl UrlFor + Send + Sync + 'static, content_config: ContentConfig, arbiter: Arbiter, db: Db, ) -> Result, sled::Error> { Ok(Arc::new(State { store: store::Store::build(content_config.max_handle_length, &db)?, apub: apub::Store::build(apub_info, &db)?, pictrs: pictrs::State::new(pictrs_upstream, image_info), spawner: Arc::new(spawner), url_for: Arc::new(url_for), content_config, arbiter, _db: db, })) } fn map_nodeview<'a>(&self, view: NodeView<'a>) -> NodeView<'a> { match view { NodeView::IconText { handle, domain, .. } => { let id_res = self.store.profiles.by_handle(&handle, &domain); if let Ok(Some(id)) = &id_res { if let Ok(Some(profile)) = self.store.profiles.by_id(*id) { if let Some(file_id) = profile.icon() { if let Ok(Some(file)) = self.store.files.by_id(file_id) { return NodeView::IconText { handle, domain, href: Some(self.url_for.profile(&profile)), img: Some(self.url_for.icon(&file)), }; } } return NodeView::Handle { handle, domain, href: Some(self.url_for.profile(&profile)), }; } } NodeView::Text { text: Cow::Owned(format!("@{}@{}", handle, domain)), } } NodeView::Icon { handle, domain, .. } => { if let Ok(Some(id)) = self.store.profiles.by_handle(&handle, &domain) { if let Ok(Some(profile)) = self.store.profiles.by_id(id) { if let Some(file_id) = profile.icon() { if let Ok(Some(file)) = self.store.files.by_id(file_id) { return NodeView::Icon { handle, domain, href: Some(self.url_for.profile(&profile)), img: Some(self.url_for.icon(&file)), }; } } return NodeView::Handle { handle, domain, href: Some(self.url_for.profile(&profile)), }; } } NodeView::Text { text: Cow::Owned(format!("@{}@{}", handle, domain)), } } NodeView::Handle { handle, domain, .. } => { if let Ok(Some(id)) = self.store.profiles.by_handle(&handle, &domain) { if let Ok(Some(profile)) = self.store.profiles.by_id(id) { return NodeView::Handle { handle, domain, href: Some(self.url_for.profile(&profile)), }; } } NodeView::Text { text: Cow::Owned(format!("@{}@{}", handle, domain)), } } rest => rest, } } pub async fn create_server_actor(self: &Arc, domain: String) -> Result<(), Error> { let ctx = self.clone(); actix_web::web::block(move || { if ctx.store.servers.self_exists() { return Ok(()); } let action = apub::actions::CreateServer::from_domain(domain); match action.perform(&ctx)? { Some(outbound) => { let id = outbound.id().expect("Server Actor should exist"); ctx.store.servers.set_self(id)?; let context_clone = ctx.clone(); ctx.spawn_blocking(move || context_clone.deliver(&*outbound)); } None => panic!("Failed to generate server actor"), } Ok(()) }) .await?; Ok(()) } pub async fn run( self: &Arc, action: impl Action + Send + 'static, ) -> Result, Error> { let ctx = self.clone(); let opt = actix_web::web::block(move || match action.perform(&ctx)? { Some(outbound) => { let id = outbound.id(); let context_clone = ctx.clone(); ctx.spawn_blocking(move || context_clone.deliver(&*outbound)); Ok(id) } None => Ok(None), }) .await?; Ok(opt) } pub async fn ingest( self: &Arc, any_base: AnyBase, key_id: Option, stack: Vec, ) -> Result<(), Error> { let ctx = self.clone(); actix_web::web::block(move || { if let Some(key_id) = key_id { if let Some(profile) = ctx.apub.profile_for_key(&key_id)? { return apub::ingest(any_base, Some(KeyOwner::Profile(profile)), &ctx, stack); } if let Some(server) = ctx.apub.server_for_key(&key_id)? { return apub::ingest(any_base, Some(KeyOwner::Server(server)), &ctx, stack); } } else { return apub::ingest(any_base, None, &ctx, stack); } Err(Error::Invalid) }) .await?; Ok(()) } pub async fn serve_object( self: &Arc, object_id: Url, viewer_key_id: Option, ) -> Result, Error> { let ctx = self.clone(); let opt = actix_web::web::block(move || { if ctx.apub.deleted(&object_id)? { return Ok(None); } if let Some(viewer_key_id) = viewer_key_id { let profile_id_opt = ctx.apub.profile_for_key(&viewer_key_id)?; let server_id_opt = ctx.apub.server_for_key(&viewer_key_id)?; let viewer = match (profile_id_opt, server_id_opt) { (Some(profile_id), None) => Viewer::Profile(profile_id), (None, Some(server_id)) => Viewer::Server(server_id), (_, _) => { return Ok(None); } }; let stored_record = if let Some(id) = ctx.apub.id_for_apub(&object_id)? { id } else { return Ok(None); }; if viewer.can_view(stored_record, &ctx)? { let object = ctx.apub.object(&object_id)?; if object.is_none() {} Ok(object) } else { Ok(None) } } else { let stored_record = if let Some(id) = ctx.apub.id_for_apub(&object_id)? { id } else { return Ok(None); }; if ctx.is_public(stored_record)? { let object = ctx.apub.object(&object_id)?; if object.is_none() {} Ok(object) } else { Ok(None) } } }) .await?; Ok(opt) } pub async fn download_image( self: &Arc, missing_image: &MissingImage, client: &Client, ) -> Result { let images = self .pictrs .download_image(&missing_image.image_url, client) .await?; // safe because we already checked emptiness let image = images.images().next().unwrap(); let mime = image.mime(); let file_source = store::FileSource::PictRs(store::PictRsFile::new( image.key(), image.token(), image.width(), image.height(), image.mime(), )); let ctx = self.clone(); let missing_image = missing_image.clone(); let file_id = actix_web::web::block(move || { use activitystreams::prelude::*; let file = ctx.store.files.create(&file_source)?; ctx.apub.file(&missing_image.apub_id, file.id())?; let mut apub_image = activitystreams::object::Image::new(); apub_image .set_id(missing_image.apub_id.clone()) .set_media_type(mime) .set_url(missing_image.image_url.clone()); let apub_image = apub_image.into_any_base()?; ctx.apub.store_object(&apub_image)?; Ok(file.id()) }) .await?; Ok(file_id) } pub async fn upload_image<'a>( self: &'a Arc, req: HttpRequest, body: Payload, client: &'a Client, ) -> Result, Error> { let images = self.pictrs.proxy_upload(req, body, client).await?; let ctx = self.clone(); let files = actix_web::web::block(move || { let mut files = vec![]; for image in images.images() { let file_source = store::FileSource::PictRs(store::PictRsFile::new( image.key(), image.token(), image.width(), image.height(), image.mime(), )); let file = ctx.store.files.create(&file_source)?; files.push(file.id()); } Ok(files) }) .await?; Ok(files) } pub async fn delete_file( self: &Arc, file_id: Uuid, client: &Client, ) -> Result<(), Error> { let ctx = self.clone(); let file = actix_web::web::block(move || { ctx.store.files.by_id(file_id)?.req("File in delete_file") }) .await?; let store::FileSource::PictRs(image) = file.source(); self.pictrs .delete_image(image.key(), image.token(), client) .await?; let ctx = self.clone(); actix_web::web::block(move || Ok(ctx.store.files.delete(file_id)?)).await?; Ok(()) } fn is_public(&self, record: apub::StoredRecords) -> Result { match record { apub::StoredRecords::Server(server_id) => { let self_id = self.store.servers.get_self()?.req("get self server")?; if self_id == server_id { Ok(true) } else { Ok(false) } } _ => Ok(false), } } fn deliver(self: &Arc, completed: &dyn Outbound) { if completed.ready() { let fallible = || -> Result<(), Error> { let behalf = completed.behalf(self)?; let inboxes = completed.inboxes(self)?; let any_base = completed.to_apub(self)?; self.spawner.deliver(behalf, any_base, inboxes); Ok(()) }; if let Err(e) = (fallible)() { log::error!("Error spawning deliver task: {}", e); } } } fn is_local(&self, id: Uuid) -> Result { Ok(self .store .profiles .is_local(id)? .req("Profile in is_local")?) } fn is_self_server(&self, id: Uuid) -> Result { Ok(self.store.servers.is_self(id)?) } fn is_or_is_server_of(&self, key_owner: KeyOwner, actor: &Url) -> Result { let actor_id = self.apub.id_for_apub(actor)?.req("Apub for Actor")?; match key_owner { KeyOwner::Server(server_id) => { if let Some(actor_profile_id) = actor_id.profile() { let profile = self .store .profiles .by_id(actor_profile_id)? .req("profile by id")?; if let OwnerSource::Remote(actor_server_id) = profile.owner_source() { if *actor_server_id == server_id { return Ok(true); } } else { let self_id = self.store.servers.get_self()?.req("get self server")?; if self_id == server_id { return Ok(true); } } } if let Some(actor_server_id) = actor_id.server() { if server_id == actor_server_id { return Ok(true); } } } KeyOwner::Profile(profile_id) => { if let Some(actor_profile_id) = actor_id.profile() { if profile_id == actor_profile_id { return Ok(true); } } } } Ok(false) } fn check_block(&self, left: Uuid, right: Uuid) -> Result<(), Error> { let forward = self.store.view.blocks.by_forward(left, right)?.is_some(); let backward = self.store.view.blocks.by_forward(right, left)?.is_some(); if forward || backward { return Err(Error::Blocked); } Ok(()) } fn check_server_block(&self, left: Uuid, right: Uuid) -> Result<(), Error> { let forward = self .store .view .server_blocks .by_forward(left, right)? .is_some(); let backward = self .store .view .server_blocks .by_forward(right, left)? .is_some(); if forward || backward { return Err(Error::Blocked); } Ok(()) } fn spawn_blocking(&self, f: F) where F: FnOnce() + Send + 'static, { self.arbiter.send(Box::pin(async move { let _ = actix_web::web::block(move || { (f)(); Ok(()) as Result<(), ()> }) .await; })); } } impl fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("State") .field("store", &self.store) .field("apub", &self.apub) .field("pictrs", &self.pictrs) .field("spawner", &"Arc") .finish() } } impl From for Error { fn from(_: activitystreams::error::DomainError) -> Self { Self::Invalid } } impl From> for Error { fn from(e: actix_web::error::BlockingError) -> Self { match e { actix_web::error::BlockingError::Error(e) => e, _ => Error::Panic, } } }