Add IngestFactory for more dynamic ingest structures
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Aode (lion) 2021-12-05 15:54:12 -06:00
parent 9abb291475
commit a1ad76b485
5 changed files with 173 additions and 120 deletions

View file

@ -13,7 +13,7 @@ use actix_web::{
};
use apub_core::{
digest::{Digest, DigestBuilder, DigestFactory},
ingest::{is_local, Authority, Ingest},
ingest::{is_local, Authority, Ingest, IngestFactory},
repo::{Dereference, Repo, RepoFactory},
session::SessionFactory,
signature::{PrivateKeyBuilder, Verify, VerifyBuilder, VerifyFactory},
@ -137,17 +137,17 @@ type PrivKeyError<R> = <<R as RepoFactory>::Crypto as PrivateKeyBuilder>::Error;
/// ```
pub fn inbox<A, I, V, E>(
config: Config,
ingest: I,
ingest_factory: I,
verifier: V,
signature_config: SignatureConfig,
require_signature: bool,
) -> impl FnOnce(&mut ServiceConfig)
where
A: for<'de> serde::de::Deserialize<'de> + 'static,
I: Ingest<A> + PrivateKeyRepoFactory + RepoFactory + SessionFactory + 'static,
I: IngestFactory<A> + PrivateKeyRepoFactory + RepoFactory + SessionFactory + 'static,
I::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = I::Crypto>,
I::ActorId: FromRequest + AsRef<Url>,
<I as Ingest<A>>::Error: From<<I::Repo as Repo>::Error>,
<I::Ingest as Ingest<A>>::ActorId: FromRequest + AsRef<Url>,
<I::Ingest as Ingest<A>>::Error: From<<I::Repo as Repo>::Error>,
V: RepoFactory
+ SessionFactory
+ PublicKeyRepoFactory
@ -183,7 +183,7 @@ where
service_config.service(
web::scope("")
.app_data(web::Data::new(ingest))
.app_data(web::Data::new(ingest_factory))
.wrap(digest)
.wrap(signature)
.route("", web::post().to(inbox_handler::<A, I>)),
@ -273,29 +273,30 @@ where
}
async fn inbox_handler<A, I>(
ingest: web::Data<I>,
ingest_factory: web::Data<I>,
authority: Option<SignatureVerified>,
activity: web::Json<A>,
actor_id: I::ActorId,
actor_id: <I::Ingest as Ingest<A>>::ActorId,
) -> HttpResponse
where
A: for<'de> serde::de::Deserialize<'de> + 'static,
I: Ingest<A> + PrivateKeyRepoFactory + RepoFactory + SessionFactory,
I: IngestFactory<A> + PrivateKeyRepoFactory + RepoFactory + SessionFactory,
I::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = I::Crypto>,
I::ActorId: FromRequest + AsRef<Url>,
I::Error: From<<I::Repo as Repo>::Error>,
<I::Ingest as Ingest<A>>::ActorId: FromRequest + AsRef<Url>,
<I::Ingest as Ingest<A>>::Error: From<<I::Repo as Repo>::Error>,
{
let url = actor_id.as_ref();
let private_key_repo = ingest.build_private_key_repo();
let private_key_repo = ingest_factory.build_private_key_repo();
let private_key = match private_key_repo.fetch(url).await {
Ok(private_key) => private_key,
Err(_) => return HttpResponse::BadRequest().finish(),
};
let activity = activity.into_inner();
let remote_repo = ingest.build_repo(private_key);
let mut session = ingest.build_session();
let remote_repo = ingest_factory.build_repo(private_key);
let mut session = ingest_factory.build_session();
let ingest = ingest_factory.build_ingest();
if let Some(auth) = authority {
if let Ok(url) = auth.key_id().parse() {

View file

@ -107,6 +107,15 @@ pub trait Ingest<Object> {
}
}
/// Describes a type that can produce an Ingest
pub trait IngestFactory<A> {
/// The Ingest type
type Ingest: Ingest<A>;
/// Build the ingest type
fn build_ingest(&self) -> Self::Ingest;
}
impl std::fmt::Display for Authority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {

View file

@ -0,0 +1,130 @@
use crate::{MemoryRepo, PrivateKeyStore, ServerError};
use actix_web::FromRequest;
use apub::{
activitypub::{keys::PrivateKeyRepoFactory, simple::SimpleActor},
clients::{awc::SignatureConfig as AwcSignatureConfig, AwcClient},
cryptography::Rustcrypto,
ingest::{
validate_authority, validate_hosts, validate_inbox,
validators::{InboxType, ValidateAuthority, ValidateHosts, ValidateInbox},
Authority, IngestFactory,
},
servers::actix_web::Config,
session::{BreakerSession, RequestCountSession, SessionFactory},
Dereference, Ingest, Repo, RepoFactory, Session,
};
use example_types::AcceptedActivity;
use std::future::{ready, Ready};
use url::Url;
#[derive(Clone)]
pub(super) struct ActivityIngester {
pub(super) repo: MemoryRepo,
pub(super) private_key_store: PrivateKeyStore,
pub(super) session: BreakerSession,
pub(super) client: awc::Client,
pub(super) signature_config: AwcSignatureConfig,
pub(super) config: Config,
}
pub(super) struct InboxActor(pub(super) Url);
pub(super) struct ActorId(pub(super) Url);
impl PrivateKeyRepoFactory for ActivityIngester {
type PrivateKeyRepo = PrivateKeyStore;
fn build_private_key_repo(&self) -> Self::PrivateKeyRepo {
self.private_key_store.clone()
}
}
impl IngestFactory<AcceptedActivity> for ActivityIngester {
type Ingest = ValidateHosts<ValidateInbox<ValidateAuthority<ActivityIngester>>>;
fn build_ingest(&self) -> Self::Ingest {
validate_hosts(validate_inbox(validate_authority(self.clone())))
}
}
#[async_trait::async_trait(?Send)]
impl Ingest<AcceptedActivity> for ActivityIngester {
type Local = MemoryRepo;
type Error = ServerError;
type ActorId = InboxActor;
fn local_repo(&self) -> &Self::Local {
&self.repo
}
fn is_local(&self, url: &Url) -> bool {
self.config.is_local(url)
}
async fn ingest<R: Repo, S: Session>(
&self,
_authority: Authority,
_actor_id: Self::ActorId,
activity: &AcceptedActivity,
_remote_repo: R,
_session: S,
) -> Result<(), Self::Error>
where
Self::Error: From<R::Error>,
{
Ok(self.repo.insert(activity.id().clone(), activity)?)
}
}
impl RepoFactory for ActivityIngester {
type Crypto = Rustcrypto;
type Repo = AwcClient<Rustcrypto>;
fn build_repo(&self, crypto: Self::Crypto) -> Self::Repo {
AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto)
}
}
impl SessionFactory for ActivityIngester {
type Session = (RequestCountSession, BreakerSession);
fn build_session(&self) -> Self::Session {
(RequestCountSession::max(25), self.session.clone())
}
}
impl FromRequest for InboxActor {
type Error = actix_web::Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(_: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload) -> Self::Future {
ready(Ok(InboxActor(
Url::parse("http://localhost:8008/actors/localhost").unwrap(),
)))
}
}
impl InboxType for InboxActor {
fn is_shared(&self) -> bool {
true
}
}
impl AsRef<Url> for InboxActor {
fn as_ref(&self) -> &Url {
&self.0
}
}
impl Dereference for ActorId {
type Output = SimpleActor;
fn url(&self) -> &Url {
&self.0
}
}
impl From<Url> for ActorId {
fn from(url: Url) -> Self {
ActorId(url)
}
}

View file

@ -1,4 +1,4 @@
use actix_web::{middleware::Logger, web, App, FromRequest, HttpServer, ResponseError};
use actix_web::{middleware::Logger, web, App, HttpServer, ResponseError};
use apub::{
activitypub::{
keys::{
@ -16,27 +16,27 @@ use apub::{
signature::PrivateKeyBuilder,
DigestFactory, PrivateKey, Rustcrypto, VerifyFactory,
},
ingest::Authority,
ingest::validators::{AuthorityError, HostError, InboxError},
servers::actix_web::{
inbox, serve_objects, Config, SignatureConfig as ActixWebSignatureConfig, VerifyError,
},
session::{BreakerSession, RequestCountSession, SessionFactory},
Dereference, Ingest, Repo, RepoFactory, Session,
Dereference, Repo, RepoFactory, Session,
};
use dashmap::DashMap;
use example_types::{AcceptedActivity, ActivityType, ObjectId};
use example_types::{ActivityType, ObjectId};
use rsa::{
pkcs8::{ToPrivateKey, ToPublicKey},
RsaPrivateKey,
};
use serde_json::Map;
use std::{
future::{ready, Ready},
sync::Arc,
time::Duration,
};
use std::{sync::Arc, time::Duration};
use url::{Host, Url};
mod ingest;
use ingest::{ActivityIngester, ActorId};
#[derive(Clone, Default)]
struct MemoryRepo {
inner: Arc<DashMap<Url, serde_json::Value>>,
@ -56,16 +56,6 @@ struct RequestVerifier {
signature_config: AwcSignatureConfig,
}
#[derive(Clone)]
struct ActivityIngester {
repo: MemoryRepo,
private_key_store: PrivateKeyStore,
session: BreakerSession,
client: awc::Client,
signature_config: AwcSignatureConfig,
config: Config,
}
#[derive(Debug, thiserror::Error)]
enum ServerError {
#[error(transparent)]
@ -85,6 +75,15 @@ enum ServerError {
#[error(transparent)]
Key(#[from] KeyError),
#[error(transparent)]
Authority(#[from] AuthorityError),
#[error(transparent)]
Inbox(#[from] InboxError),
#[error(transparent)]
Hosts(#[from] HostError),
}
impl From<PublicKeyError<serde_json::Error, AwcError<RustcryptoError>>> for ServerError {
@ -203,14 +202,6 @@ impl PrivateKeyRepoFactory for RequestVerifier {
}
}
impl PrivateKeyRepoFactory for ActivityIngester {
type PrivateKeyRepo = PrivateKeyStore;
fn build_private_key_repo(&self) -> Self::PrivateKeyRepo {
self.private_key_store.clone()
}
}
impl RepoFactory for MemoryRepo {
type Crypto = ();
type Repo = MemoryRepo;
@ -253,84 +244,6 @@ impl DigestFactory for RequestVerifier {
type Digest = Sha256Digest;
}
struct InboxActor(Url);
impl FromRequest for InboxActor {
type Error = actix_web::Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(_: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload) -> Self::Future {
ready(Ok(InboxActor(
Url::parse("http://localhost:8008/actors/localhost").unwrap(),
)))
}
}
impl AsRef<Url> for InboxActor {
fn as_ref(&self) -> &Url {
&self.0
}
}
#[async_trait::async_trait(?Send)]
impl Ingest<AcceptedActivity> for ActivityIngester {
type Local = MemoryRepo;
type Error = ServerError;
type ActorId = InboxActor;
fn local_repo(&self) -> &Self::Local {
&self.repo
}
fn is_local(&self, url: &Url) -> bool {
self.config.is_local(url)
}
async fn ingest<R: Repo, S: Session>(
&self,
_authority: Authority,
_actor_id: Self::ActorId,
activity: &AcceptedActivity,
_remote_repo: R,
_session: S,
) -> Result<(), Self::Error>
where
Self::Error: From<R::Error>,
{
Ok(self.repo.insert(activity.id().clone(), activity)?)
}
}
impl RepoFactory for ActivityIngester {
type Crypto = Rustcrypto;
type Repo = AwcClient<Rustcrypto>;
fn build_repo(&self, crypto: Self::Crypto) -> Self::Repo {
AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto)
}
}
impl SessionFactory for ActivityIngester {
type Session = (RequestCountSession, BreakerSession);
fn build_session(&self) -> Self::Session {
(RequestCountSession::max(25), self.session.clone())
}
}
pub struct ActorId(Url);
impl Dereference for ActorId {
type Output = SimpleActor;
fn url(&self) -> &Url {
&self.0
}
}
impl From<Url> for ActorId {
fn from(url: Url) -> Self {
ActorId(url)
}
}
#[actix_web::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var("RUST_LOG").is_ok() {

View file

@ -132,7 +132,7 @@ pub mod clients {
pub mod ingest {
//! Utilities for ingesting activities
pub use apub_core::ingest::Authority;
pub use apub_core::ingest::{Authority, IngestFactory};
#[cfg(feature = "apub-ingest")]
pub use apub_ingest::{validate_authority, validate_hosts, validate_inbox};