hyaenidae/profiles/src/lib.rs
asonix 464429747d Profiles: Add Reports, and the ability to suspend profiles
- Plus: Don't delete comments on delete, just mark as deleted
- TODO: Purge text from deleted comments
2021-01-13 22:43:43 -06:00

274 lines
7.5 KiB
Rust

use activitystreams::base::AnyBase;
use actix_rt::Arbiter;
use actix_web::{client::Client, dev::Payload, HttpRequest};
use sled::Db;
use std::{fmt, sync::Arc};
use url::Url;
use uuid::Uuid;
/// Unwraps an `Option`, or bails out of the enclosing function when it is `None`.
///
/// On `Some(value)` the macro evaluates to `value`. On `None` it executes
/// `return Ok(Err(RecoverableError::MissingApub($url.to_owned())))`, so it may only be used
/// inside a function whose return type accepts that value, and `RecoverableError` must be
/// nameable at the call site.
#[macro_export]
macro_rules! recover {
    ($url:expr, $expr:expr) => {
        if let Some(found) = $expr {
            found
        } else {
            return Ok(Err(RecoverableError::MissingApub($url.to_owned())));
        }
    };
}
pub mod apub;
pub mod pictrs;
pub mod store;
use apub::ApubIds;
use pictrs::ImageInfo;
/// Per-operation bundle of handles that actions and outbound items receive by reference.
///
/// Built from a `State` by `Context::from_state`; every field except `arbiter` is a clone of
/// the corresponding `State` handle.
#[derive(Clone)]
pub struct Context {
// Profile-side store; queried by `is_local` and `check_block`.
store: store::Store,
// ActivityPub-side store, cloned from `State::apub`.
apub: apub::Store,
// Image information provider, cloned from `state.pictrs.info`.
pictrs: Arc<dyn pictrs::ImageInfo + Send + Sync>,
// Arbiter captured when the context was created; `spawn_blocking` sends work to it.
arbiter: Arbiter,
// Application-provided task spawner; `deliver` hands outbound activities to it.
spawner: Arc<dyn Spawner + Send + Sync>,
}
impl Context {
// async because it requires execution when an Arbiter exists, avoids panic
async fn from_state(state: &State) -> Self {
Context {
store: state.store.clone(),
apub: state.apub.clone(),
pictrs: state.pictrs.info.clone(),
arbiter: Arbiter::current(),
spawner: state.spawner.clone(),
}
}
fn deliver(&self, completed: &dyn Outbound) {
let res = completed
.inboxes(self)
.and_then(|inboxes| completed.to_apub(self).map(|any_base| (inboxes, any_base)));
match res {
Ok((inboxes, any_base)) => self.spawner.deliver(any_base, inboxes),
Err(e) => log::error!("Error spawning deliver task: {}", e),
}
}
fn is_local(&self, id: Uuid) -> Result<bool, Error> {
Ok(self.store.profiles.is_local(id)?.req()?)
}
fn check_block(&self, left: Uuid, right: Uuid) -> Result<(), Error> {
let forward = self.store.view.blocks.by_forward(left, right)?.is_some();
let backward = self.store.view.blocks.by_backward(right, left)?.is_some();
if forward || backward {
return Err(Error::Invalid);
}
Ok(())
}
fn spawn_blocking<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
self.arbiter.send(Box::pin(async move {
let _ = actix_web::web::block(move || {
(f)();
Ok(()) as Result<(), ()>
})
.await;
}));
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Missing required item")]
Missing,
#[error("ActivityPub object is malformed")]
Invalid,
#[error("Panic in blocking task")]
Panic,
#[error("Object was deleted")]
Deleted,
#[error("Error deleting file: {0}")]
DeleteFile(#[from] pictrs::DeleteError),
#[error("Error uploading file: {0}")]
UploadFile(#[from] pictrs::UploadError),
#[error("Failed to serialize or deseiralize data: {0}")]
Json(#[from] serde_json::Error),
#[error("Error in DB: {0}")]
Store(#[from] store::StoreError),
#[error("Error in DB: {0}")]
Apub(#[from] apub::StoreError),
}
/// Hooks through which this crate asks the embedding application to schedule work.
///
/// An implementation is passed to `State::build` and shared behind an `Arc`. Every method
/// returns `()`, so any failure handling is up to the implementation.
pub trait Spawner {
// Request a fetch of the ActivityPub object at `url`.
// NOTE(review): `stack` presumably carries the objects waiting on that fetch — confirm.
fn download_apub(&self, url: Url, stack: Vec<AnyBase>);
// Request a fetch of the given image URLs; `stack` as for `download_apub` (presumed).
fn download_images(&self, images: Vec<Url>, stack: Vec<AnyBase>);
// Request removal of the stored file identified by `file_id`.
fn purge_file(&self, file_id: Uuid);
// Request processing of `any_base`; `stack` as for `download_apub` (presumed).
fn process(&self, any_base: AnyBase, stack: Vec<AnyBase>);
// Request delivery of `any_base` to `inboxes`; invoked from `Context::deliver`.
fn deliver(&self, any_base: AnyBase, inboxes: Vec<Url>);
}
/// Conditions reported when something needed for processing is not available yet.
///
/// NOTE(review): presumably the caller reacts by fetching the missing data through the
/// `Spawner` and retrying — confirm against the ingest code.
enum RecoverableError {
// The ActivityPub object at this URL is missing; produced by the `recover!` macro.
MissingApub(Url),
// The images at these URLs are missing.
MissingImages(Vec<Url>),
}
/// A unit of work that `State::run` executes against a `Context`.
pub trait Action {
// Performs the work. A returned `Some(outbound)` is scheduled for delivery by
// `State::run`; `None` means there is nothing to deliver.
fn perform(&self, context: &Context) -> Result<Option<Box<dyn Outbound + Send>>, Error>;
}
/// The deliverable result of an `Action`, consumed by `Context::deliver`.
pub trait Outbound {
// Optional identifier; `State::run` returns it to its caller.
fn id(&self) -> Option<Uuid>;
// The inbox URLs the item should be delivered to.
fn inboxes(&self, context: &Context) -> Result<Vec<Url>, Error>;
// The ActivityPub representation handed to the spawner for delivery.
fn to_apub(&self, context: &Context) -> Result<AnyBase, Error>;
}
/// Extension trait for turning an optional value into a `Result` with this crate's `Error`.
trait Required<T> {
// Returns the contained value, or an `Error` when there is none.
fn req(self) -> Result<T, Error>;
}
impl<T> Required<T> for Option<T> {
fn req(self) -> Result<T, Error> {
self.ok_or_else(|| Error::Missing)
}
}
/// Long-lived, cloneable handle bundle from which every operation in this crate starts.
///
/// Constructed with `State::build`.
#[derive(Clone)]
pub struct State {
// Profile-side store built on the database passed to `State::build`.
pub store: store::Store,
// ActivityPub-side store built on the same database.
pub apub: apub::Store,
// pict-rs client state used for image download, upload and deletion.
pub pictrs: pictrs::State,
// Application-provided task spawner.
pub spawner: Arc<dyn Spawner + Send + Sync>,
// Not read anywhere in this file.
// NOTE(review): presumably retained to keep the database handle alive — confirm.
_db: Db,
}
impl State {
    /// Assembles a `State` from its collaborators.
    ///
    /// Builds the profile and ActivityPub stores on `db`, creates the pict-rs client state
    /// from `pictrs_upstream`, `image_info` and `client`, and wraps `spawner` in an `Arc`.
    ///
    /// # Errors
    /// Returns the `sled::Error` produced while building either store.
    pub fn build(
        pictrs_upstream: Url,
        image_info: impl ImageInfo + Send + Sync + 'static,
        apub_info: impl ApubIds + Send + Sync + 'static,
        spawner: impl Spawner + Send + Sync + 'static,
        client: Client,
        db: Db,
    ) -> Result<Self, sled::Error> {
        Ok(State {
            store: store::Store::build(&db)?,
            apub: apub::Store::build(apub_info, &db)?,
            pictrs: pictrs::State::new(pictrs_upstream, image_info, client),
            spawner: Arc::new(spawner),
            _db: db,
        })
    }

    /// Executes `action` through `actix_web::web::block`.
    ///
    /// When the action yields an outbound item, its delivery is scheduled with
    /// `Context::spawn_blocking` and the item's id is returned; otherwise `Ok(None)`.
    ///
    /// # Errors
    /// Returns the action's error, or `Error::Panic` when the blocking call fails without
    /// producing one.
    pub async fn run(&self, action: impl Action + Send + 'static) -> Result<Option<Uuid>, Error> {
        let ctx = Context::from_state(self).await;
        let opt = actix_web::web::block(move || match action.perform(&ctx)? {
            Some(outbound) => {
                // Capture the id before the outbound item is moved into the delivery task.
                let id = outbound.id();
                let context_clone = ctx.clone();
                ctx.spawn_blocking(move || context_clone.deliver(&*outbound));
                Ok(id)
            }
            None => Ok(None),
        })
        .await?;
        Ok(opt)
    }

    /// Ingests an ActivityPub object by running `apub::ingest` through
    /// `actix_web::web::block`.
    ///
    /// # Errors
    /// Returns the ingest error, or `Error::Panic` when the blocking call fails without
    /// producing one.
    pub async fn ingest(&self, any_base: AnyBase, stack: Vec<AnyBase>) -> Result<(), Error> {
        let ctx = Context::from_state(self).await;
        actix_web::web::block(move || apub::ingest(any_base, &ctx, stack)).await?;
        Ok(())
    }

    /// Downloads the image at `url` through pict-rs, records its first image as a file in
    /// the store, and returns the new file's id.
    ///
    /// # Errors
    /// Returns pict-rs or store errors, and `Error::Missing` if the response contains no
    /// image.
    pub async fn download_image(&self, url: &Url) -> Result<Uuid, Error> {
        let images = self.pictrs.download_image(url).await?;
        // The pict-rs client is expected to have rejected empty responses already, but the
        // data originates from a remote service: report `Error::Missing` instead of
        // panicking should that invariant ever be broken.
        let image = images.images().next().req()?;
        let file_source = store::FileSource::PictRs(store::PictRsFile::new(
            image.key(),
            image.token(),
            image.width(),
            image.height(),
            image.mime(),
        ));
        let file = self.store.files.create(&file_source)?;
        Ok(file.id())
    }

    /// Proxies an upload request to pict-rs, records every returned image as a file in the
    /// store, and returns the new file ids in response order.
    ///
    /// # Errors
    /// Returns pict-rs or store errors. Files created before a store failure are not
    /// rolled back here.
    pub async fn upload_image(&self, req: HttpRequest, body: Payload) -> Result<Vec<Uuid>, Error> {
        let images = self.pictrs.proxy_upload(req, body).await?;
        let mut files = vec![];
        for image in images.images() {
            let file_source = store::FileSource::PictRs(store::PictRsFile::new(
                image.key(),
                image.token(),
                image.width(),
                image.height(),
                image.mime(),
            ));
            let file = self.store.files.create(&file_source)?;
            files.push(file.id());
        }
        Ok(files)
    }

    /// Deletes the file `file_id`: first the image in pict-rs, then the store record.
    ///
    /// # Errors
    /// Returns `Error::Missing` when the file is unknown, or the pict-rs / store error. If
    /// the pict-rs deletion fails the store record is left untouched.
    pub async fn delete_file(&self, file_id: Uuid) -> Result<(), Error> {
        let file = self.store.files.by_id(file_id)?.req()?;
        // Irrefutable pattern: the `let` compiles only while `PictRs` is the sole variant.
        let store::FileSource::PictRs(image) = file.source();
        self.pictrs.delete_image(image.key(), image.token()).await?;
        self.store.files.delete(file_id)?;
        Ok(())
    }
}
impl fmt::Debug for State {
    /// Renders `store`, `apub` and `pictrs` with their own `Debug` output.
    ///
    /// The spawner is a trait object, so it is shown as a fixed placeholder string; the
    /// `_db` field is left out.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("State");
        out.field("store", &self.store);
        out.field("apub", &self.apub);
        out.field("pictrs", &self.pictrs);
        out.field("spawner", &"Arc<dyn Spawner>");
        out.finish()
    }
}
impl From<activitystreams::error::DomainError> for Error {
fn from(_: activitystreams::error::DomainError) -> Self {
Self::Invalid
}
}
impl From<actix_web::error::BlockingError<Error>> for Error {
fn from(e: actix_web::error::BlockingError<Error>) -> Self {
match e {
actix_web::error::BlockingError::Error(e) => e,
_ => Error::Panic,
}
}
}