apub/examples/actix-web-example/src/main.rs

170 lines
4.1 KiB
Rust

#![feature(generic_associated_types)]
use actix_web::{middleware::Logger, web, App, HttpServer, ResponseError};
use apub_actix_web::{inbox, serve_objects, RepoFactory, SignatureConfig, Verifier, VerifyError};
use apub_core::{
deref::{Dereference, Repo},
digest::DigestFactory,
ingest::Ingest,
};
use apub_rustcrypto::{RsaVerifier, RustcryptoError, Sha256Digest};
use dashmap::DashMap;
use example_types::{AcceptedActivity, ActivityType, ObjectId};
use std::{
future::{ready, Ready},
sync::Arc,
};
use url::Url;
/// In-memory object store used by this example server.
///
/// Objects are kept as `serde_json::Value`s keyed by their `Url`. The map sits behind an
/// `Arc`, so every clone of a `MemoryRepo` refers to the same underlying storage.
#[derive(Clone, Default)]
struct MemoryRepo {
/// Shared map from an object's URL to its JSON representation.
inner: Arc<DashMap<Url, serde_json::Value>>,
}
/// Handle that `main` passes to `inbox` and `serve_objects` as their `verifier` argument.
///
/// In this file it implements `Verifier` (with `RsaVerifier`), `DigestFactory` (with
/// `Sha256Digest`) and `RepoFactory` (handing out a reference to the wrapped `MemoryRepo`).
#[derive(Clone)]
struct RequestVerifier {
/// Repository exposed through this type's `RepoFactory` implementation.
repo: MemoryRepo,
}
/// `Ingest` implementation for `AcceptedActivity` values.
///
/// Each ingested activity is written into the wrapped `MemoryRepo` under the activity's id.
#[derive(Clone)]
struct ActivityIngester {
/// Repository that ingested activities are stored in.
repo: MemoryRepo,
}
/// Error type that `main` supplies to `inbox` and `serve_objects` as their error type parameter.
///
/// Every variant is a `#[error(transparent)]` wrapper, so its message comes from the wrapped
/// error, and `#[from]` generates the matching `From` conversion for each variant.
#[derive(Debug, thiserror::Error)]
enum ServerError {
/// JSON (de)serialization failure; `serde_json::Error` is also the `MemoryRepo` error type.
#[error(transparent)]
Json(#[from] serde_json::Error),
/// Failure reported as an `apub_actix_web::VerifyError`.
#[error(transparent)]
Verify(#[from] VerifyError),
/// Failure reported as an `apub_rustcrypto::RustcryptoError`.
#[error(transparent)]
Rustcrypto(#[from] RustcryptoError),
}
impl MemoryRepo {
    /// Serializes `item` to JSON and stores it under the URL of `id`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` produced when `item` cannot be converted into a
    /// `serde_json::Value`; in that case nothing is written to the map.
    fn insert<D>(&self, id: D, item: &D::Output) -> Result<(), serde_json::Error>
    where
        D: Dereference,
        D::Output: serde::ser::Serialize,
    {
        serde_json::to_value(item).map(|json| {
            // The map owns its keys, so the URL borrowed from `id` has to be cloned.
            self.inner.insert(id.url().clone(), json);
        })
    }
}
// No methods are overridden here, so `ServerError` uses whatever default behaviour
// actix-web's `ResponseError` trait provides when turning the error into an HTTP response.
impl ResponseError for ServerError {}
/// `Repo` backed by the in-memory map: lookups never suspend, so the future is a `Ready`.
impl Repo for MemoryRepo {
    type Error = serde_json::Error;
    type Future<'a, Id: Dereference + 'a>
    where
        Self: 'a,
    = Ready<Result<Option<Id::Output>, Self::Error>>;

    /// Looks up the object stored under `id`'s URL and deserializes it.
    ///
    /// Resolves to `Ok(None)` when no entry exists for the URL, `Ok(Some(_))` when the stored
    /// JSON deserializes into `D::Output`, and `Err(_)` when deserialization fails.
    fn fetch<'a, D: Dereference + 'a>(&'a self, id: D) -> Self::Future<'a, D> {
        let outcome = match self.inner.get(id.url()) {
            // `from_value` consumes its argument, so the stored JSON is cloned out of the map.
            Some(entry) => serde_json::from_value(entry.value().clone()).map(Some),
            None => Ok(None),
        };
        ready(outcome)
    }
}
/// Lets a `MemoryRepo` serve as its own repository factory.
impl RepoFactory for MemoryRepo {
type RepoError = serde_json::Error;
// The repository handed out is simply a borrow of this value.
type Repo<'a>
where
Self: 'a,
= &'a MemoryRepo;
/// Returns a reference to `self`; no new repository is created.
fn repo<'a>(&'a self) -> Self::Repo<'a> {
self
}
}
/// Selects the `apub_rustcrypto` types used for verification: `RsaVerifier` as the verifier
/// and `RustcryptoError` as its error type.
impl Verifier for RequestVerifier {
type VerifyError = RustcryptoError;
type Verify = RsaVerifier;
}
/// Exposes the wrapped `MemoryRepo` as the repository of a `RequestVerifier`.
impl RepoFactory for RequestVerifier {
type RepoError = serde_json::Error;
// The repository handed out is a borrow of the `repo` field.
type Repo<'a>
where
Self: 'a,
= &'a MemoryRepo;
/// Returns a reference to the `MemoryRepo` held in `self.repo`.
fn repo<'a>(&'a self) -> Self::Repo<'a> {
&self.repo
}
}
/// Selects `Sha256Digest` from `apub_rustcrypto` as the digest implementation.
impl DigestFactory for RequestVerifier {
type Digest = Sha256Digest;
}
/// Ingests `AcceptedActivity` values by persisting them in the in-memory repository.
impl<'a> Ingest<'a, AcceptedActivity, ()> for ActivityIngester {
    type Error = serde_json::Error;
    type Future = Ready<Result<(), Self::Error>>;

    /// Stores `activity` in the repository under its own id.
    ///
    /// `_authority` and `_metadata` are ignored. The returned future is already resolved and
    /// carries the serialization error if the activity could not be converted to JSON.
    fn ingest(
        &'a self,
        _authority: Option<Url>,
        activity: AcceptedActivity,
        _metadata: (),
    ) -> Self::Future {
        // Clone the id up front so the activity itself can be borrowed for serialization.
        let activity_id = activity.id().clone();
        let stored = self.repo.insert(activity_id, &activity);
        ready(stored)
    }
}
/// Starts the example server on `127.0.0.1:8008`.
///
/// Two scopes are mounted: `/shared_inbox`, configured by `inbox`, and `/activities`,
/// configured by `serve_objects`. Both use the same in-memory repository, so activities
/// stored by the ingester are the ones visible to the `/activities` scope.
///
/// # Errors
///
/// Returns an error if the address cannot be bound or the running server fails.
#[actix_web::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Use the environment-driven logger when `RUST_LOG` is readable; otherwise log at `Info`.
    match std::env::var("RUST_LOG") {
        Ok(_) => env_logger::builder().init(),
        Err(_) => env_logger::Builder::new()
            .filter_level(log::LevelFilter::Info)
            .init(),
    }

    // Every component wraps a clone of the same repo, and clones share one underlying map.
    let repo = MemoryRepo::default();
    let verifier = RequestVerifier { repo: repo.clone() };
    let ingest = ActivityIngester { repo: repo.clone() };
    let config = SignatureConfig::default();

    // The factory closure owns the handles above and clones them on every invocation.
    let server = HttpServer::new(move || {
        // NOTE(review): the trailing `false` flags are defined by `apub_actix_web`; their
        // meaning is not visible in this file.
        let shared_inbox =
            web::scope("/shared_inbox").configure(inbox::<_, _, _, _, ServerError>(
                ingest.clone(),
                verifier.clone(),
                config.clone(),
                false,
            ));

        let activities = web::scope("/activities").configure(serve_objects::<
            ObjectId<ActivityType>,
            _,
            _,
            ServerError,
        >(
            repo.clone(),
            verifier.clone(),
            config.clone(),
            "http://localhost:8008".to_string(),
            false,
        ));

        App::new()
            .wrap(Logger::default())
            .service(shared_inbox)
            .service(activities)
    });

    server.bind("127.0.0.1:8008")?.run().await?;
    Ok(())
}