//! Example ActivityPub server: an actix-web application that accepts signed
//! activities on `/shared_inbox` and serves stored objects from `/activities`,
//! keeping everything in process memory.
//!
//! NOTE(review): in this copy of the file several generic argument lists
//! appear to have been lost (for example `Arc>`, `Result, Self::Error>`,
//! `Box>` and `impl From>> for ServerError`). They are reproduced exactly as
//! found rather than guessed at; restore them from the upstream source before
//! building.

use actix_web::{middleware::Logger, web, App, FromRequest, HttpServer, ResponseError};
use apub::{
    activitypub::keys::{
        publickey::{PublicKeyError, PublicKeyRepoFactory},
        KeyId, PrivateKeyRepo, PrivateKeyRepoFactory, PublicKeyRepo, SimplePublicKey,
    },
    clients::{
        awc::{AwcError, SignatureConfig as AwcSignatureConfig},
        AwcClient,
    },
    cryptography::{
        rustcrypto::{RsaVerifier, RustcryptoError, Sha256Digest},
        DigestFactory, Rustcrypto, VerifyFactory,
    },
    ingest::{
        validate_authority, validate_hosts, validate_inbox,
        validators::{
            AuthorityError, HostError, InboxError, InboxType, ValidateAuthority, ValidateHosts,
            ValidateInbox,
        },
        Authority,
    },
    servers::actix_web::{
        inbox, serve_objects, Config, SignatureConfig as ActixWebSignatureConfig, VerifyError,
    },
    session::{BreakerSession, RequestCountSession, SessionFactory},
    Dereference, FullRepo, Ingest, Repo, RepoFactory, Session,
};
use dashmap::DashMap;
use example_types::{AcceptedActivity, ActivityType, ObjectId};
use rsa::{pkcs8::ToPrivateKey, RsaPrivateKey};
use std::{
    future::{ready, Ready},
    sync::Arc,
    time::Duration,
};
use url::{Host, Url};

/// URL of the single server actor.
///
/// The inbox extractor, the server `Config` and the private-key registration
/// in `main` must all agree on this value, so it is defined once here.
const SERVER_ACTOR_ID: &str = "http://localhost:8008/actor";

/// Modulus size, in bits, of the RSA key generated for the server actor.
///
/// 1024-bit RSA is no longer considered adequate for signatures; 2048 bits is
/// the commonly required minimum.
const RSA_KEY_BITS: usize = 2048;

/// In-memory object store shared between the request handlers.
///
/// Entries are written as `serde_json` values keyed by the object's URL (see
/// `MemoryRepo::insert` and the `PublicKeyRepo` impl).
// NOTE(review): the type arguments of `inner` are missing in this copy;
// presumably an `Arc`-wrapped `DashMap` — confirm against upstream.
#[derive(Clone, Default)]
struct MemoryRepo {
    inner: Arc>,
}

/// In-memory store of private keys, keyed by actor URL.
///
/// Each entry is the `(KeyId, String)` pair written by `PrivateKeyRepo::store`,
/// where the string is the PEM text of the key.
// NOTE(review): the type arguments of `inner` are missing in this copy.
#[derive(Clone, Default)]
struct PrivateKeyStore {
    inner: Arc>,
}

/// State handed to the server for verifying incoming requests: it supplies the
/// public/private key repositories, sessions and an HTTP client for remote
/// lookups through the factory traits implemented below.
#[derive(Clone)]
struct RequestVerifier {
    repo: MemoryRepo,
    private_key_store: PrivateKeyStore,
    session: BreakerSession,
    client: awc::Client,
    signature_config: AwcSignatureConfig,
}

/// State handed to the inbox handler for ingesting activities; implements
/// `FullRepo` by combining the local `MemoryRepo`, a remote `AwcClient` and
/// the validator chain stored in `ingest`.
#[derive(Clone)]
struct ActivityIngester {
    repo: MemoryRepo,
    // NOTE(review): nested validator type arguments are missing in this copy.
    ingest: ValidateAuthority>>,
    private_key_store: PrivateKeyStore,
    session: BreakerSession,
    client: awc::Client,
    signature_config: AwcSignatureConfig,
    config: Config,
}

/// Every error the example server can return from a handler.
///
/// All variants except `OwnerMismatch` transparently wrap an underlying error
/// and are constructible through `From`.
#[derive(Debug, thiserror::Error)]
enum ServerError {
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    #[error(transparent)]
    Verify(#[from] VerifyError),

    #[error(transparent)]
    Rustcrypto(#[from] RustcryptoError),

    #[error("Public key owner doesn't match actor ID")]
    OwnerMismatch,

    #[error(transparent)]
    Awc(#[from] AwcError),

    #[error(transparent)]
    Key(#[from] KeyError),

    #[error(transparent)]
    Host(#[from] HostError),

    #[error(transparent)]
    Inbox(#[from] InboxError),

    #[error(transparent)]
    Authority(#[from] AuthorityError),
}

// Flattens a `PublicKeyError` into `ServerError`: the remote-repo and
// public-key-repo payloads go through their own `From` conversions, and the
// owner mismatch maps onto the dedicated variant.
// NOTE(review): the `PublicKeyError` type arguments are missing in this copy.
impl From>> for ServerError {
    fn from(e: PublicKeyError>) -> Self {
        match e {
            PublicKeyError::RemoteRepo(awc_error) => awc_error.into(),
            PublicKeyError::PublicKeyRepo(json_error) => json_error.into(),
            PublicKeyError::OwnerMismatch => Self::OwnerMismatch,
        }
    }
}

impl MemoryRepo {
    /// Serializes `item` to JSON and stores it under the URL of `id`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `item` cannot be serialized; nothing
    /// is stored in that case.
    // NOTE(review): the `<D>` parameter list is missing in this copy.
    fn insert(&self, id: D, item: &D::Output) -> Result<(), serde_json::Error>
    where
        D: Dereference,
        D::Output: serde::ser::Serialize,
    {
        let value = serde_json::to_value(item)?;
        self.inner.insert(id.url().clone(), value);
        Ok(())
    }
}

// No overrides: error responses rely entirely on the trait's default methods.
impl ResponseError for ServerError {}

#[async_trait::async_trait(?Send)]
impl Repo for MemoryRepo {
    type Error = serde_json::Error;

    /// Looks up the object stored under the URL of `id` and deserializes it.
    ///
    /// Returns `Ok(None)` when nothing is stored under that URL. The session
    /// argument is ignored.
    // NOTE(review): the generic parameter list and the `Option<..>` return
    // payload are missing in this copy.
    async fn fetch(
        &self,
        id: D,
        _session: S,
    ) -> Result, Self::Error> {
        if let Some(obj_ref) = self.inner.get(id.url()) {
            serde_json::from_value(obj_ref.clone()).map(Some)
        } else {
            Ok(None)
        }
    }
}

#[async_trait::async_trait(?Send)]
impl PublicKeyRepo for MemoryRepo {
    type Error = serde_json::Error;

    /// Looks up the public key stored under `key_id`; `Ok(None)` if absent.
    async fn fetch(&self, key_id: &KeyId) -> Result, Self::Error> {
        if let Some(obj_ref) = self.inner.get(key_id) {
            return serde_json::from_value(obj_ref.clone()).map(Some);
        }

        Ok(None)
    }

    /// Serializes `public_key` to JSON and stores it under its own id.
    async fn store(&self, public_key: &SimplePublicKey) -> Result<(), Self::Error> {
        let value = serde_json::to_value(public_key)?;
        self.inner.insert(public_key.id.as_ref().clone(), value);
        Ok(())
    }
}

/// Returned by `PrivateKeyStore` when no key is stored for the requested actor.
#[derive(Clone, Debug)]
struct KeyError;

impl std::fmt::Display for KeyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Key is missing")
    }
}

impl std::error::Error for KeyError {}

#[async_trait::async_trait(?Send)]
impl PrivateKeyRepo for PrivateKeyStore {
    type Error = KeyError;

    /// Returns a clone of the `(key id, PEM)` pair stored for `actor_id`.
    ///
    /// # Errors
    ///
    /// Returns `KeyError` when no key has been stored for that actor.
    async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
        if let Some(tup_ref) = self.inner.get(actor_id) {
            return Ok(tup_ref.clone());
        }

        Err(KeyError)
    }

    /// Stores the `(key id, PEM)` pair under `actor_id`. Always succeeds.
    async fn store(
        &self,
        actor_id: Url,
        key_id: KeyId,
        private_key_pem: String,
    ) -> Result<(), Self::Error> {
        self.inner.insert(actor_id, (key_id, private_key_pem));
        Ok(())
    }
}

impl PrivateKeyRepoFactory for RequestVerifier {
    type PrivateKeyRepo = PrivateKeyStore;

    /// Hands out a clone of the verifier's private key store.
    fn build_private_key_repo(&self) -> Self::PrivateKeyRepo {
        self.private_key_store.clone()
    }
}

impl PrivateKeyRepoFactory for ActivityIngester {
    type PrivateKeyRepo = PrivateKeyStore;

    /// Hands out a clone of the ingester's private key store.
    fn build_private_key_repo(&self) -> Self::PrivateKeyRepo {
        self.private_key_store.clone()
    }
}

// The memory repo needs no crypto to build itself, hence `Crypto = ()`.
impl RepoFactory for MemoryRepo {
    type Crypto = ();
    type Repo = MemoryRepo;

    fn build_repo(&self, _: Self::Crypto) -> Self::Repo {
        self.clone()
    }
}

impl VerifyFactory for RequestVerifier {
    type Verify = RsaVerifier;
}

// For the verifier, "repo" means the remote side: an `AwcClient` built from
// the HTTP client, the signature configuration and the supplied crypto.
impl RepoFactory for RequestVerifier {
    type Crypto = Rustcrypto;
    // NOTE(review): any `AwcClient` type arguments are missing in this copy.
    type Repo = AwcClient;

    fn build_repo(&self, crypto: Self::Crypto) -> Self::Repo {
        AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto)
    }
}

impl PublicKeyRepoFactory for RequestVerifier {
    type PublicKeyRepo = MemoryRepo;

    /// Public keys are kept in the same `MemoryRepo` as every other object.
    fn public_key_repo(&self) -> Self::PublicKeyRepo {
        self.repo.clone()
    }
}

impl SessionFactory for RequestVerifier {
    type Session = (RequestCountSession, BreakerSession);

    /// Pairs a fresh `RequestCountSession::max(25)` with a clone of the shared
    /// breaker session.
    // NOTE(review): presumably 25 caps the requests made within one session —
    // confirm against the `apub` session documentation.
    fn build_session(&self) -> Self::Session {
        (RequestCountSession::max(25), self.session.clone())
    }
}

impl DigestFactory for RequestVerifier {
    type Digest = Sha256Digest;
}

/// Identifies the actor an inbox request is addressed to.
///
/// This example has a single shared inbox, so the extractor ignores the
/// request and always yields `SERVER_ACTOR_ID`.
struct InboxActor(Url);

impl FromRequest for InboxActor {
    type Error = actix_web::Error;
    // NOTE(review): the `Ready` type argument is missing in this copy.
    type Future = Ready>;

    fn from_request(_: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload) -> Self::Future {
        ready(Ok(InboxActor(
            Url::parse(SERVER_ACTOR_ID).expect("SERVER_ACTOR_ID is a valid URL"),
        )))
    }
}

impl InboxType for InboxActor {
    /// Always `true`: the only inbox this example exposes is the shared one.
    fn is_shared(&self) -> bool {
        true
    }
}

// NOTE(review): the `AsRef` type argument is missing in this copy.
impl AsRef for InboxActor {
    fn as_ref(&self) -> &Url {
        &self.0
    }
}

#[async_trait::async_trait(?Send)]
impl Ingest for MemoryRepo {
    type Error = ServerError;
    type ActorId = InboxActor;

    /// Stores `activity` in the memory repo under its own id.
    ///
    /// The authority, actor id, repos and session are all ignored here.
    ///
    /// # Errors
    ///
    /// A serialization failure is converted to `ServerError` and then, via
    /// `?`, into the caller's `R::Error`.
    // NOTE(review): the generic parameter list and several type arguments are
    // missing in this copy.
    async fn ingest(
        &self,
        _authority: Authority,
        _actor_id: Self::ActorId,
        activity: AcceptedActivity,
        _repo: R,
        _remote_repo: &R::Remote,
        _session: S,
    ) -> Result<(), R::Error>
    where
        R::Error: From,
    {
        Ok(self
            .insert(activity.id().clone(), &activity)
            .map_err(Self::Error::from)?)
    }
}

// Unlike `RequestVerifier`, the ingester's `RepoFactory` yields the local
// memory repo and needs no crypto.
impl RepoFactory for ActivityIngester {
    type Crypto = ();
    type Repo = MemoryRepo;

    fn build_repo(&self, _: Self::Crypto) -> Self::Repo {
        self.repo.clone()
    }
}

impl SessionFactory for ActivityIngester {
    type Session = (RequestCountSession, BreakerSession);

    /// Same session pairing as `RequestVerifier::build_session`.
    fn build_session(&self) -> Self::Session {
        (RequestCountSession::max(25), self.session.clone())
    }
}

impl FullRepo for ActivityIngester {
    type Error = ServerError;
    type Local = MemoryRepo;
    type Crypto = Rustcrypto;
    // NOTE(review): type arguments are missing on the next two lines in this
    // copy.
    type Remote = AwcClient;
    type Ingest = ValidateAuthority>>;

    fn local(&self) -> &Self::Local {
        &self.repo
    }

    /// Builds a signing HTTP client for fetching remote objects.
    fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
        AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto)
    }

    fn ingest(&self) -> &Self::Ingest {
        &self.ingest
    }

    fn local_port(&self) -> Option {
        self.config.local_port
    }

    fn local_domain(&self) -> &Host {
        &self.config.local_host
    }
}

/// Entry point: configures logging, creates the in-memory stores, generates
/// and registers the server actor's RSA key, then serves `/shared_inbox` and
/// `/activities` on `127.0.0.1:8008` until the server stops.
///
/// # Errors
///
/// Fails if a hard-coded URL does not parse, key generation or PEM encoding
/// fails, the address cannot be bound, or the server exits with an error.
// NOTE(review): the `Box` type argument is missing in this copy.
#[actix_web::main]
async fn main() -> Result<(), Box> {
    // Respect RUST_LOG when it is set; otherwise default to `Info`.
    if std::env::var("RUST_LOG").is_ok() {
        env_logger::builder().init()
    } else {
        env_logger::Builder::new()
            .filter_level(log::LevelFilter::Info)
            .init();
    };

    let repo = MemoryRepo::default();
    let private_key_store = PrivateKeyStore::default();
    // NOTE(review): presumably "10 failures within one hour" — confirm the
    // meaning of both arguments against `BreakerSession::limit`.
    let session = BreakerSession::limit(10, Duration::from_secs(60 * 60));
    let signature_config = ActixWebSignatureConfig::default();

    let config = Config {
        local_host: Host::Domain("localhost".to_string()),
        local_port: Some(8008),
        scheme: "http://".to_string(),
        server_actor_id: SERVER_ACTOR_ID.parse()?,
    };
    let awc_signature_config = AwcSignatureConfig::default();

    // Generate the server actor's signing key and register it so that
    // outgoing requests made on the actor's behalf can be signed.
    let private_key = RsaPrivateKey::new(&mut rand::thread_rng(), RSA_KEY_BITS)?;
    let private_key_pem = private_key.to_pkcs8_pem()?;
    let actor_id: Url = SERVER_ACTOR_ID.parse()?;
    let key_id: KeyId = "http://localhost:8008/actor#main-key".parse()?;
    private_key_store
        .store(actor_id, key_id, private_key_pem.to_string())
        .await?;

    HttpServer::new(move || {
        // Each invocation of the factory gets its own HTTP client; the stores
        // and breaker session are clones of the shared handles created above.
        let client = awc::Client::new();

        let verifier = RequestVerifier {
            repo: repo.clone(),
            private_key_store: private_key_store.clone(),
            session: session.clone(),
            client: client.clone(),
            signature_config: awc_signature_config.clone(),
        };

        let ingester = ActivityIngester {
            repo: repo.clone(),
            // Validators wrap the repo inside-out: hosts, then inbox, then
            // authority as the outermost layer.
            ingest: validate_authority(validate_inbox(validate_hosts(repo.clone()))),
            private_key_store: private_key_store.clone(),
            session: session.clone(),
            client,
            signature_config: awc_signature_config.clone(),
            config: config.clone(),
        };

        // NOTE(review): the meaning of the trailing `false` passed to `inbox`
        // and `serve_objects` is not visible here — confirm against the
        // `apub` server documentation.
        App::new()
            .wrap(Logger::default())
            .service(
                web::scope("/shared_inbox").configure(inbox::<_, _, _, ServerError>(
                    config.clone(),
                    ingester,
                    verifier.clone(),
                    signature_config.clone(),
                    false,
                )),
            )
            .service(web::scope("/activities").configure(serve_objects::<
                ObjectId,
                _,
                _,
                ServerError,
            >(
                config.clone(),
                repo.clone(),
                verifier,
                signature_config.clone(),
                false,
            )))
    })
    .bind("127.0.0.1:8008")?
    .run()
    .await?;

    Ok(())
}