Rework Ingest

This commit is contained in:
Aode (lion) 2021-12-05 15:15:21 -06:00
parent 1cefb00f6f
commit caa4da19b9
11 changed files with 350 additions and 586 deletions

View file

@ -13,7 +13,7 @@ use actix_web::{
};
use apub_core::{
digest::{Digest, DigestBuilder, DigestFactory},
ingest::{Authority, FullRepo, Ingest},
ingest::{is_local, Authority, Ingest},
repo::{Dereference, Repo, RepoFactory},
session::SessionFactory,
signature::{PrivateKeyBuilder, Verify, VerifyBuilder, VerifyFactory},
@ -60,6 +60,11 @@ impl Config {
format!("{}{}{}", self.scheme, self.local_host, uri)
}
}
/// Determine if a given URL is local
pub fn is_local(&self, url: &Url) -> bool {
is_local(&self.local_host, self.local_port, url)
}
}
/// The HTTP Signature config for Actix Web
@ -106,8 +111,8 @@ type PrivKeyError<R> = <<R as RepoFactory>::Crypto as PrivateKeyBuilder>::Error;
/// use apub_actix_web::{inbox, SignatureConfig};
/// use url::Host;
///
/// // FullRepo implements FullRepo
/// let full_repo = FullRepo::new();
/// // Ingest implements Ingest
/// let ingest = Ingest::new();
///
/// // verifier implements Repo, VerifyFactory, and DigestFactory
/// let verifier = DatabaseVerifier::new();
@ -123,26 +128,26 @@ type PrivKeyError<R> = <<R as RepoFactory>::Crypto as PrivateKeyBuilder>::Error;
/// web::scope("/shared_inbox")
/// .configure(inbox::<A, (), _, _, MyError>(
/// config,
/// full_repo,
/// ingest,
/// verifier,
/// SignatureConfig::default(),
/// true,
/// ))
/// );
/// ```
pub fn inbox<A, R, V, E>(
pub fn inbox<A, I, V, E>(
config: Config,
full_repo: R,
ingest: I,
verifier: V,
signature_config: SignatureConfig,
require_signature: bool,
) -> impl FnOnce(&mut ServiceConfig)
where
A: for<'de> serde::de::Deserialize<'de> + 'static,
R: FullRepo + SessionFactory + PrivateKeyRepoFactory + 'static,
R::Ingest: Ingest<A>,
<R::Ingest as Ingest<A>>::ActorId: FromRequest + AsRef<Url>,
R::Error: From<<R::Ingest as Ingest<A>>::Error>,
I: Ingest<A> + PrivateKeyRepoFactory + RepoFactory + SessionFactory + 'static,
I::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = I::Crypto>,
I::ActorId: FromRequest + AsRef<Url>,
<I as Ingest<A>>::Error: From<<I::Repo as Repo>::Error>,
V: RepoFactory
+ SessionFactory
+ PublicKeyRepoFactory
@ -151,6 +156,7 @@ where
+ DigestFactory
+ Clone
+ 'static,
V::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = V::Crypto>,
V::Crypto: PrivateKeyBuilder,
<V as DigestFactory>::Digest: Clone,
E: ResponseError
@ -177,10 +183,10 @@ where
service_config.service(
web::scope("")
.app_data(web::Data::new(full_repo))
.app_data(web::Data::new(ingest))
.wrap(digest)
.wrap(signature)
.route("", web::post().to(inbox_handler::<A, R>)),
.route("", web::post().to(inbox_handler::<A, I>)),
);
}
}
@ -234,6 +240,7 @@ where
+ VerifyFactory
+ Clone
+ 'static,
V::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = V::Crypto>,
V::Crypto: PrivateKeyBuilder,
E: ResponseError
+ From<VerifyError>
@ -265,43 +272,38 @@ where
}
}
async fn inbox_handler<A, R>(
full_repo: web::Data<R>,
async fn inbox_handler<A, I>(
ingest: web::Data<I>,
authority: Option<SignatureVerified>,
activity: web::Json<A>,
actor_id: <R::Ingest as Ingest<A>>::ActorId,
actor_id: I::ActorId,
) -> HttpResponse
where
A: for<'de> serde::de::Deserialize<'de> + 'static,
R: FullRepo + SessionFactory + PrivateKeyRepoFactory,
R::Ingest: Ingest<A>,
<R::Ingest as Ingest<A>>::ActorId: FromRequest + AsRef<Url>,
R::Error: From<<R::Ingest as Ingest<A>>::Error>,
I: Ingest<A> + PrivateKeyRepoFactory + RepoFactory + SessionFactory,
I::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = I::Crypto>,
I::ActorId: FromRequest + AsRef<Url>,
I::Error: From<<I::Repo as Repo>::Error>,
{
let url = actor_id.as_ref();
let private_key_repo = full_repo.build_private_key_repo();
let (key_id, private_key_pem) = match private_key_repo.fetch(url).await {
Ok(tup) => tup,
Err(_) => return HttpResponse::BadRequest().finish(),
};
let private_key = match R::Crypto::build(key_id.to_string(), &private_key_pem) {
let private_key_repo = ingest.build_private_key_repo();
let private_key = match private_key_repo.fetch(url).await {
Ok(private_key) => private_key,
Err(_) => return HttpResponse::BadRequest().finish(),
};
let activity = activity.into_inner();
let remote_repo = full_repo.remote(private_key);
let mut session = full_repo.build_session();
let remote_repo = ingest.build_repo(private_key);
let mut session = ingest.build_session();
if let Some(auth) = authority {
if let Ok(url) = auth.key_id().parse() {
if full_repo
.ingest_activity(
if ingest
.ingest(
Authority::Actor(url),
actor_id,
activity,
&activity,
&remote_repo,
&mut session,
)
@ -311,11 +313,11 @@ where
return HttpResponse::Accepted().finish();
}
}
} else if full_repo
.ingest_activity(
} else if ingest
.ingest(
Authority::None,
actor_id,
activity,
&activity,
&remote_repo,
&mut session,
)
@ -423,6 +425,7 @@ async fn verify<V, E>(
) -> Result<bool, E>
where
V: RepoFactory + SessionFactory + VerifyFactory + PublicKeyRepoFactory + PrivateKeyRepoFactory,
V::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = V::Crypto>,
V::Crypto: PrivateKeyBuilder,
<<V as VerifyFactory>::Verify as Verify>::Error: 'static,
E: ResponseError
@ -442,9 +445,7 @@ where
let private_key_repo = verifier.build_private_key_repo();
let (server_key_id, private_key_pem) = private_key_repo.fetch(&config.server_actor_id).await?;
let crypto = V::Crypto::build(server_key_id.to_string(), &private_key_pem)?;
let crypto = private_key_repo.fetch(&config.server_actor_id).await?;
let session = verifier.build_session();
let http_repo = verifier.build_repo(crypto);
@ -485,6 +486,7 @@ where
+ VerifyFactory
+ Clone
+ 'static,
V::PrivateKeyRepo: PrivateKeyRepo<PrivateKey = V::Crypto>,
V::Crypto: PrivateKeyBuilder,
E: ResponseError
+ From<VerifyError>

View file

@ -2,8 +2,8 @@
use crate::{
activitypub::{Activity, Actor},
ingest::{FullRepo, Ingest},
repo::Dereference,
ingest::Ingest,
repo::{Dereference, Repo},
session::Session,
};
use std::{rc::Rc, sync::Arc};
@ -28,23 +28,21 @@ pub trait ActivityExt: Activity {
fn object(&self) -> Option<Out<Self::ObjectId>>;
/// Pull the actor off the activity, or fetch it from a repo if it isn't present
async fn dereference_actor<R: FullRepo, S: Session>(
async fn dereference_actor<I: Ingest<Out<Self::ActorId>>, R: Repo, S: Session>(
&self,
actor_id: <R::Ingest as Ingest<Out<Self::ActorId>>>::ActorId,
repo: R,
remote: &R::Remote,
actor_id: I::ActorId,
ingest: I,
remote_repo: R,
session: S,
) -> Result<Option<Out<Self::ActorId>>, R::Error>
) -> Result<Option<Out<Self::ActorId>>, I::Error>
where
R::Ingest: Ingest<Out<Self::ActorId>>,
<R::Ingest as Ingest<Out<Self::ActorId>>>::ActorId: From<Url>,
R::Error: From<<R::Ingest as Ingest<Out<Self::ActorId>>>::Error>,
Out<Self::ActorId>: Clone,
I::Error: From<R::Error>,
I::ActorId: From<Url> + 'static,
{
dereference::<Self::ActorId, _, _>(
dereference::<Self::ActorId, I, R, S>(
actor_id,
repo,
remote,
ingest,
remote_repo,
session,
self.actor(),
self.actor_id(),
@ -53,23 +51,21 @@ pub trait ActivityExt: Activity {
}
/// Pull the object off the activity, or fetch it from a repo if it isn't present
async fn dereference_object<R: FullRepo, S: Session>(
async fn dereference_object<I: Ingest<Out<Self::ObjectId>>, R: Repo, S: Session>(
&self,
actor_id: <R::Ingest as Ingest<<Self::ObjectId as Dereference>::Output>>::ActorId,
repo: R,
remote: &R::Remote,
actor_id: I::ActorId,
ingest: I,
remote_repo: R,
session: S,
) -> Result<Option<Out<Self::ObjectId>>, R::Error>
) -> Result<Option<Out<Self::ObjectId>>, I::Error>
where
R::Ingest: Ingest<Out<Self::ObjectId>>,
<R::Ingest as Ingest<Out<Self::ObjectId>>>::ActorId: From<Url>,
R::Error: From<<R::Ingest as Ingest<Out<Self::ObjectId>>>::Error>,
Out<Self::ObjectId>: Clone,
I::Error: From<R::Error>,
I::ActorId: From<Url> + 'static,
{
dereference::<Self::ObjectId, _, _>(
dereference::<Self::ObjectId, I, R, S>(
actor_id,
repo,
remote,
ingest,
remote_repo,
session,
self.object(),
self.object_id(),
@ -88,23 +84,21 @@ pub trait ActorExt: Actor {
fn public_key(&self) -> Option<Out<Self::PublicKeyId>>;
/// Pull the public key off the acctor, or fetch it from a repo if it isn't present
async fn dereference_public_key<R: FullRepo, S: Session>(
async fn dereference_public_key<I: Ingest<Out<Self::PublicKeyId>>, R: Repo, S: Session>(
&self,
actor_id: <R::Ingest as Ingest<Out<Self::PublicKeyId>>>::ActorId,
repo: R,
remote: &R::Remote,
actor_id: I::ActorId,
ingest: I,
remote_repo: R,
session: S,
) -> Result<Option<Out<Self::PublicKeyId>>, R::Error>
) -> Result<Option<Out<Self::PublicKeyId>>, I::Error>
where
R::Ingest: Ingest<Out<Self::PublicKeyId>>,
<R::Ingest as Ingest<Out<Self::PublicKeyId>>>::ActorId: From<Url>,
R::Error: From<<R::Ingest as Ingest<Out<Self::PublicKeyId>>>::Error>,
Out<Self::PublicKeyId>: Clone,
I::Error: From<R::Error>,
I::ActorId: From<Url> + 'static,
{
dereference::<Self::PublicKeyId, _, _>(
dereference::<Self::PublicKeyId, I, R, S>(
actor_id,
repo,
remote,
ingest,
remote_repo,
session,
self.public_key(),
self.public_key_id(),
@ -113,26 +107,25 @@ pub trait ActorExt: Actor {
}
}
/// implementation to dereference an object through FullRepo if needed
pub async fn dereference<D: Dereference + From<Url>, R: FullRepo, S: Session>(
actor_id: <R::Ingest as Ingest<Out<D>>>::ActorId,
repo: R,
remote: &R::Remote,
/// implementation to dereference an object through Ingest if needed
pub async fn dereference<D: Dereference + From<Url>, I: Ingest<Out<D>>, R: Repo, S: Session>(
actor_id: I::ActorId,
ingest: I,
remote_repo: R,
session: S,
output: Option<D::Output>,
id: &Url,
) -> Result<Option<D::Output>, R::Error>
) -> Result<Option<D::Output>, I::Error>
where
R::Ingest: Ingest<Out<D>>,
<R::Ingest as Ingest<Out<D>>>::ActorId: From<Url>,
R::Error: From<<R::Ingest as Ingest<Out<D>>>::Error>,
D::Output: Clone,
I::ActorId: From<Url> + 'static,
I::Error: From<R::Error>,
{
if let Some(item) = output {
return Ok(Some(item));
}
repo.fetch(D::from(id.clone()), session, actor_id, remote)
ingest
.fetch(D::from(id.clone()), actor_id, remote_repo, session)
.await
}

View file

@ -1,10 +1,8 @@
//! traits around accepting activities
use crate::{
digest::DigestFactory,
repo::{Dereference, Repo},
session::Session,
signature::{PrivateKey, PrivateKeyBuilder},
};
use std::{rc::Rc, sync::Arc};
use url::{Host, Url};
@ -35,9 +33,6 @@ pub enum Authority {
/// This type is implemented by users of `apub` to hook into provied inbox methods
#[async_trait::async_trait(?Send)]
pub trait Ingest<Object> {
/// The error produced when ingesting activities
type Error;
/// The actor that is receiving the activity
///
/// e.g.
@ -45,111 +40,67 @@ pub trait Ingest<Object> {
/// - the server actor (in the case of the shared inbox)
type ActorId;
/// The local repository
type Local: Repo;
/// The error produced when ingesting activities
type Error: From<<Self::Local as Repo>::Error>;
/// Get the local repository
fn local_repo(&self) -> &Self::Local;
/// Determine if a given URL is local
fn is_local(&self, url: &Url) -> bool;
/// Accept and process a given activity
async fn ingest<R: FullRepo, S: Session>(
///
/// Args:
/// - authority: the source of the information, either the Actor that provided the object, or the URL it was fetched from
/// - actor_id: the ID of the actor accepting the object.
/// - activity: the Object being ingested
/// - remote_repo: a handle to a remote repository (probably an HTTP client) for ingesting further objects
/// - session: the request session associated with the Remote Repo
async fn ingest<Remote: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: Object,
repo: R,
remote_repo: &R::Remote,
session: S,
) -> Result<(), R::Error>
where
R::Error: From<Self::Error>;
}
/// A trait representing a full local + remote + ingest combination repository
#[async_trait::async_trait(?Send)]
pub trait FullRepo {
/// Errors produced by the Full Repository
type Error: From<<Self::Local as Repo>::Error> + From<<Self::Remote as Repo>::Error>;
/// The local object repository
type Local: Repo;
/// The cryptography implementation for the remote repo
type Crypto: DigestFactory + PrivateKey + PrivateKeyBuilder;
/// The remote object repository
type Remote: Repo;
/// The ingester
type Ingest;
/// Domain or IP of the local server
fn local_domain(&self) -> &Host<String>;
/// Port of the local server
fn local_port(&self) -> Option<u16>;
/// get the local repo
fn local(&self) -> &Self::Local;
/// get the remote repo
fn remote(&self, crypto: Self::Crypto) -> Self::Remote;
/// get the ingester
fn ingest(&self) -> &Self::Ingest;
/// Determine if a given URL is local
fn is_local(&self, url: &Url) -> bool {
Some(borrow_host(self.local_domain())) == url.host() && self.local_port() == url.port()
}
/// Ingest an activity
async fn ingest_activity<Object, S: Session>(
&self,
authority: Authority,
actor_id: <Self::Ingest as Ingest<Object>>::ActorId,
activity: Object,
remote_repo: &Self::Remote,
activity: &Object,
remote_repo: Remote,
session: S,
) -> Result<(), Self::Error>
where
Self: Sized,
Self::Ingest: Ingest<Object>,
Self::Error: From<<Self::Ingest as Ingest<Object>>::Error>,
{
self.ingest()
.ingest(authority, actor_id, activity, self, remote_repo, session)
.await
}
Self::Error: From<Remote::Error>;
/// Fetch the ID, ingesting it if it isn't local
async fn fetch<D: Dereference, S: Session>(
/// Dereference an ID from the local or remote repo
///
/// Args:
/// - id: the ID of the object to be fetched
/// - actor_id: the ID of the actor fetching the object.
/// - remote_repo: a handle to a remote repository (probably an HTTP client) for ingesting further objects
/// - session: the request session associated with the Remote Repo
async fn fetch<D: Dereference<Output = Object>, Remote: Repo, S: Session>(
&self,
id: D,
actor_id: Self::ActorId,
remote_repo: Remote,
mut session: S,
actor_id: <Self::Ingest as Ingest<D::Output>>::ActorId,
remote_repo: &Self::Remote,
) -> Result<Option<D::Output>, Self::Error>
where
Self: Sized,
D::Output: Clone,
Self::Ingest: Ingest<D::Output>,
<Self::Ingest as Ingest<D::Output>>::ActorId: From<Url>,
Self::Error: From<<Self::Ingest as Ingest<D::Output>>::Error>,
Self::ActorId: 'static,
Self::Error: From<Remote::Error>,
{
let url = id.url().clone();
let opt = self.local_repo().fetch(&id, &mut session).await?;
let opt = self.local().fetch(&id, &mut session).await?;
if self.is_local(&url) {
if self.is_local(id.url()) {
return Ok(opt);
}
let opt = remote_repo.fetch(id, &mut session).await?;
let opt = remote_repo.fetch(&id, &mut session).await?;
if let Some(value) = opt.clone() {
self.ingest_activity(
Authority::Server(url),
actor_id,
value,
remote_repo,
session,
)
.await?;
if let Some(object) = opt.as_ref() {
let authority = Authority::Server(id.url().clone());
self.ingest(authority, actor_id, object, remote_repo, session)
.await?;
}
Ok(opt)
@ -166,6 +117,11 @@ impl std::fmt::Display for Authority {
}
}
/// A helper for determining if a given URL matches a Host and Port
pub fn is_local(local_host: &Host<String>, local_port: Option<u16>, url: &Url) -> bool {
Some(borrow_host(local_host)) == url.host() && local_port == url.port()
}
fn borrow_host(host: &Host<String>) -> Host<&str> {
match host {
Host::Ipv4(ip) => Host::Ipv4(*ip),
@ -180,31 +136,30 @@ where
T: Ingest<Object>,
Object: 'static,
{
type Error = T::Error;
type Local = T::Local;
type ActorId = T::ActorId;
type Error = T::Error;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
T::local_repo(self)
}
fn is_local(&self, url: &Url) -> bool {
T::is_local(self, url)
}
async fn ingest<Remote: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: Object,
repo: R,
remote_repo: &R::Remote,
activity: &Object,
remote_repo: Remote,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<Remote::Error>,
{
T::ingest(
self,
authority,
actor_id,
activity,
repo,
remote_repo,
session,
)
.await
T::ingest(self, authority, actor_id, activity, remote_repo, session).await
}
}
@ -214,31 +169,30 @@ where
T: Ingest<Object>,
Object: 'static,
{
type Local = T::Local;
type Error = T::Error;
type ActorId = T::ActorId;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
T::local_repo(self)
}
fn is_local(&self, url: &Url) -> bool {
T::is_local(self, url)
}
async fn ingest<Remote: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: Object,
repo: R,
remote_repo: &R::Remote,
activity: &Object,
remote_repo: Remote,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<Remote::Error>,
{
T::ingest(
self,
authority,
actor_id,
activity,
repo,
remote_repo,
session,
)
.await
T::ingest(self, authority, actor_id, activity, remote_repo, session).await
}
}
@ -248,31 +202,30 @@ where
T: Ingest<Object>,
Object: 'static,
{
type Local = T::Local;
type Error = T::Error;
type ActorId = T::ActorId;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
T::local_repo(self)
}
fn is_local(&self, url: &Url) -> bool {
T::is_local(self, url)
}
async fn ingest<Remote: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: Object,
repo: R,
remote_repo: &R::Remote,
activity: &Object,
remote_repo: Remote,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<Remote::Error>,
{
T::ingest(
self,
authority,
actor_id,
activity,
repo,
remote_repo,
session,
)
.await
T::ingest(self, authority, actor_id, activity, remote_repo, session).await
}
}
@ -282,31 +235,30 @@ where
T: Ingest<Object>,
Object: 'static,
{
type Local = T::Local;
type Error = T::Error;
type ActorId = T::ActorId;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
T::local_repo(self)
}
fn is_local(&self, url: &Url) -> bool {
T::is_local(self, url)
}
async fn ingest<Remote: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: Object,
repo: R,
remote_repo: &R::Remote,
activity: &Object,
remote_repo: Remote,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<Remote::Error>,
{
T::ingest(
self,
authority,
actor_id,
activity,
repo,
remote_repo,
session,
)
.await
T::ingest(self, authority, actor_id, activity, remote_repo, session).await
}
}
@ -316,190 +268,29 @@ where
T: Ingest<Object>,
Object: 'static,
{
type Local = T::Local;
type Error = T::Error;
type ActorId = T::ActorId;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
T::local_repo(self)
}
fn is_local(&self, url: &Url) -> bool {
T::is_local(self, url)
}
async fn ingest<Remote: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: Object,
repo: R,
remote_repo: &R::Remote,
activity: &Object,
remote_repo: Remote,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<Remote::Error>,
{
T::ingest(
self,
authority,
actor_id,
activity,
repo,
remote_repo,
session,
)
.await
}
}
#[async_trait::async_trait(?Send)]
impl<'a, T> FullRepo for &'a T
where
T: FullRepo,
{
type Error = T::Error;
type Local = T::Local;
type Crypto = T::Crypto;
type Remote = T::Remote;
type Ingest = T::Ingest;
fn local_domain(&self) -> &Host<String> {
T::local_domain(self)
}
fn local_port(&self) -> Option<u16> {
T::local_port(self)
}
fn local(&self) -> &Self::Local {
T::local(self)
}
fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
T::remote(self, crypto)
}
fn ingest(&self) -> &Self::Ingest {
T::ingest(self)
}
}
#[async_trait::async_trait(?Send)]
impl<'a, T> FullRepo for &'a mut T
where
T: FullRepo,
{
type Error = T::Error;
type Local = T::Local;
type Crypto = T::Crypto;
type Remote = T::Remote;
type Ingest = T::Ingest;
fn local_domain(&self) -> &Host<String> {
T::local_domain(self)
}
fn local_port(&self) -> Option<u16> {
T::local_port(self)
}
fn local(&self) -> &Self::Local {
T::local(self)
}
fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
T::remote(self, crypto)
}
fn ingest(&self) -> &Self::Ingest {
T::ingest(self)
}
}
#[async_trait::async_trait(?Send)]
impl<T> FullRepo for Box<T>
where
T: FullRepo,
{
type Error = T::Error;
type Local = T::Local;
type Crypto = T::Crypto;
type Remote = T::Remote;
type Ingest = T::Ingest;
fn local_domain(&self) -> &Host<String> {
T::local_domain(self)
}
fn local_port(&self) -> Option<u16> {
T::local_port(self)
}
fn local(&self) -> &Self::Local {
T::local(self)
}
fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
T::remote(self, crypto)
}
fn ingest(&self) -> &Self::Ingest {
T::ingest(self)
}
}
#[async_trait::async_trait(?Send)]
impl<T> FullRepo for Rc<T>
where
T: FullRepo,
{
type Error = T::Error;
type Local = T::Local;
type Crypto = T::Crypto;
type Remote = T::Remote;
type Ingest = T::Ingest;
fn local_domain(&self) -> &Host<String> {
T::local_domain(self)
}
fn local_port(&self) -> Option<u16> {
T::local_port(self)
}
fn local(&self) -> &Self::Local {
T::local(self)
}
fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
T::remote(self, crypto)
}
fn ingest(&self) -> &Self::Ingest {
T::ingest(self)
}
}
#[async_trait::async_trait(?Send)]
impl<T> FullRepo for Arc<T>
where
T: FullRepo,
{
type Error = T::Error;
type Local = T::Local;
type Crypto = T::Crypto;
type Remote = T::Remote;
type Ingest = T::Ingest;
fn local_domain(&self) -> &Host<String> {
T::local_domain(self)
}
fn local_port(&self) -> Option<u16> {
T::local_port(self)
}
fn local(&self) -> &Self::Local {
T::local(self)
}
fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
T::remote(self, crypto)
}
fn ingest(&self) -> &Self::Ingest {
T::ingest(self)
T::ingest(self, authority, actor_id, activity, remote_repo, session).await
}
}

View file

@ -34,6 +34,9 @@ pub trait PrivateKeyBuilder: PrivateKey {
fn build(key_id: String, private_key_pem: &str) -> Result<Self, Self::Error>
where
Self: Sized;
/// Retrieve the pem pkcs8 encoded private key from this builder
fn private_key_pem(&self) -> Result<String, Self::Error>;
}
/// Describes verifying signatures

View file

@ -1,6 +1,7 @@
use apub_core::{
activitypub::{Activity, DeliverableObject},
ingest::{Authority, FullRepo, Ingest},
ingest::{Authority, Ingest},
repo::Repo,
session::Session,
};
use url::Url;
@ -63,20 +64,28 @@ where
I: Ingest<A>,
I::Error: From<AuthorityError>,
{
type Error = I::Error;
type Local = I::Local;
type ActorId = I::ActorId;
type Error = I::Error;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
self.ingest.local_repo()
}
fn is_local(&self, url: &Url) -> bool {
self.ingest.is_local(url)
}
async fn ingest<R: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: A,
repo: R,
remote_repo: &R::Remote,
activity: &A,
remote_repo: R,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<R::Error>,
{
let activity_actor = activity.actor_id();
@ -97,7 +106,7 @@ where
}
self.ingest
.ingest(authority, actor_id, activity, repo, remote_repo, session)
.ingest(authority, actor_id, activity, remote_repo, session)
.await
}
}
@ -110,20 +119,28 @@ where
I::Error: From<InboxError>,
I::ActorId: InboxType,
{
type Error = I::Error;
type Local = I::Local;
type ActorId = I::ActorId;
type Error = I::Error;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
self.ingest.local_repo()
}
fn is_local(&self, url: &Url) -> bool {
self.ingest.is_local(url)
}
async fn ingest<R: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: A,
repo: R,
remote_repo: &R::Remote,
activity: &A,
remote_repo: R,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<R::Error>,
{
// delivered a public activity to a user inbox
if !actor_id.is_shared() && activity.is_public() {
@ -136,7 +153,7 @@ where
}
self.ingest
.ingest(authority, actor_id, activity, repo, remote_repo, session)
.ingest(authority, actor_id, activity, remote_repo, session)
.await
}
}
@ -148,20 +165,27 @@ where
I: Ingest<A>,
I::Error: From<HostError>,
{
type Error = I::Error;
type Local = I::Local;
type ActorId = I::ActorId;
type Error = I::Error;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
self.ingest.local_repo()
}
fn is_local(&self, url: &Url) -> bool {
self.ingest.is_local(url)
}
async fn ingest<R: Repo, S: Session>(
&self,
authority: Authority,
actor_id: Self::ActorId,
activity: A,
repo: R,
remote_repo: &R::Remote,
activity: &A,
remote_repo: R,
session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<R::Error>,
{
if activity.id().host() != activity.actor_id().host()
|| activity.id().port() != activity.actor_id().port()
@ -170,7 +194,7 @@ where
}
self.ingest
.ingest(authority, actor_id, activity, repo, remote_repo, session)
.ingest(authority, actor_id, activity, remote_repo, session)
.await
}
}

View file

@ -138,6 +138,12 @@ impl apub_core::signature::PrivateKeyBuilder for OpenSsl {
private_key: PKey::private_key_from_pem(private_key_pem.as_bytes())?,
})
}
fn private_key_pem(&self) -> Result<String, Self::Error> {
self.private_key
.private_key_to_pem_pkcs8()
.map(|v| String::from_utf8_lossy(&v).to_string())
}
}
impl Debug for OpenSslDigest {

View file

@ -11,6 +11,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
apub-core = { version = "0.1.0", path = "../apub-core/" }
async-trait = "0.1.51"
serde = { version = "1", features = ["derive"] }
url = { version = "2", features = ["serde"] }

View file

@ -1,3 +1,4 @@
use apub_core::signature::PrivateKeyBuilder;
use std::{
ops::{Deref, DerefMut},
rc::Rc,
@ -8,16 +9,12 @@ use url::Url;
#[async_trait::async_trait(?Send)]
pub trait PrivateKeyRepo {
type PrivateKey: PrivateKeyBuilder;
type Error;
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error>;
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error>;
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error>;
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error>;
}
pub trait PrivateKeyRepoFactory {
@ -83,18 +80,14 @@ impl<'a, T> PrivateKeyRepo for &'a T
where
T: PrivateKeyRepo,
{
type PrivateKey = T::PrivateKey;
type Error = T::Error;
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error> {
T::store(self, actor_id, key_id, private_key_pem).await
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error> {
T::store(self, actor_id, key).await
}
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error> {
T::fetch(self, actor_id).await
}
}
@ -104,18 +97,14 @@ impl<'a, T> PrivateKeyRepo for &'a mut T
where
T: PrivateKeyRepo,
{
type PrivateKey = T::PrivateKey;
type Error = T::Error;
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error> {
T::store(self, actor_id, key_id, private_key_pem).await
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error> {
T::store(self, actor_id, key).await
}
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error> {
T::fetch(self, actor_id).await
}
}
@ -125,18 +114,14 @@ impl<T> PrivateKeyRepo for Box<T>
where
T: PrivateKeyRepo,
{
type PrivateKey = T::PrivateKey;
type Error = T::Error;
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error> {
T::store(self, actor_id, key_id, private_key_pem).await
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error> {
T::store(self, actor_id, key).await
}
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error> {
T::fetch(self, actor_id).await
}
}
@ -146,18 +131,14 @@ impl<T> PrivateKeyRepo for Rc<T>
where
T: PrivateKeyRepo,
{
type PrivateKey = T::PrivateKey;
type Error = T::Error;
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error> {
T::store(self, actor_id, key_id, private_key_pem).await
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error> {
T::store(self, actor_id, key).await
}
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error> {
T::fetch(self, actor_id).await
}
}
@ -167,18 +148,14 @@ impl<T> PrivateKeyRepo for Arc<T>
where
T: PrivateKeyRepo,
{
type PrivateKey = T::PrivateKey;
type Error = T::Error;
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error> {
T::store(self, actor_id, key_id, private_key_pem).await
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error> {
T::store(self, actor_id, key).await
}
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error> {
T::fetch(self, actor_id).await
}
}

View file

@ -4,7 +4,7 @@
use rsa::{
hash::Hash,
pkcs8::{FromPrivateKey, FromPublicKey},
pkcs8::{FromPrivateKey, FromPublicKey, ToPrivateKey},
PaddingScheme, PublicKey, RsaPrivateKey, RsaPublicKey,
};
use sha2::{Digest, Sha256};
@ -165,6 +165,13 @@ impl apub_core::signature::PrivateKeyBuilder for Rustcrypto {
private_key: RsaPrivateKey::from_pkcs8_pem(private_key_pem)?,
})
}
fn private_key_pem(&self) -> Result<String, Self::Error> {
self.private_key
.to_pkcs8_pem()
.map(|s| (*s).clone())
.map_err(RustcryptoError::from)
}
}
impl Debug for RsaSigner {

View file

@ -13,21 +13,15 @@ use apub::{
},
cryptography::{
rustcrypto::{RsaVerifier, RustcryptoError, Sha256Digest},
DigestFactory, Rustcrypto, VerifyFactory,
},
ingest::{
validate_authority, validate_hosts, validate_inbox,
validators::{
AuthorityError, HostError, InboxError, InboxType, ValidateAuthority, ValidateHosts,
ValidateInbox,
},
Authority,
signature::PrivateKeyBuilder,
DigestFactory, PrivateKey, Rustcrypto, VerifyFactory,
},
ingest::Authority,
servers::actix_web::{
inbox, serve_objects, Config, SignatureConfig as ActixWebSignatureConfig, VerifyError,
},
session::{BreakerSession, RequestCountSession, SessionFactory},
Dereference, FullRepo, Ingest, Repo, RepoFactory, Session,
Dereference, Ingest, Repo, RepoFactory, Session,
};
use dashmap::DashMap;
use example_types::{AcceptedActivity, ActivityType, ObjectId};
@ -65,7 +59,6 @@ struct RequestVerifier {
#[derive(Clone)]
struct ActivityIngester {
repo: MemoryRepo,
ingest: ValidateAuthority<ValidateInbox<ValidateHosts<MemoryRepo>>>,
private_key_store: PrivateKeyStore,
session: BreakerSession,
client: awc::Client,
@ -92,15 +85,6 @@ enum ServerError {
#[error(transparent)]
Key(#[from] KeyError),
#[error(transparent)]
Host(#[from] HostError),
#[error(transparent)]
Inbox(#[from] InboxError),
#[error(transparent)]
Authority(#[from] AuthorityError),
}
impl From<PublicKeyError<serde_json::Error, AwcError<RustcryptoError>>> for ServerError {
@ -165,12 +149,24 @@ impl PublicKeyRepo for MemoryRepo {
}
}
#[derive(Clone, Debug)]
struct KeyError;
#[derive(Debug)]
enum KeyError {
Missing,
Url,
Rustcrypto(RustcryptoError),
}
impl std::fmt::Display for KeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Key is missing")
match self {
Self::Missing => {
write!(f, "Key is missing")
}
Self::Url => {
write!(f, "Couldn't parse key id")
}
Self::Rustcrypto(rustcrypto_err) => std::fmt::Display::fmt(rustcrypto_err, f),
}
}
}
@ -178,22 +174,22 @@ impl std::error::Error for KeyError {}
#[async_trait::async_trait(?Send)]
impl PrivateKeyRepo for PrivateKeyStore {
type PrivateKey = Rustcrypto;
type Error = KeyError;
async fn fetch(&self, actor_id: &Url) -> Result<(KeyId, String), Self::Error> {
async fn fetch(&self, actor_id: &Url) -> Result<Self::PrivateKey, Self::Error> {
if let Some(tup_ref) = self.inner.get(actor_id) {
return Ok(tup_ref.clone());
return Rustcrypto::build(tup_ref.0.to_string(), &tup_ref.1)
.map_err(KeyError::Rustcrypto);
}
Err(KeyError)
Err(KeyError::Missing)
}
async fn store(
&self,
actor_id: Url,
key_id: KeyId,
private_key_pem: String,
) -> Result<(), Self::Error> {
async fn store(&self, actor_id: Url, key: &Self::PrivateKey) -> Result<(), Self::Error> {
let key_id = key.key_id().parse().map_err(|_| KeyError::Url)?;
let private_key_pem = key.private_key_pem().map_err(KeyError::Rustcrypto)?;
self.inner.insert(actor_id, (key_id, private_key_pem));
Ok(())
}
@ -269,11 +265,6 @@ impl FromRequest for InboxActor {
)))
}
}
impl InboxType for InboxActor {
fn is_shared(&self) -> bool {
true
}
}
impl AsRef<Url> for InboxActor {
fn as_ref(&self) -> &Url {
@ -282,34 +273,40 @@ impl AsRef<Url> for InboxActor {
}
#[async_trait::async_trait(?Send)]
impl Ingest<AcceptedActivity> for MemoryRepo {
impl Ingest<AcceptedActivity> for ActivityIngester {
type Local = MemoryRepo;
type Error = ServerError;
type ActorId = InboxActor;
async fn ingest<R: FullRepo, S: Session>(
fn local_repo(&self) -> &Self::Local {
&self.repo
}
fn is_local(&self, url: &Url) -> bool {
self.config.is_local(url)
}
async fn ingest<R: Repo, S: Session>(
&self,
_authority: Authority,
_actor_id: Self::ActorId,
activity: AcceptedActivity,
_repo: R,
_remote_repo: &R::Remote,
activity: &AcceptedActivity,
_remote_repo: R,
_session: S,
) -> Result<(), R::Error>
) -> Result<(), Self::Error>
where
R::Error: From<Self::Error>,
Self::Error: From<R::Error>,
{
Ok(self
.insert(activity.id().clone(), &activity)
.map_err(Self::Error::from)?)
Ok(self.repo.insert(activity.id().clone(), activity)?)
}
}
impl RepoFactory for ActivityIngester {
type Crypto = ();
type Repo = MemoryRepo;
type Crypto = Rustcrypto;
type Repo = AwcClient<Rustcrypto>;
fn build_repo(&self, _: Self::Crypto) -> Self::Repo {
self.repo.clone()
fn build_repo(&self, crypto: Self::Crypto) -> Self::Repo {
AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto)
}
}
@ -321,34 +318,6 @@ impl SessionFactory for ActivityIngester {
}
}
impl FullRepo for ActivityIngester {
type Error = ServerError;
type Local = MemoryRepo;
type Crypto = Rustcrypto;
type Remote = AwcClient<Rustcrypto>;
type Ingest = ValidateAuthority<ValidateInbox<ValidateHosts<MemoryRepo>>>;
fn local(&self) -> &Self::Local {
&self.repo
}
fn remote(&self, crypto: Self::Crypto) -> Self::Remote {
AwcClient::new(self.client.clone(), self.signature_config.clone(), crypto)
}
fn ingest(&self) -> &Self::Ingest {
&self.ingest
}
fn local_port(&self) -> Option<u16> {
self.config.local_port
}
fn local_domain(&self) -> &Host<String> {
&self.config.local_host
}
}
pub struct ActorId(Url);
impl Dereference for ActorId {
type Output = SimpleActor;
@ -415,9 +384,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
repo.insert(ActorId(actor_id.clone()), &server_actor)?;
private_key_store
.store(actor_id, key_id, private_key_pem.to_string())
.await?;
let key = Rustcrypto::build(key_id.to_string(), &private_key_pem)?;
private_key_store.store(actor_id, &key).await?;
HttpServer::new(move || {
let client = awc::Client::new();
@ -431,7 +400,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let ingester = ActivityIngester {
repo: repo.clone(),
ingest: validate_authority(validate_inbox(validate_hosts(repo.clone()))),
private_key_store: private_key_store.clone(),
session: session.clone(),
client,

View file

@ -21,17 +21,9 @@
//!
//! ## Accepting
//!
//! The [`Ingest`] trait is the main point for accepting activities into your system. It is designed
//! to be used from the context of the [`FullRepo`] trait, which combines a local repository, an
//! HTTP repository, and your custom [`Ingest`] implementation. Using a `FullRepo` provides an
//! `Ingest` with all the peices needed to fully ingest an activitypub activity. Since Activities
//! often reference each other, being able to pause ingesting one activity while you ingest one of
//! it's dependencies is super important. that's why `FullRepo` provides an `ingest_activity`
//! method, which can be called at any point within an Ingest implementation to recursively ingest a
//! new object.
//!
//! `Ingest` is generic over the type it ingests. This allows for a single type to accept multiple
//! kinds of activities with different implementations. It may be worthwile to implement
//! The [`Ingest`] trait is the main point for accepting activities into your system. `Ingest` is
//! generic over the type it ingests. This allows for a single type to accept multiple kinds of
//! activities with different implementations. It may be worthwile to implement
//! `Ingest<DeleteActivity>` differently from `Ingest<UndoActivity>`. Further, implementations can
//! implement `Ingest<A> where A: SomeTrait`. Creating generic `Ingest` implementations can enable
//! better code re-use between kinds of activities.
@ -101,7 +93,7 @@
//! [`PrivateKeyRepo`]: activitypub::keys::PrivateKeyRepo
pub use apub_core::{
deliver::{Deliver, Deliverable},
ingest::{FullRepo, Ingest},
ingest::Ingest,
object_id::ObjectId,
repo::{Dereference, Repo, RepoFactory},
session::Session,