From d100172dfb162b4e76365440ed0bf0351c8178ca Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Fri, 26 Nov 2021 20:48:36 -0600 Subject: [PATCH] Add Ingest client for ingesting fetched objects --- Cargo.toml | 4 +- apub-actix-web/src/lib.rs | 14 +- apub-core/src/ingest.rs | 33 ++++- apub-ingest-client/Cargo.toml | 21 +++ apub-ingest-client/src/lib.rs | 177 +++++++++++++++++++++++++ examples/actix-web-example/src/main.rs | 6 +- src/lib.rs | 118 ++++++++++++++++- 7 files changed, 358 insertions(+), 15 deletions(-) create mode 100644 apub-ingest-client/Cargo.toml create mode 100644 apub-ingest-client/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 8e2a004..9d5c7c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ with-background-jobs = ["apub-background-jobs"] with-openssl = ["apub-openssl"] with-reqwest = ["apub-reqwest"] with-rustcrypto = ["apub-rustcrypto"] -utils = ["apub-breaker-session", "apub-deref-client"] +utils = ["apub-breaker-session", "apub-deref-client", "apub-ingest-client"] [dependencies] apub-actix-web = { version = "0.1.0", path = "./apub-actix-web/", optional = true } @@ -22,6 +22,7 @@ apub-background-jobs = { version = "0.1.0", path = "./apub-background-jobs/", op apub-breaker-session = { version = "0.1.0", path = "./apub-breaker-session/", optional = true } apub-core = { version = "0.1.0", path = "./apub-core/" } apub-deref-client = { version = "0.1.0", path = "./apub-deref-client/", optional = true } +apub-ingest-client = { version = "0.1.0", path = "./apub-ingest-client/", optional = true } apub-openssl = { version = "0.1.0", path = "./apub-openssl/", optional = true } apub-reqwest = { version = "0.1.0", path = "./apub-reqwest/", optional = true } apub-rustcrypto = { version = "0.1.0", path = "./apub-rustcrypto/", optional = true } @@ -43,6 +44,7 @@ members = [ "apub-breaker-session", "apub-core", "apub-deref-client", + "apub-ingest-client", "apub-openssl", "apub-reqwest", "apub-rustcrypto", diff --git a/apub-actix-web/src/lib.rs b/apub-actix-web/src/lib.rs index b0d55fd..5c37ea4 100644 --- a/apub-actix-web/src/lib.rs +++ b/apub-actix-web/src/lib.rs @@ -13,7 +13,7 @@ use actix_web::{ use apub_core::{ deref::{Dereference, Repo}, digest::{Digest, DigestBuilder, DigestFactory}, - ingest::Ingest, + ingest::{Authority, Ingest}, signature::{Verify, VerifyBuilder, VerifyFactory}, }; use http_signature_normalization_actix::{ @@ -228,11 +228,19 @@ where let activity = activity.into_inner(); if let Some(auth) = authority { if let Ok(url) = auth.key_id().parse() { - if ingest.ingest(Some(url), activity, metadata).await.is_ok() { + if ingest + .ingest(Authority::Actor(url), activity, metadata) + .await + .is_ok() + { return HttpResponse::Accepted().finish(); } } - } else if ingest.ingest(None, activity, metadata).await.is_ok() { + } else if ingest + .ingest(Authority::None, activity, metadata) + .await + .is_ok() + { return HttpResponse::Accepted().finish(); } diff --git a/apub-core/src/ingest.rs b/apub-core/src/ingest.rs index d2e88a0..2df5f8b 100644 --- a/apub-core/src/ingest.rs +++ b/apub-core/src/ingest.rs @@ -3,6 +3,27 @@ use std::{rc::Rc, sync::Arc}; use url::Url; +/// The Authority that is providing the ingested data +/// +/// - An Authority of None means the data is untrustworthy, as no entity can be verified to have +/// provided this data +/// - An Authority of Server means the data to be ingested has been provided on behalf of it's +/// origin server. A URL is provided in the Server variant to describe the specific URL the data +/// originates from. +/// - An Authority of Actor(Url) means the data to be ingested has been provided on behalf of the +/// Actor identified by the associated URL +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum Authority { + /// No Authority provided + None, + + /// The Authority for ingested data is the server the data is hosted on + Server(Url), + + /// The Authority for the ingested data is the provided url + Actor(Url), +} + /// Describes accepting a new Activity into the system /// /// This type is implemented by users of `apub` to hook into provied inbox methods @@ -16,7 +37,7 @@ pub trait Ingest<'a, Activity, Metadata> { /// Accept and process a given activity fn ingest( &'a self, - authority: Option, + authority: Authority, activity: Activity, metadata: Metadata, ) -> Self::Future; @@ -31,7 +52,7 @@ where fn ingest( &'a self, - authority: Option, + authority: Authority, activity: Activity, metadata: Metadata, ) -> Self::Future { @@ -48,7 +69,7 @@ where fn ingest( &'a self, - authority: Option, + authority: Authority, activity: Activity, metadata: Metadata, ) -> Self::Future { @@ -65,7 +86,7 @@ where fn ingest( &'a self, - authority: Option, + authority: Authority, activity: Activity, metadata: Metadata, ) -> Self::Future { @@ -82,7 +103,7 @@ where fn ingest( &'a self, - authority: Option, + authority: Authority, activity: Activity, metadata: Metadata, ) -> Self::Future { @@ -99,7 +120,7 @@ where fn ingest( &'a self, - authority: Option, + authority: Authority, activity: Activity, metadata: Metadata, ) -> Self::Future { diff --git a/apub-ingest-client/Cargo.toml b/apub-ingest-client/Cargo.toml new file mode 100644 index 0000000..c878a13 --- /dev/null +++ b/apub-ingest-client/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "apub-ingest-client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-core = { version = "0.1.0", path = "../apub-core" } +thiserror = "1" + +[dev-dependencies] +apub-openssl = { version = "0.1.0", path = "../apub-openssl/" } +apub-reqwest = { version = "0.1.0", path = "../apub-reqwest/" } +dashmap = "4.0.2" +openssl = "0.10" +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1.14", features = ["full"] } +url = { version = "2", features = ["serde"] } diff --git a/apub-ingest-client/src/lib.rs b/apub-ingest-client/src/lib.rs new file mode 100644 index 0000000..9fd58d5 --- /dev/null +++ b/apub-ingest-client/src/lib.rs @@ -0,0 +1,177 @@ +//! The type that combines Remote repos with Ingest +//! +//! When `R` implements [`Repo`] and `I` implements [`Ingest`], IngestClient implements +//! `Ingest` +//! +//! ```rust +//! use apub_core::{ +//! deref::{Dereference, Repo}, +//! ingest::{Authority, Ingest}, +//! }; +//! use apub_ingest_client::Client; +//! use apub_reqwest::{ReqwestClient, SignatureConfig}; +//! use apub_openssl::OpenSsl; +//! use dashmap::DashMap; +//! use openssl::{ +//! pkey::PKey, +//! rsa::Rsa, +//! }; +//! use std::{ +//! future::{ready, Ready}, +//! sync::Arc, +//! time::Duration, +//! }; +//! use url::Url; +//! +//! trait Object: serde::ser::Serialize + for<'de> serde::de::Deserialize<'de> { +//! fn id(&self) -> &Url; +//! } +//! +//! #[derive(Clone, Debug, Default)] +//! pub struct MemoryRepo { +//! inner: Arc>, +//! } +//! +//! impl<'a, 'b, Activity> Ingest<'a, Activity, &'b ()> for MemoryRepo +//! where +//! Activity: Object, +//! { +//! type Error = serde_json::Error; +//! type Future = Ready>; +//! +//! fn ingest(&'a self, _authority: Authority, activity: Activity, _metadata: &'b ()) -> Self::Future { +//! let url = activity.id().clone(); +//! ready(serde_json::to_value(activity).map(|value| { +//! self.inner.insert(url, value); +//! })) +//! } +//! } +//! +//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +//! #[serde(transparent)] +//! struct MyId(Url); +//! +//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +//! struct MyObject { +//! id: MyId, +//! content: String, +//! } +//! +//! impl Dereference for MyId { +//! type Output = MyObject; +//! +//! fn url(&self) -> &Url { +//! &self.0 +//! } +//! } +//! +//! impl Object for MyObject { +//! fn id(&self) -> &Url { +//! &self.id.0 +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let remote_url: Url = "https://masto.asonix.dog/users/asonix/statuses/107326693283572188".parse()?; +//! +//! let local_repo = MemoryRepo::default(); +//! +//! let signature_config = SignatureConfig::default(); +//! let private_key = PKey::from_rsa(Rsa::generate(1024)?)?; +//! let crypto = OpenSsl::new("key-id".to_string(), private_key); +//! let client = reqwest::Client::new(); +//! let remote_repo = ReqwestClient::new(&client, (), &signature_config, &crypto); +//! +//! assert!(local_repo.inner.get(&remote_url).is_none()); +//! +//! let ingest_client = Client::new( +//! &remote_repo, +//! &local_repo, +//! &(), +//! ); +//! +//! let opt = ingest_client.fetch(MyId(remote_url.clone())).await?; +//! assert!(opt.is_some()); +//! if let Some(my_object) = opt { +//! println!("{:?}", my_object) +//! } +//! +//! assert!(local_repo.inner.get(&remote_url).is_some()); +//! +//! Ok(()) +//! } +//! ``` + +#![deny(missing_docs)] + +use apub_core::{ + deref::{Dereference, Repo}, + ingest::{Authority, Ingest}, +}; +use std::{future::Future, pin::Pin}; + +/// A Repo that ingests objects it fetches +pub struct Client<'b, R, I, M> { + repo: R, + ingest: I, + metadata: &'b M, +} + +impl<'b, R, I, M> Client<'b, R, I, M> { + /// Create a new Client + pub fn new(repo: R, ingest: I, metadata: &'b M) -> Self { + Client { + repo, + ingest, + metadata, + } + } +} + +/// Errors produced when fetching or ingesting +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Error fetching + #[error("{0}")] + Repo(#[source] R), + + /// Error ingesting + #[error("{0}")] + Ingest(#[source] I), +} + +impl<'a, 'b, D, R, I, M> Repo<'a, D> for Client<'b, R, I, M> +where + 'b: 'a, + D: Dereference, + D: 'a, + D::Output: Clone, + R: Repo<'a, D>, + I: Ingest<'a, D::Output, &'b M>, + I: 'a, + >::Error: std::error::Error + 'static, + >::Error: std::error::Error + 'static, +{ + type Error = ClientError<'a, 'b, D, R, I, M>; + type Future = Pin, Self::Error>> + 'a>>; + + fn fetch(&'a self, id: D) -> Self::Future { + Box::pin(async move { + let url = id.url().clone(); + if let Some(output) = self.repo.fetch(id).await.map_err(Error::Repo)? { + self.ingest + .ingest(Authority::Server(url), output.clone(), self.metadata) + .await + .map_err(Error::Ingest)?; + + Ok(Some(output)) + } else { + Ok(None) + } + }) + } +} + +type ClientError<'a, 'b, D, R, I, M> = + Error<>::Error, ::Output, &'b M>>::Error>; diff --git a/examples/actix-web-example/src/main.rs b/examples/actix-web-example/src/main.rs index ca1fb41..5adbfe7 100644 --- a/examples/actix-web-example/src/main.rs +++ b/examples/actix-web-example/src/main.rs @@ -3,8 +3,8 @@ use apub::{ clients::{Dereference, Repo}, crypto::{DigestFactory, RsaVerifier, RustcryptoError, Sha256Digest, VerifyFactory}, servers::{ - inbox, serve_objects, ActixWebSignatureConfig, Ingest, RepoFactory, RepoFactoryX, - VerifyError, + inbox, serve_objects, ActixWebSignatureConfig, Authority, Ingest, RepoFactory, + RepoFactoryX, VerifyError, }, }; use dashmap::DashMap; @@ -129,7 +129,7 @@ impl<'a> Ingest<'a, AcceptedActivity, ()> for ActivityIngester { fn ingest( &'a self, - _authority: Option, + _authority: Authority, activity: AcceptedActivity, _metadata: (), ) -> Self::Future { diff --git a/src/lib.rs b/src/lib.rs index 2a46a9e..dd0e930 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ pub mod clients { #[cfg(feature = "apub-deref-client")] /// The type that combines Local and Remote repos /// - /// when both `Local` and `Http` implement [`Repo`], so does Client + /// When both `Local` and `Http` implement [`Repo`], so does CombinedClient /// /// ```rust /// use apub::{ @@ -140,6 +140,120 @@ pub mod clients { /// ``` pub use apub_deref_client::Client as CombinedClient; + #[cfg(feature = "apub-ingest-client")] + /// The type that combines Remote repos with Ingest + /// + /// When `R` implements [`Repo`] and `I` implements [`Ingest`], IngestClient implements + /// `Ingest` + /// + /// ```rust + /// use apub::{ + /// clients::{ + /// IngestClient, + /// Dereference, + /// Repo, + /// ReqwestClient, + /// ReqwestSignatureConfig, + /// }, + /// servers::{Authority, Ingest}, + /// crypto::OpenSsl, + /// }; + /// use dashmap::DashMap; + /// use openssl::{ + /// pkey::PKey, + /// rsa::Rsa, + /// }; + /// use std::{ + /// future::{ready, Ready}, + /// sync::Arc, + /// time::Duration, + /// }; + /// use url::Url; + /// + /// trait Object: serde::ser::Serialize + for<'de> serde::de::Deserialize<'de> { + /// fn id(&self) -> &Url; + /// } + /// + /// #[derive(Clone, Debug, Default)] + /// pub struct MemoryRepo { + /// inner: Arc>, + /// } + /// + /// impl<'a, 'b, Activity> Ingest<'a, Activity, &'b ()> for MemoryRepo + /// where + /// Activity: Object, + /// { + /// type Error = serde_json::Error; + /// type Future = Ready>; + /// + /// fn ingest(&'a self, _authority: Authority, activity: Activity, _metadata: &'b ()) -> Self::Future { + /// let url = activity.id().clone(); + /// ready(serde_json::to_value(activity).map(|value| { + /// self.inner.insert(url, value); + /// })) + /// } + /// } + /// + /// #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] + /// #[serde(transparent)] + /// struct MyId(Url); + /// + /// #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] + /// struct MyObject { + /// id: MyId, + /// content: String, + /// } + /// + /// impl Dereference for MyId { + /// type Output = MyObject; + /// + /// fn url(&self) -> &Url { + /// &self.0 + /// } + /// } + /// + /// impl Object for MyObject { + /// fn id(&self) -> &Url { + /// &self.id.0 + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let remote_url: Url = "https://masto.asonix.dog/users/asonix/statuses/107326693283572188".parse()?; + /// + /// let local_repo = MemoryRepo::default(); + /// + /// let signature_config = ReqwestSignatureConfig::default(); + /// let private_key = PKey::from_rsa(Rsa::generate(1024)?)?; + /// let crypto = OpenSsl::new("key-id".to_string(), private_key); + /// let client = reqwest::Client::new(); + /// let remote_repo = ReqwestClient::new(&client, (), &signature_config, &crypto); + /// + /// assert!(local_repo.inner.get(&remote_url).is_none()); + /// + /// let ingest_client = IngestClient::new( + /// &remote_repo, + /// &local_repo, + /// &(), + /// ); + /// + /// let opt = ingest_client.fetch(MyId(remote_url.clone())).await?; + /// assert!(opt.is_some()); + /// if let Some(my_object) = opt { + /// println!("{:?}", my_object) + /// } + /// + /// assert!(local_repo.inner.get(&remote_url).is_some()); + /// + /// Ok(()) + /// } + /// ``` + pub use apub_ingest_client::Client as IngestClient; + + #[cfg(feature = "apub-ingest-client")] + pub use apub_ingest_client::Error as IngestClientError; + #[cfg(feature = "apub-deref-client")] pub use apub_deref_client::Error as CombinedError; @@ -157,7 +271,7 @@ pub mod clients { } pub mod servers { - pub use apub_core::ingest::Ingest; + pub use apub_core::ingest::{Authority, Ingest}; #[cfg(feature = "apub-actix-web")] pub use apub_actix_web::{