142 lines
3.4 KiB
Rust
142 lines
3.4 KiB
Rust
use activitystreams::base::AnyBase;
|
|
use actix_rt::Arbiter;
|
|
use actix_web::client::Client;
|
|
use sled::Db;
|
|
use std::{fmt, sync::Arc};
|
|
use url::Url;
|
|
use uuid::Uuid;
|
|
|
|
#[macro_export]
|
|
macro_rules! recover {
|
|
($url:expr, $expr:expr) => {
|
|
match $expr {
|
|
Some(item) => item,
|
|
None => return Ok(Err(RecoverableError::MissingApub($url.to_owned()))),
|
|
}
|
|
};
|
|
}
|
|
|
|
mod actions;
|
|
pub mod apub;
|
|
pub mod pictrs;
|
|
pub mod store;
|
|
|
|
use apub::ApubIds;
|
|
use pictrs::ImageInfo;
|
|
|
|
#[derive(Clone)]
|
|
struct Context {
|
|
store: store::Store,
|
|
apub: apub::Store,
|
|
arbiter: Arbiter,
|
|
spawner: Arc<dyn Spawner + Send + Sync>,
|
|
}
|
|
|
|
impl Context {
|
|
// async because it requires execution when an Arbiter exists, avoids panic
|
|
async fn from_state(state: &State) -> Self {
|
|
Context {
|
|
store: state.store.clone(),
|
|
apub: state.apub.clone(),
|
|
arbiter: Arbiter::current(),
|
|
spawner: state.spawner.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum Error {
|
|
#[error("Missing required item")]
|
|
Missing,
|
|
|
|
#[error("ActivityPub object is malformed")]
|
|
Invalid,
|
|
|
|
#[error("Failed to serialize or deseiralize data: {0}")]
|
|
Json(#[from] serde_json::Error),
|
|
|
|
#[error("Error in DB: {0}")]
|
|
Store(#[from] store::StoreError),
|
|
|
|
#[error("Error in DB: {0}")]
|
|
Apub(#[from] apub::StoreError),
|
|
}
|
|
|
|
pub trait Spawner {
|
|
fn download_apub(&self, url: Url, stack: Vec<AnyBase>);
|
|
fn download_images(&self, images: Vec<Url>, stack: Vec<AnyBase>);
|
|
fn purge_file(&self, file_id: Uuid);
|
|
fn process(&self, any_base: AnyBase, stack: Vec<AnyBase>);
|
|
}
|
|
|
|
enum RecoverableError {
|
|
MissingApub(Url),
|
|
MissingImages(Vec<Url>),
|
|
}
|
|
|
|
trait Action {
|
|
fn perform(&self, context: &Context) -> Result<(), Error>;
|
|
}
|
|
|
|
trait Required<T> {
|
|
fn req(self) -> Result<T, Error>;
|
|
}
|
|
|
|
impl<T> Required<T> for Option<T> {
|
|
fn req(self) -> Result<T, Error> {
|
|
self.ok_or_else(|| Error::Missing)
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct State {
|
|
pub store: store::Store,
|
|
pub apub: apub::Store,
|
|
pub pictrs: pictrs::State,
|
|
pub spawner: Arc<dyn Spawner + Send + Sync>,
|
|
_db: Db,
|
|
}
|
|
|
|
impl State {
|
|
pub fn build(
|
|
pictrs_upstream: Url,
|
|
image_info: impl ImageInfo + 'static,
|
|
apub_info: impl ApubIds + Send + Sync + 'static,
|
|
spawner: impl Spawner + Send + Sync + 'static,
|
|
client: Client,
|
|
db: Db,
|
|
) -> Result<Self, sled::Error> {
|
|
Ok(State {
|
|
store: store::Store::build(&db)?,
|
|
apub: apub::Store::build(apub_info, &db)?,
|
|
pictrs: pictrs::State::new(pictrs_upstream, image_info, client),
|
|
spawner: Arc::new(spawner),
|
|
_db: db,
|
|
})
|
|
}
|
|
|
|
pub async fn ingest(&self, any_base: AnyBase, stack: Vec<AnyBase>) -> Result<(), Error> {
|
|
let context = Context::from_state(self).await;
|
|
|
|
actions::ingest(any_base, &context, stack)?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl fmt::Debug for State {
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
f.debug_struct("State")
|
|
.field("store", &self.store)
|
|
.field("apub", &self.apub)
|
|
.field("pictrs", &self.pictrs)
|
|
.field("spawner", &"Arc<dyn Spawner>")
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl From<activitystreams::error::DomainError> for Error {
|
|
fn from(_: activitystreams::error::DomainError) -> Self {
|
|
Self::Invalid
|
|
}
|
|
}
|