diff --git a/apub-actix-web/src/lib.rs b/apub-actix-web/src/lib.rs index 6a92b45..b8dbab5 100644 --- a/apub-actix-web/src/lib.rs +++ b/apub-actix-web/src/lib.rs @@ -13,7 +13,7 @@ use actix_web::{ }; use apub_core::{ digest::{Digest, DigestBuilder, DigestFactory}, - ingest::{is_local, Authority, Ingest}, + ingest::{is_local, Authority, Ingest, IngestFactory}, repo::{Dereference, Repo, RepoFactory}, session::SessionFactory, signature::{PrivateKeyBuilder, Verify, VerifyBuilder, VerifyFactory}, @@ -137,17 +137,17 @@ type PrivKeyError = <::Crypto as PrivateKeyBuilder>::Error; /// ``` pub fn inbox( config: Config, - ingest: I, + ingest_factory: I, verifier: V, signature_config: SignatureConfig, require_signature: bool, ) -> impl FnOnce(&mut ServiceConfig) where A: for<'de> serde::de::Deserialize<'de> + 'static, - I: Ingest + PrivateKeyRepoFactory + RepoFactory + SessionFactory + 'static, + I: IngestFactory + PrivateKeyRepoFactory + RepoFactory + SessionFactory + 'static, I::PrivateKeyRepo: PrivateKeyRepo, - I::ActorId: FromRequest + AsRef, - >::Error: From<::Error>, + >::ActorId: FromRequest + AsRef, + >::Error: From<::Error>, V: RepoFactory + SessionFactory + PublicKeyRepoFactory @@ -183,7 +183,7 @@ where service_config.service( web::scope("") - .app_data(web::Data::new(ingest)) + .app_data(web::Data::new(ingest_factory)) .wrap(digest) .wrap(signature) .route("", web::post().to(inbox_handler::)), @@ -273,29 +273,30 @@ where } async fn inbox_handler( - ingest: web::Data, + ingest_factory: web::Data, authority: Option, activity: web::Json, - actor_id: I::ActorId, + actor_id: >::ActorId, ) -> HttpResponse where A: for<'de> serde::de::Deserialize<'de> + 'static, - I: Ingest + PrivateKeyRepoFactory + RepoFactory + SessionFactory, + I: IngestFactory + PrivateKeyRepoFactory + RepoFactory + SessionFactory, I::PrivateKeyRepo: PrivateKeyRepo, - I::ActorId: FromRequest + AsRef, - I::Error: From<::Error>, + >::ActorId: FromRequest + AsRef, + >::Error: From<::Error>, { let url = actor_id.as_ref(); - let private_key_repo = ingest.build_private_key_repo(); + let private_key_repo = ingest_factory.build_private_key_repo(); let private_key = match private_key_repo.fetch(url).await { Ok(private_key) => private_key, Err(_) => return HttpResponse::BadRequest().finish(), }; let activity = activity.into_inner(); - let remote_repo = ingest.build_repo(private_key); - let mut session = ingest.build_session(); + let remote_repo = ingest_factory.build_repo(private_key); + let mut session = ingest_factory.build_session(); + let ingest = ingest_factory.build_ingest(); if let Some(auth) = authority { if let Ok(url) = auth.key_id().parse() { diff --git a/apub-core/src/ingest.rs b/apub-core/src/ingest.rs index 835db49..207ba95 100644 --- a/apub-core/src/ingest.rs +++ b/apub-core/src/ingest.rs @@ -107,6 +107,15 @@ pub trait Ingest { } } +/// Describes a type that can produce an Ingest +pub trait IngestFactory { + /// The Ingest type + type Ingest: Ingest; + + /// Build the ingest type + fn build_ingest(&self) -> Self::Ingest; +} + impl std::fmt::Display for Authority { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/examples/actix-web-example/src/ingest.rs b/examples/actix-web-example/src/ingest.rs new file mode 100644 index 0000000..1aa1fc6 --- /dev/null +++ b/examples/actix-web-example/src/ingest.rs @@ -0,0 +1,130 @@ +use crate::{MemoryRepo, PrivateKeyStore, ServerError}; +use actix_web::FromRequest; +use apub::{ + activitypub::{keys::PrivateKeyRepoFactory, simple::SimpleActor}, + clients::{awc::SignatureConfig as AwcSignatureConfig, AwcClient}, + cryptography::Rustcrypto, + ingest::{ + validate_authority, validate_hosts, validate_inbox, + validators::{InboxType, ValidateAuthority, ValidateHosts, ValidateInbox}, + Authority, IngestFactory, + }, + servers::actix_web::Config, + session::{BreakerSession, RequestCountSession, SessionFactory}, + Dereference, Ingest, Repo, RepoFactory, Session, +}; +use example_types::AcceptedActivity; +use std::future::{ready, Ready}; +use url::Url; + +#[derive(Clone)] +pub(super) struct ActivityIngester { + pub(super) repo: MemoryRepo, + pub(super) private_key_store: PrivateKeyStore, + pub(super) session: BreakerSession, + pub(super) client: awc::Client, + pub(super) signature_config: AwcSignatureConfig, + pub(super) config: Config, +} + +pub(super) struct InboxActor(pub(super) Url); + +pub(super) struct ActorId(pub(super) Url); + +impl PrivateKeyRepoFactory for ActivityIngester { + type PrivateKeyRepo = PrivateKeyStore; + + fn build_private_key_repo(&self) -> Self::PrivateKeyRepo { + self.private_key_store.clone() + } +} + +impl IngestFactory for ActivityIngester { + type Ingest = ValidateHosts>>; + + fn build_ingest(&self) -> Self::Ingest { + validate_hosts(validate_inbox(validate_authority(self.clone()))) + } +} + +#[async_trait::async_trait(?Send)] +impl Ingest for ActivityIngester { + type Local = MemoryRepo; + type Error = ServerError; + type ActorId = InboxActor; + + fn local_repo(&self) -> &Self::Local { + &self.repo + } + + fn is_local(&self, url: &Url) -> bool { + self.config.is_local(url) + } + + async fn ingest( + &self, + _authority: Authority, + _actor_id: Self::ActorId, + activity: &AcceptedActivity, + _remote_repo: R, + _session: S, + ) -> Result<(), Self::Error> + where + Self::Error: From, + { + Ok(self.repo.insert(activity.id().clone(), activity)?) + } +} + +impl RepoFactory for ActivityIngester { + type Crypto = Rustcrypto; + type Repo = AwcClient; + + fn build_repo(&self, crypto: Self::Crypto) -> Self::Repo { + AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto) + } +} + +impl SessionFactory for ActivityIngester { + type Session = (RequestCountSession, BreakerSession); + + fn build_session(&self) -> Self::Session { + (RequestCountSession::max(25), self.session.clone()) + } +} + +impl FromRequest for InboxActor { + type Error = actix_web::Error; + type Future = Ready>; + + fn from_request(_: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload) -> Self::Future { + ready(Ok(InboxActor( + Url::parse("http://localhost:8008/actors/localhost").unwrap(), + ))) + } +} + +impl InboxType for InboxActor { + fn is_shared(&self) -> bool { + true + } +} + +impl AsRef for InboxActor { + fn as_ref(&self) -> &Url { + &self.0 + } +} + +impl Dereference for ActorId { + type Output = SimpleActor; + fn url(&self) -> &Url { + &self.0 + } +} + +impl From for ActorId { + fn from(url: Url) -> Self { + ActorId(url) + } +} diff --git a/examples/actix-web-example/src/main.rs b/examples/actix-web-example/src/main.rs index 302157c..14d70de 100644 --- a/examples/actix-web-example/src/main.rs +++ b/examples/actix-web-example/src/main.rs @@ -1,4 +1,4 @@ -use actix_web::{middleware::Logger, web, App, FromRequest, HttpServer, ResponseError}; +use actix_web::{middleware::Logger, web, App, HttpServer, ResponseError}; use apub::{ activitypub::{ keys::{ @@ -16,27 +16,27 @@ use apub::{ signature::PrivateKeyBuilder, DigestFactory, PrivateKey, Rustcrypto, VerifyFactory, }, - ingest::Authority, + ingest::validators::{AuthorityError, HostError, InboxError}, servers::actix_web::{ inbox, serve_objects, Config, SignatureConfig as ActixWebSignatureConfig, VerifyError, }, session::{BreakerSession, RequestCountSession, SessionFactory}, - Dereference, Ingest, Repo, RepoFactory, Session, + Dereference, Repo, RepoFactory, Session, }; use dashmap::DashMap; -use example_types::{AcceptedActivity, ActivityType, ObjectId}; +use example_types::{ActivityType, ObjectId}; use rsa::{ pkcs8::{ToPrivateKey, ToPublicKey}, RsaPrivateKey, }; use serde_json::Map; -use std::{ - future::{ready, Ready}, - sync::Arc, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use url::{Host, Url}; +mod ingest; + +use ingest::{ActivityIngester, ActorId}; + #[derive(Clone, Default)] struct MemoryRepo { inner: Arc>, @@ -56,16 +56,6 @@ struct RequestVerifier { signature_config: AwcSignatureConfig, } -#[derive(Clone)] -struct ActivityIngester { - repo: MemoryRepo, - private_key_store: PrivateKeyStore, - session: BreakerSession, - client: awc::Client, - signature_config: AwcSignatureConfig, - config: Config, -} - #[derive(Debug, thiserror::Error)] enum ServerError { #[error(transparent)] @@ -85,6 +75,15 @@ enum ServerError { #[error(transparent)] Key(#[from] KeyError), + + #[error(transparent)] + Authority(#[from] AuthorityError), + + #[error(transparent)] + Inbox(#[from] InboxError), + + #[error(transparent)] + Hosts(#[from] HostError), } impl From>> for ServerError { @@ -203,14 +202,6 @@ impl PrivateKeyRepoFactory for RequestVerifier { } } -impl PrivateKeyRepoFactory for ActivityIngester { - type PrivateKeyRepo = PrivateKeyStore; - - fn build_private_key_repo(&self) -> Self::PrivateKeyRepo { - self.private_key_store.clone() - } -} - impl RepoFactory for MemoryRepo { type Crypto = (); type Repo = MemoryRepo; @@ -253,84 +244,6 @@ impl DigestFactory for RequestVerifier { type Digest = Sha256Digest; } -struct InboxActor(Url); - -impl FromRequest for InboxActor { - type Error = actix_web::Error; - type Future = Ready>; - - fn from_request(_: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload) -> Self::Future { - ready(Ok(InboxActor( - Url::parse("http://localhost:8008/actors/localhost").unwrap(), - ))) - } -} - -impl AsRef for InboxActor { - fn as_ref(&self) -> &Url { - &self.0 - } -} - -#[async_trait::async_trait(?Send)] -impl Ingest for ActivityIngester { - type Local = MemoryRepo; - type Error = ServerError; - type ActorId = InboxActor; - - fn local_repo(&self) -> &Self::Local { - &self.repo - } - - fn is_local(&self, url: &Url) -> bool { - self.config.is_local(url) - } - - async fn ingest( - &self, - _authority: Authority, - _actor_id: Self::ActorId, - activity: &AcceptedActivity, - _remote_repo: R, - _session: S, - ) -> Result<(), Self::Error> - where - Self::Error: From, - { - Ok(self.repo.insert(activity.id().clone(), activity)?) - } -} - -impl RepoFactory for ActivityIngester { - type Crypto = Rustcrypto; - type Repo = AwcClient; - - fn build_repo(&self, crypto: Self::Crypto) -> Self::Repo { - AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto) - } -} - -impl SessionFactory for ActivityIngester { - type Session = (RequestCountSession, BreakerSession); - - fn build_session(&self) -> Self::Session { - (RequestCountSession::max(25), self.session.clone()) - } -} - -pub struct ActorId(Url); -impl Dereference for ActorId { - type Output = SimpleActor; - fn url(&self) -> &Url { - &self.0 - } -} -impl From for ActorId { - fn from(url: Url) -> Self { - ActorId(url) - } -} - #[actix_web::main] async fn main() -> Result<(), Box> { if std::env::var("RUST_LOG").is_ok() { diff --git a/src/lib.rs b/src/lib.rs index 21dac7b..a517e80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,7 +132,7 @@ pub mod clients { pub mod ingest { //! Utilities for ingesting activities - pub use apub_core::ingest::Authority; + pub use apub_core::ingest::{Authority, IngestFactory}; #[cfg(feature = "apub-ingest")] pub use apub_ingest::{validate_authority, validate_hosts, validate_inbox};