Add Ingest client for ingesting fetched objects

This commit is contained in:
Aode (lion) 2021-11-26 20:48:36 -06:00
parent 43b953d4df
commit d100172dfb
7 changed files with 358 additions and 15 deletions

View file

@ -13,7 +13,7 @@ with-background-jobs = ["apub-background-jobs"]
with-openssl = ["apub-openssl"]
with-reqwest = ["apub-reqwest"]
with-rustcrypto = ["apub-rustcrypto"]
utils = ["apub-breaker-session", "apub-deref-client"]
utils = ["apub-breaker-session", "apub-deref-client", "apub-ingest-client"]
[dependencies]
apub-actix-web = { version = "0.1.0", path = "./apub-actix-web/", optional = true }
@ -22,6 +22,7 @@ apub-background-jobs = { version = "0.1.0", path = "./apub-background-jobs/", op
apub-breaker-session = { version = "0.1.0", path = "./apub-breaker-session/", optional = true }
apub-core = { version = "0.1.0", path = "./apub-core/" }
apub-deref-client = { version = "0.1.0", path = "./apub-deref-client/", optional = true }
apub-ingest-client = { version = "0.1.0", path = "./apub-ingest-client/", optional = true }
apub-openssl = { version = "0.1.0", path = "./apub-openssl/", optional = true }
apub-reqwest = { version = "0.1.0", path = "./apub-reqwest/", optional = true }
apub-rustcrypto = { version = "0.1.0", path = "./apub-rustcrypto/", optional = true }
@ -43,6 +44,7 @@ members = [
"apub-breaker-session",
"apub-core",
"apub-deref-client",
"apub-ingest-client",
"apub-openssl",
"apub-reqwest",
"apub-rustcrypto",

View file

@ -13,7 +13,7 @@ use actix_web::{
use apub_core::{
deref::{Dereference, Repo},
digest::{Digest, DigestBuilder, DigestFactory},
ingest::Ingest,
ingest::{Authority, Ingest},
signature::{Verify, VerifyBuilder, VerifyFactory},
};
use http_signature_normalization_actix::{
@ -228,11 +228,19 @@ where
let activity = activity.into_inner();
if let Some(auth) = authority {
if let Ok(url) = auth.key_id().parse() {
if ingest.ingest(Some(url), activity, metadata).await.is_ok() {
if ingest
.ingest(Authority::Actor(url), activity, metadata)
.await
.is_ok()
{
return HttpResponse::Accepted().finish();
}
}
} else if ingest.ingest(None, activity, metadata).await.is_ok() {
} else if ingest
.ingest(Authority::None, activity, metadata)
.await
.is_ok()
{
return HttpResponse::Accepted().finish();
}

View file

@ -3,6 +3,27 @@
use std::{rc::Rc, sync::Arc};
use url::Url;
/// The Authority that is providing the ingested data
///
/// - An Authority of None means the data is untrustworthy, as no entity can be verified to have
/// provided this data
/// - An Authority of Server means the data to be ingested has been provided on behalf of it's
/// origin server. A URL is provided in the Server variant to describe the specific URL the data
/// originates from.
/// - An Authority of Actor(Url) means the data to be ingested has been provided on behalf of the
/// Actor identified by the associated URL
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Authority {
/// No Authority provided
None,
/// The Authority for ingested data is the server the data is hosted on
Server(Url),
/// The Authority for the ingested data is the provided url
Actor(Url),
}
/// Describes accepting a new Activity into the system
///
/// This type is implemented by users of `apub` to hook into provied inbox methods
@ -16,7 +37,7 @@ pub trait Ingest<'a, Activity, Metadata> {
/// Accept and process a given activity
fn ingest(
&'a self,
authority: Option<Url>,
authority: Authority,
activity: Activity,
metadata: Metadata,
) -> Self::Future;
@ -31,7 +52,7 @@ where
fn ingest(
&'a self,
authority: Option<Url>,
authority: Authority,
activity: Activity,
metadata: Metadata,
) -> Self::Future {
@ -48,7 +69,7 @@ where
fn ingest(
&'a self,
authority: Option<Url>,
authority: Authority,
activity: Activity,
metadata: Metadata,
) -> Self::Future {
@ -65,7 +86,7 @@ where
fn ingest(
&'a self,
authority: Option<Url>,
authority: Authority,
activity: Activity,
metadata: Metadata,
) -> Self::Future {
@ -82,7 +103,7 @@ where
fn ingest(
&'a self,
authority: Option<Url>,
authority: Authority,
activity: Activity,
metadata: Metadata,
) -> Self::Future {
@ -99,7 +120,7 @@ where
fn ingest(
&'a self,
authority: Option<Url>,
authority: Authority,
activity: Activity,
metadata: Metadata,
) -> Self::Future {

View file

@ -0,0 +1,21 @@
[package]
name = "apub-ingest-client"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
apub-core = { version = "0.1.0", path = "../apub-core" }
thiserror = "1"
[dev-dependencies]
apub-openssl = { version = "0.1.0", path = "../apub-openssl/" }
apub-reqwest = { version = "0.1.0", path = "../apub-reqwest/" }
dashmap = "4.0.2"
openssl = "0.10"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.14", features = ["full"] }
url = { version = "2", features = ["serde"] }

View file

@ -0,0 +1,177 @@
//! The type that combines Remote repos with Ingest
//!
//! When `R` implements [`Repo`] and `I` implements [`Ingest`], IngestClient<R, I> implements
//! `Ingest`
//!
//! ```rust
//! use apub_core::{
//! deref::{Dereference, Repo},
//! ingest::{Authority, Ingest},
//! };
//! use apub_ingest_client::Client;
//! use apub_reqwest::{ReqwestClient, SignatureConfig};
//! use apub_openssl::OpenSsl;
//! use dashmap::DashMap;
//! use openssl::{
//! pkey::PKey,
//! rsa::Rsa,
//! };
//! use std::{
//! future::{ready, Ready},
//! sync::Arc,
//! time::Duration,
//! };
//! use url::Url;
//!
//! trait Object: serde::ser::Serialize + for<'de> serde::de::Deserialize<'de> {
//! fn id(&self) -> &Url;
//! }
//!
//! #[derive(Clone, Debug, Default)]
//! pub struct MemoryRepo {
//! inner: Arc<DashMap<Url, serde_json::Value>>,
//! }
//!
//! impl<'a, 'b, Activity> Ingest<'a, Activity, &'b ()> for MemoryRepo
//! where
//! Activity: Object,
//! {
//! type Error = serde_json::Error;
//! type Future = Ready<Result<(), Self::Error>>;
//!
//! fn ingest(&'a self, _authority: Authority, activity: Activity, _metadata: &'b ()) -> Self::Future {
//! let url = activity.id().clone();
//! ready(serde_json::to_value(activity).map(|value| {
//! self.inner.insert(url, value);
//! }))
//! }
//! }
//!
//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
//! #[serde(transparent)]
//! struct MyId(Url);
//!
//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
//! struct MyObject {
//! id: MyId,
//! content: String,
//! }
//!
//! impl Dereference for MyId {
//! type Output = MyObject;
//!
//! fn url(&self) -> &Url {
//! &self.0
//! }
//! }
//!
//! impl Object for MyObject {
//! fn id(&self) -> &Url {
//! &self.id.0
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let remote_url: Url = "https://masto.asonix.dog/users/asonix/statuses/107326693283572188".parse()?;
//!
//! let local_repo = MemoryRepo::default();
//!
//! let signature_config = SignatureConfig::default();
//! let private_key = PKey::from_rsa(Rsa::generate(1024)?)?;
//! let crypto = OpenSsl::new("key-id".to_string(), private_key);
//! let client = reqwest::Client::new();
//! let remote_repo = ReqwestClient::new(&client, (), &signature_config, &crypto);
//!
//! assert!(local_repo.inner.get(&remote_url).is_none());
//!
//! let ingest_client = Client::new(
//! &remote_repo,
//! &local_repo,
//! &(),
//! );
//!
//! let opt = ingest_client.fetch(MyId(remote_url.clone())).await?;
//! assert!(opt.is_some());
//! if let Some(my_object) = opt {
//! println!("{:?}", my_object)
//! }
//!
//! assert!(local_repo.inner.get(&remote_url).is_some());
//!
//! Ok(())
//! }
//! ```
#![deny(missing_docs)]
use apub_core::{
deref::{Dereference, Repo},
ingest::{Authority, Ingest},
};
use std::{future::Future, pin::Pin};
/// A Repo that ingests objects it fetches
pub struct Client<'b, R, I, M> {
repo: R,
ingest: I,
metadata: &'b M,
}
impl<'b, R, I, M> Client<'b, R, I, M> {
/// Create a new Client
pub fn new(repo: R, ingest: I, metadata: &'b M) -> Self {
Client {
repo,
ingest,
metadata,
}
}
}
/// Errors produced when fetching or ingesting
#[derive(Debug, thiserror::Error)]
pub enum Error<R, I> {
/// Error fetching
#[error("{0}")]
Repo(#[source] R),
/// Error ingesting
#[error("{0}")]
Ingest(#[source] I),
}
impl<'a, 'b, D, R, I, M> Repo<'a, D> for Client<'b, R, I, M>
where
'b: 'a,
D: Dereference,
D: 'a,
D::Output: Clone,
R: Repo<'a, D>,
I: Ingest<'a, D::Output, &'b M>,
I: 'a,
<R as Repo<'a, D>>::Error: std::error::Error + 'static,
<I as Ingest<'a, D::Output, &'b M>>::Error: std::error::Error + 'static,
{
type Error = ClientError<'a, 'b, D, R, I, M>;
type Future = Pin<Box<dyn Future<Output = Result<Option<D::Output>, Self::Error>> + 'a>>;
fn fetch(&'a self, id: D) -> Self::Future {
Box::pin(async move {
let url = id.url().clone();
if let Some(output) = self.repo.fetch(id).await.map_err(Error::Repo)? {
self.ingest
.ingest(Authority::Server(url), output.clone(), self.metadata)
.await
.map_err(Error::Ingest)?;
Ok(Some(output))
} else {
Ok(None)
}
})
}
}
type ClientError<'a, 'b, D, R, I, M> =
Error<<R as Repo<'a, D>>::Error, <I as Ingest<'a, <D as Dereference>::Output, &'b M>>::Error>;

View file

@ -3,8 +3,8 @@ use apub::{
clients::{Dereference, Repo},
crypto::{DigestFactory, RsaVerifier, RustcryptoError, Sha256Digest, VerifyFactory},
servers::{
inbox, serve_objects, ActixWebSignatureConfig, Ingest, RepoFactory, RepoFactoryX,
VerifyError,
inbox, serve_objects, ActixWebSignatureConfig, Authority, Ingest, RepoFactory,
RepoFactoryX, VerifyError,
},
};
use dashmap::DashMap;
@ -129,7 +129,7 @@ impl<'a> Ingest<'a, AcceptedActivity, ()> for ActivityIngester {
fn ingest(
&'a self,
_authority: Option<Url>,
_authority: Authority,
activity: AcceptedActivity,
_metadata: (),
) -> Self::Future {

View file

@ -26,7 +26,7 @@ pub mod clients {
#[cfg(feature = "apub-deref-client")]
/// The type that combines Local and Remote repos
///
/// when both `Local` and `Http` implement [`Repo`], so does Client<Local, Http>
/// When both `Local` and `Http` implement [`Repo`], so does CombinedClient<Local, Http>
///
/// ```rust
/// use apub::{
@ -140,6 +140,120 @@ pub mod clients {
/// ```
pub use apub_deref_client::Client as CombinedClient;
#[cfg(feature = "apub-ingest-client")]
/// The type that combines Remote repos with Ingest
///
/// When `R` implements [`Repo`] and `I` implements [`Ingest`], IngestClient<R, I> implements
/// `Ingest`
///
/// ```rust
/// use apub::{
/// clients::{
/// IngestClient,
/// Dereference,
/// Repo,
/// ReqwestClient,
/// ReqwestSignatureConfig,
/// },
/// servers::{Authority, Ingest},
/// crypto::OpenSsl,
/// };
/// use dashmap::DashMap;
/// use openssl::{
/// pkey::PKey,
/// rsa::Rsa,
/// };
/// use std::{
/// future::{ready, Ready},
/// sync::Arc,
/// time::Duration,
/// };
/// use url::Url;
///
/// trait Object: serde::ser::Serialize + for<'de> serde::de::Deserialize<'de> {
/// fn id(&self) -> &Url;
/// }
///
/// #[derive(Clone, Debug, Default)]
/// pub struct MemoryRepo {
/// inner: Arc<DashMap<Url, serde_json::Value>>,
/// }
///
/// impl<'a, 'b, Activity> Ingest<'a, Activity, &'b ()> for MemoryRepo
/// where
/// Activity: Object,
/// {
/// type Error = serde_json::Error;
/// type Future = Ready<Result<(), Self::Error>>;
///
/// fn ingest(&'a self, _authority: Authority, activity: Activity, _metadata: &'b ()) -> Self::Future {
/// let url = activity.id().clone();
/// ready(serde_json::to_value(activity).map(|value| {
/// self.inner.insert(url, value);
/// }))
/// }
/// }
///
/// #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
/// #[serde(transparent)]
/// struct MyId(Url);
///
/// #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
/// struct MyObject {
/// id: MyId,
/// content: String,
/// }
///
/// impl Dereference for MyId {
/// type Output = MyObject;
///
/// fn url(&self) -> &Url {
/// &self.0
/// }
/// }
///
/// impl Object for MyObject {
/// fn id(&self) -> &Url {
/// &self.id.0
/// }
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let remote_url: Url = "https://masto.asonix.dog/users/asonix/statuses/107326693283572188".parse()?;
///
/// let local_repo = MemoryRepo::default();
///
/// let signature_config = ReqwestSignatureConfig::default();
/// let private_key = PKey::from_rsa(Rsa::generate(1024)?)?;
/// let crypto = OpenSsl::new("key-id".to_string(), private_key);
/// let client = reqwest::Client::new();
/// let remote_repo = ReqwestClient::new(&client, (), &signature_config, &crypto);
///
/// assert!(local_repo.inner.get(&remote_url).is_none());
///
/// let ingest_client = IngestClient::new(
/// &remote_repo,
/// &local_repo,
/// &(),
/// );
///
/// let opt = ingest_client.fetch(MyId(remote_url.clone())).await?;
/// assert!(opt.is_some());
/// if let Some(my_object) = opt {
/// println!("{:?}", my_object)
/// }
///
/// assert!(local_repo.inner.get(&remote_url).is_some());
///
/// Ok(())
/// }
/// ```
pub use apub_ingest_client::Client as IngestClient;
#[cfg(feature = "apub-ingest-client")]
pub use apub_ingest_client::Error as IngestClientError;
#[cfg(feature = "apub-deref-client")]
pub use apub_deref_client::Error as CombinedError;
@ -157,7 +271,7 @@ pub mod clients {
}
pub mod servers {
pub use apub_core::ingest::Ingest;
pub use apub_core::ingest::{Authority, Ingest};
#[cfg(feature = "apub-actix-web")]
pub use apub_actix_web::{