commit cea3fc807e8304ccd35d76e7e55b4c7f03450519 Author: Aode (lion) Date: Wed Nov 17 22:17:36 2021 -0600 Oh gosh oh heck diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..5c56390 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "apub" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +[workspace] +members = [ + "apub-awc", + "apub-breaker-session", + "apub-deliver", + "apub-deref", + "apub-deref-client", + "apub-digest", + "apub-objectid", + "apub-openssl", + "apub-reqwest", + "apub-rustcrypto", + "apub-session", + "apub-signer", + "examples/awc-example", + "examples/example-types", + "examples/reqwest-example" +] diff --git a/apub-awc/Cargo.toml b/apub-awc/Cargo.toml new file mode 100644 index 0000000..7bbf68c --- /dev/null +++ b/apub-awc/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "apub-awc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-http = { version = "3.0.0-beta.12", default-features = false } +apub-deliver = { version = "0.1.0", path = "../apub-deliver/" } +apub-digest = { version = "0.1.0", path = "../apub-digest/" } +apub-deref = { version = "0.1.0", path = "../apub-deref/" } +apub-session = { version = "0.1.0", path = "../apub-session/" } +apub-signer = { version = "0.1.0", path = "../apub-signer/" } +awc = { version = "3.0.0-beta.10", default-features = false } +http-signature-normalization-actix = { version = "0.5.0-beta.12", default-features = false, features = ["client", "digest"] } +serde = "1" +serde_json = "1" +thiserror = "1" +tracing-awc = { version = "0.1.0-beta.8", git = "https://git.asonix.dog/asonix/tracing-awc" } +url = "2" diff --git a/apub-awc/src/lib.rs b/apub-awc/src/lib.rs new file mode 100644 index 0000000..b0386a8 --- /dev/null +++ b/apub-awc/src/lib.rs @@ -0,0 +1,223 @@ +use actix_http::error::BlockingError; +use apub_deref::{Dereference, Repo}; +use apub_digest::{Digest, DigestFactory}; +use apub_session::Session; +use apub_signer::{Sign, SignFactory}; +use awc::{http::header::HttpDate, Client}; +use http_signature_normalization_actix::{ + digest::DigestName, + prelude::{Config, DigestCreate, InvalidHeaderValue, PrepareSignError, Sign as _, SignExt}, +}; +use std::{future::Future, pin::Pin, time::SystemTime}; +use tracing_awc::Propagate; +use url::Url; + +pub struct AwcClient<'a, Session, Crypto> { + client: &'a Client, + session: &'a Session, + config: Config, + crypto: Crypto, +} + +#[derive(Debug, thiserror::Error)] +pub enum SignatureError { + #[error(transparent)] + Header(#[from] InvalidHeaderValue), + + #[error(transparent)] + Sign(#[from] PrepareSignError), + + #[error(transparent)] + Blocking(#[from] BlockingError), + + #[error(transparent)] + Signer(E), +} + +#[derive(Debug, thiserror::Error)] +pub enum AwcError { + #[error("Session indicated request should not procede")] + Session, + + #[error(transparent)] + Request(#[from] awc::error::SendRequestError), + + #[error(transparent)] + Response(#[from] awc::error::JsonPayloadError), + + #[error(transparent)] + Json(#[from] serde_json::Error), + + #[error("Invalid response code: {0}")] + Status(u16), + + #[error(transparent)] + SignatureError(#[from] SignatureError), +} + +type SignError = <::Signer as Sign>::Error; + +struct DigestWrapper(D); + +impl DigestName for DigestWrapper +where + D: Digest, +{ + const NAME: &'static str = D::NAME; +} + +impl DigestCreate for DigestWrapper +where + D: Digest + Clone, +{ + fn compute(&mut self, input: &[u8]) -> String { + self.0.clone().digest(input) + } +} + +impl<'a, CurrentSession, Crypto> AwcClient<'a, CurrentSession, Crypto> +where + CurrentSession: Session, + Crypto: SignFactory + 'static, +{ + pub fn new( + client: &'a Client, + session: &'a CurrentSession, + config: Config, + crypto: Crypto, + ) -> Self { + Self { + client, + session, + config, + crypto, + } + } + + async fn do_fetch( + &self, + id: &Id, + ) -> Result::Output>, AwcError>> { + let mut response = self + .client + .get(id.url().as_str()) + .insert_header(("Accept", "application/activity+json")) + .insert_header(("Date", HttpDate::from(SystemTime::now()))) + .signature(self.config.clone(), self.crypto.key_id(), { + let sign = self.crypto.signer(); + + move |signing_string| sign.sign(signing_string).map_err(SignatureError::Signer) + }) + .await? + .propagate() + .send() + .await?; + + Ok(Some(response.json().await?)) + } + + async fn do_deliver( + &self, + inbox: &Url, + activity: T, + ) -> Result<(), AwcError>> + where + Crypto: DigestFactory, + ::Digest: Clone, + T: serde::ser::Serialize, + { + let activity_string = serde_json::to_string(&activity)?; + + let (req, body) = self + .client + .post(inbox.as_str()) + .content_type("application/activity+json") + .insert_header(("Accept", "application/activity+json")) + .insert_header(("Date", HttpDate::from(SystemTime::now()))) + .signature_with_digest( + self.config.clone(), + self.crypto.key_id(), + DigestWrapper(self.crypto.digest()), + activity_string, + { + let sign = self.crypto.signer(); + + move |signing_string| sign.sign(signing_string).map_err(SignatureError::Signer) + }, + ) + .await? + .split(); + + let response = req.propagate().send_body(body).await?; + + if !response.status().is_success() { + return Err(AwcError::Status(response.status().as_u16())); + } + + Ok(()) + } +} + +impl<'a, Id, CurrentSession, Crypto> Repo<'a, Id> for AwcClient<'a, CurrentSession, Crypto> +where + Id: Dereference + 'a, + CurrentSession: Session, + Crypto: SignFactory + 'static, +{ + type Error = AwcError>; + type Future = Pin< + Box::Output>, Self::Error>> + 'a>, + >; + + fn fetch(&'a self, id: &'a Id) -> Self::Future { + Box::pin(async move { + if !self.session.should_procede(id.url()) { + return Err(AwcError::Session); + } + + match self.do_fetch(id).await { + Ok(opt) => { + self.session.mark_success(id.url()); + Ok(opt) + } + Err(e) => { + self.session.mark_failure(id.url()); + Err(e) + } + } + }) + } +} + +impl<'a, CurrentSession, Crypto> apub_deliver::Client<'a> for AwcClient<'a, CurrentSession, Crypto> +where + CurrentSession: Session, + Crypto: DigestFactory + SignFactory + 'static, + ::Digest: Clone, +{ + type Error = AwcError>; + type Future = Pin> + 'a>>; + + fn deliver( + &'a self, + inbox: &'a Url, + activity: T, + ) -> Self::Future { + Box::pin(async move { + if !self.session.should_procede(inbox) { + return Err(AwcError::Session); + } + + match self.do_deliver(inbox, activity).await { + Ok(()) => { + self.session.mark_success(inbox); + Ok(()) + } + Err(e) => { + self.session.mark_failure(inbox); + Err(e) + } + } + }) + } +} diff --git a/apub-breaker-session/Cargo.toml b/apub-breaker-session/Cargo.toml new file mode 100644 index 0000000..ebbc926 --- /dev/null +++ b/apub-breaker-session/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "apub-breaker-session" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-session = { version = "0.1.0", path = "../apub-session/" } +dashmap = "4.0.2" +url = "2" diff --git a/apub-breaker-session/src/lib.rs b/apub-breaker-session/src/lib.rs new file mode 100644 index 0000000..ee374fb --- /dev/null +++ b/apub-breaker-session/src/lib.rs @@ -0,0 +1,82 @@ +use apub_session::Session; +use dashmap::DashMap; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use url::{Host, Url}; + +#[derive(Debug)] +struct Breaker { + failure_count: usize, + broken_at: Instant, +} + +#[derive(Clone, Debug)] +pub struct BreakerSession { + limit: usize, + breaker_duration: Duration, + hosts: Arc, Option), Breaker>>, +} + +impl BreakerSession { + pub fn limit(limit: usize, breaker_duration: Duration) -> Self { + Self { + limit, + breaker_duration, + hosts: Arc::new(DashMap::new()), + } + } +} + +impl Session for BreakerSession { + fn should_procede(&self, url: &Url) -> bool { + if let Some(host) = url.host() { + let key = (host.to_owned(), url.port()); + + let mut breaker = self.hosts.entry(key).or_default(); + + if breaker.failure_count < self.limit { + return true; + } + + if Instant::now() > breaker.broken_at + self.breaker_duration { + breaker.failure_count = 0; + } + + false + } else { + true + } + } + + fn mark_success(&self, url: &Url) { + if let Some(host) = url.host() { + let key = (host.to_owned(), url.port()); + let mut breaker = self.hosts.entry(key).or_default(); + + breaker.failure_count = 0; + } + } + + fn mark_failure(&self, url: &Url) { + if let Some(host) = url.host() { + let key = (host.to_owned(), url.port()); + let mut breaker = self.hosts.entry(key).or_default(); + + breaker.failure_count += 1; + if breaker.failure_count >= self.limit { + breaker.broken_at = Instant::now(); + } + } + } +} + +impl Default for Breaker { + fn default() -> Self { + Self { + failure_count: 0, + broken_at: Instant::now(), + } + } +} diff --git a/apub-deliver/Cargo.toml b/apub-deliver/Cargo.toml new file mode 100644 index 0000000..9b167e5 --- /dev/null +++ b/apub-deliver/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "apub-deliver" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = "1" +url = "2" diff --git a/apub-deliver/src/lib.rs b/apub-deliver/src/lib.rs new file mode 100644 index 0000000..e8fc793 --- /dev/null +++ b/apub-deliver/src/lib.rs @@ -0,0 +1,13 @@ +use std::future::Future; +use url::Url; + +pub trait Client<'a> { + type Error: std::error::Error + 'static; + type Future: Future> + 'a; + + fn deliver( + &'a self, + inbox: &'a Url, + activity: T, + ) -> Self::Future; +} diff --git a/apub-deref-client/Cargo.toml b/apub-deref-client/Cargo.toml new file mode 100644 index 0000000..ad72f88 --- /dev/null +++ b/apub-deref-client/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "apub-deref-client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-deref = { version = "0.1.0", path = "../apub-deref/" } +pin-project-lite = "0.2.7" +thiserror = "1" +url = "2" diff --git a/apub-deref-client/src/lib.rs b/apub-deref-client/src/lib.rs new file mode 100644 index 0000000..7c26079 --- /dev/null +++ b/apub-deref-client/src/lib.rs @@ -0,0 +1,141 @@ +use apub_deref::{Dereference, Repo}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pub struct Client { + local_domain: String, + local: Local, + http: Http, +} + +#[derive(Clone, Debug, thiserror::Error)] +pub enum Error { + #[error("{0}")] + Left(#[source] Left), + + #[error("{0}")] + Right(#[source] Right), +} + +type ClientError<'a, Id, Local, Http> = + Error<>::Error, >::Error>; + +impl Client { + pub fn new(local_domain: String, local: Local, http: Http) -> Self { + Client { + local_domain, + local, + http, + } + } + + pub async fn dereference<'a, Id>( + &'a self, + id: &'a Id, + ) -> Result::Output>, ClientError<'a, Id, Local, Http>> + where + Id: Dereference, + Local: Repo<'a, Id> + 'a, + Http: Repo<'a, Id> + 'a, + { + self.fetch(id).await + } +} + +impl<'a, Id, Local, Http> Repo<'a, Id> for Client +where + Id: Dereference + 'a, + Local: Repo<'a, Id> + 'a, + Http: Repo<'a, Id> + 'a, +{ + type Error = ClientError<'a, Id, Local, Http>; + type Future = DereferenceFuture<'a, Id, Local, Http>; + + fn fetch(&'a self, id: &'a Id) -> Self::Future { + DereferenceFuture { + id, + client: self, + state: DereferenceFutureInner::Pending, + } + } +} + +pin_project_lite::pin_project! { + #[project = DereferenceFutureInnerProj] + #[project_replace = DereferenceFutureInnerProjReplace] + enum DereferenceFutureInner<'a, Id: Dereference, Local, Http> + where + Local: Repo<'a, Id>, + Http: Repo<'a, Id>, + { + Pending, + Local { + #[pin] + future: >::Future, + }, + Http { + #[pin] + future: >::Future, + } + } +} + +pin_project_lite::pin_project! { + pub struct DereferenceFuture<'a, Id: Dereference, Local, Http> + where + Local: Repo<'a, Id>, + Http: Repo<'a, Id>, + { + id: &'a Id, + client: &'a Client, + + #[pin] + state: DereferenceFutureInner<'a, Id, Local, Http>, + } +} + +impl<'a, Id, Local, Http> Future for DereferenceFuture<'a, Id, Local, Http> +where + Id: Dereference + 'a, + Local: Repo<'a, Id>, + Http: Repo<'a, Id>, +{ + type Output = Result::Output>, ClientError<'a, Id, Local, Http>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); + + match this.state.as_mut().project() { + DereferenceFutureInnerProj::Pending => { + this.state.project_replace(DereferenceFutureInner::Local { + future: this.client.local.fetch(this.id), + }); + + self.poll(cx) + } + DereferenceFutureInnerProj::Local { future } => match future.poll(cx) { + Poll::Ready(Ok(None)) + if this.id.url().domain() == Some(&this.client.local_domain) => + { + Poll::Ready(Ok(None)) + } + Poll::Ready(Ok(None)) => { + this.state.project_replace(DereferenceFutureInner::Http { + future: this.client.http.fetch(this.id), + }); + + self.poll(cx) + } + Poll::Ready(Ok(opt)) => Poll::Ready(Ok(opt)), + Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Left(e))), + Poll::Pending => Poll::Pending, + }, + DereferenceFutureInnerProj::Http { future } => { + future.poll(cx).map(|res| res.map_err(Error::Right)) + } + } + } +} diff --git a/apub-deref/.gitignore b/apub-deref/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/apub-deref/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/apub-deref/Cargo.toml b/apub-deref/Cargo.toml new file mode 100644 index 0000000..3297ced --- /dev/null +++ b/apub-deref/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "apub-deref" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = "1" +url = "2" diff --git a/apub-deref/src/lib.rs b/apub-deref/src/lib.rs new file mode 100644 index 0000000..4c9596a --- /dev/null +++ b/apub-deref/src/lib.rs @@ -0,0 +1,15 @@ +use std::future::Future; +use url::Url; + +pub trait Repo<'a, D: Dereference> { + type Error: std::error::Error + 'static; + type Future: Future::Output>, Self::Error>> + 'a; + + fn fetch(&'a self, id: &'a D) -> Self::Future; +} + +pub trait Dereference { + type Output: serde::de::DeserializeOwned; + + fn url(&self) -> &Url; +} diff --git a/apub-digest/Cargo.toml b/apub-digest/Cargo.toml new file mode 100644 index 0000000..7168aa7 --- /dev/null +++ b/apub-digest/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "apub-digest" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/apub-digest/src/lib.rs b/apub-digest/src/lib.rs new file mode 100644 index 0000000..8e9a3e9 --- /dev/null +++ b/apub-digest/src/lib.rs @@ -0,0 +1,11 @@ +pub trait Digest { + const NAME: &'static str; + + fn digest(self, input: &[u8]) -> String; +} + +pub trait DigestFactory { + type Digest: Digest + Send; + + fn digest(&self) -> Self::Digest; +} diff --git a/apub-objectid/Cargo.toml b/apub-objectid/Cargo.toml new file mode 100644 index 0000000..3d0350b --- /dev/null +++ b/apub-objectid/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "apub-objectid" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = "1" +url = { version = "2", features = ["serde"] } diff --git a/apub-objectid/src/lib.rs b/apub-objectid/src/lib.rs new file mode 100644 index 0000000..0df78f4 --- /dev/null +++ b/apub-objectid/src/lib.rs @@ -0,0 +1,59 @@ +use std::{ + fmt::Display, + marker::PhantomData, + ops::{Deref, DerefMut}, +}; +use url::Url; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ObjectId { + url: Url, + _kind: PhantomData Kind>, +} + +impl ObjectId { + pub fn new(url: Url) -> Self { + Self { + url, + _kind: PhantomData, + } + } +} + +impl Deref for ObjectId { + type Target = Url; + + fn deref(&self) -> &Self::Target { + &self.url + } +} + +impl DerefMut for ObjectId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.url + } +} + +impl Display for ObjectId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.url.fmt(f) + } +} + +impl serde::ser::Serialize for ObjectId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.url.serialize(serializer) + } +} + +impl<'de, Kind> serde::de::Deserialize<'de> for ObjectId { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + Url::deserialize(deserializer).map(ObjectId::new) + } +} diff --git a/apub-openssl/Cargo.toml b/apub-openssl/Cargo.toml new file mode 100644 index 0000000..5dcb1e6 --- /dev/null +++ b/apub-openssl/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "apub-openssl" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-digest = { version = "0.1.0", path = "../apub-digest/" } +apub-signer = { version = "0.1.0", path = "../apub-signer/" } +openssl = "0.10.36" diff --git a/apub-openssl/src/lib.rs b/apub-openssl/src/lib.rs new file mode 100644 index 0000000..cd85946 --- /dev/null +++ b/apub-openssl/src/lib.rs @@ -0,0 +1,103 @@ +use openssl::{ + error::ErrorStack, + hash::MessageDigest, + pkey::{PKey, Private}, + sha::Sha256, + sign::Signer, +}; +use std::fmt::Debug; + +#[derive(Clone)] +pub struct OpenSslDigest { + digest: Sha256, +} + +pub struct OpenSslSigner { + private_key: PKey, +} + +pub struct OpenSsl { + key_id: String, + private_key: PKey, +} + +impl OpenSsl { + pub fn new(key_id: String, private_key: PKey) -> Self { + Self { + key_id, + private_key, + } + } +} + +impl<'a> apub_digest::Digest for OpenSslDigest { + const NAME: &'static str = "SHA-256"; + + fn digest(mut self, input: &[u8]) -> String { + self.digest.update(input); + let bytes = self.digest.finish(); + + openssl::base64::encode_block(&bytes) + } +} + +impl apub_signer::Sign for OpenSslSigner { + type Error = ErrorStack; + + fn sign(&self, signing_string: &str) -> Result { + let mut signer = Signer::new(MessageDigest::sha256(), &self.private_key)?; + signer.update(signing_string.as_bytes())?; + + Ok(openssl::base64::encode_block(&signer.sign_to_vec()?)) + } +} + +impl apub_digest::DigestFactory for OpenSsl { + type Digest = OpenSslDigest; + + fn digest(&self) -> Self::Digest { + OpenSslDigest { + digest: Sha256::new(), + } + } +} + +impl apub_signer::SignFactory for OpenSsl { + type Signer = OpenSslSigner; + type KeyId = String; + + fn key_id(&self) -> Self::KeyId { + self.key_id.clone() + } + + fn signer(&self) -> Self::Signer { + OpenSslSigner { + private_key: self.private_key.clone(), + } + } +} + +impl Debug for OpenSslDigest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenSslDigest") + .field("digest", &"Sha256") + .finish() + } +} + +impl Debug for OpenSslSigner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenSslSigner") + .field("private_key", &"hidden") + .finish() + } +} + +impl Debug for OpenSsl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenSsl") + .field("key_id", &self.key_id) + .field("private_key", &"hidden") + .finish() + } +} diff --git a/apub-reqwest/Cargo.toml b/apub-reqwest/Cargo.toml new file mode 100644 index 0000000..4f33a5d --- /dev/null +++ b/apub-reqwest/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "apub-reqwest" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-deliver = { version = "0.1.0", path = "../apub-deliver/" } +apub-deref = { version = "0.1.0", path = "../apub-deref/" } +apub-digest = { version = "0.1.0", path = "../apub-digest/" } +apub-session = { version = "0.1.0", path = "../apub-session/" } +apub-signer = { version = "0.1.0", path = "../apub-signer/" } +http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["digest"] } +httpdate = "1.0.2" +reqwest = { version = "0.11.6", default-features = false, features = ["json"] } +serde = "1" +serde_json = "1" +thiserror = "1" +url = "2" diff --git a/apub-reqwest/src/lib.rs b/apub-reqwest/src/lib.rs new file mode 100644 index 0000000..b5022b6 --- /dev/null +++ b/apub-reqwest/src/lib.rs @@ -0,0 +1,214 @@ +use apub_deref::{Dereference, Repo}; +use apub_digest::{Digest, DigestFactory}; +use apub_session::Session; +use apub_signer::{Sign, SignFactory}; +use http_signature_normalization_reqwest::{ + digest::{DigestCreate, SignExt}, + prelude::{Config, Sign as _, SignError}, +}; +use reqwest::{ + header::{ACCEPT, CONTENT_TYPE, DATE}, + Client, +}; +use std::{future::Future, pin::Pin, time::SystemTime}; +use url::Url; + +pub struct ReqwestClient<'a, Session, Crypto> { + client: &'a Client, + session: &'a Session, + config: &'a Config, + crypto: Crypto, +} + +#[derive(Debug, thiserror::Error)] +pub enum SignatureError { + #[error(transparent)] + Sign(#[from] SignError), + + #[error(transparent)] + Reqweset(#[from] reqwest::Error), + + #[error(transparent)] + Signer(E), +} + +#[derive(Debug, thiserror::Error)] +pub enum ReqwestError { + #[error("Session indicated request should not procede")] + Session, + + #[error(transparent)] + Reqweset(#[from] reqwest::Error), + + #[error(transparent)] + Json(#[from] serde_json::Error), + + #[error("Invalid response code: {0}")] + Status(u16), + + #[error(transparent)] + SignatureError(#[from] SignatureError), +} + +type SignTraitError = <::Signer as Sign>::Error; + +struct DigestWrapper(D); + +impl DigestCreate for DigestWrapper +where + D: Digest + Clone, +{ + const NAME: &'static str = D::NAME; + + fn compute(&mut self, input: &[u8]) -> String { + self.0.clone().digest(input) + } +} + +impl<'a, CurrentSession, Crypto> ReqwestClient<'a, CurrentSession, Crypto> +where + CurrentSession: Session, + Crypto: SignFactory + 'static, +{ + pub fn new( + client: &'a Client, + session: &'a CurrentSession, + config: &'a Config, + crypto: Crypto, + ) -> Self { + Self { + client, + session, + config, + crypto, + } + } + + async fn do_fetch( + &self, + id: &Id, + ) -> Result::Output>, ReqwestError>> + where + Id: Dereference, + { + let response = self + .client + .get(id.url().as_str()) + .header(ACCEPT, "application/activity+json") + .header(DATE, httpdate::fmt_http_date(SystemTime::now())) + .signature(self.config, self.crypto.key_id(), { + let sign = self.crypto.signer(); + + move |signing_string| sign.sign(signing_string).map_err(SignatureError::Signer) + })? + .send() + .await?; + + Ok(Some(response.json().await?)) + } + + async fn do_deliver( + &self, + url: &Url, + activity: T, + ) -> Result<(), ReqwestError>> + where + Crypto: DigestFactory, + ::Digest: Clone, + { + let activity_string = serde_json::to_string(&activity)?; + + let response = self + .client + .post(url.as_str()) + .header(CONTENT_TYPE, "application/activity+json") + .header(ACCEPT, "application/activity+json") + .header(DATE, httpdate::fmt_http_date(SystemTime::now())) + .signature_with_digest( + self.config.clone(), + self.crypto.key_id(), + DigestWrapper(self.crypto.digest()), + activity_string, + { + let sign = self.crypto.signer(); + + move |signing_string| sign.sign(signing_string).map_err(SignatureError::Signer) + }, + ) + .await?; + + if !response.status().is_success() { + return Err(ReqwestError::Status(response.status().as_u16())); + } + + Ok(()) + } +} + +impl<'a, Id, CurrentSession, Crypto> Repo<'a, Id> for ReqwestClient<'a, CurrentSession, Crypto> +where + Id: Dereference + Send + Sync, + ::Output: 'static, + CurrentSession: Session + Send + Sync, + Crypto: SignFactory + Send + Sync + 'static, + ::KeyId: Send, +{ + type Error = ReqwestError>; + type Future = Pin< + Box< + dyn Future::Output>, Self::Error>> + + Send + + 'a, + >, + >; + + fn fetch(&'a self, id: &'a Id) -> Self::Future { + Box::pin(async move { + if !self.session.should_procede(id.url()) { + return Err(ReqwestError::Session); + } + + match self.do_fetch(id).await { + Ok(opt) => { + self.session.mark_success(id.url()); + Ok(opt) + } + Err(e) => { + self.session.mark_failure(id.url()); + Err(e) + } + } + }) + } +} + +impl<'a, CurrentSession, Crypto> apub_deliver::Client<'a> + for ReqwestClient<'a, CurrentSession, Crypto> +where + CurrentSession: Session + Send + Sync, + Crypto: DigestFactory + SignFactory + Send + Sync + 'static, + ::KeyId: Send, + ::Digest: Clone, +{ + type Error = ReqwestError>; + type Future = Pin> + 'a>>; + + fn deliver(&'a self, url: &'a Url, activity: T) -> Self::Future { + Box::pin(async move { + if !self.session.should_procede(url) { + return Err(ReqwestError::Session); + } + + match self.do_deliver(url, activity).await { + Ok(opt) => { + self.session.mark_success(url); + Ok(opt) + } + Err(e) => { + self.session.mark_failure(url); + Err(e) + } + } + }) + } +} diff --git a/apub-rustcrypto/Cargo.toml b/apub-rustcrypto/Cargo.toml new file mode 100644 index 0000000..5dffbd6 --- /dev/null +++ b/apub-rustcrypto/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "apub-rustcrypto" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +base64 = "0.13.0" +rsa = "0.5.0" +sha2 = "0.9.0" +apub-digest = { version = "0.1.0", path = "../apub-digest/" } +apub-signer = { version = "0.1.0", path = "../apub-signer" } diff --git a/apub-rustcrypto/src/lib.rs b/apub-rustcrypto/src/lib.rs new file mode 100644 index 0000000..d767e7e --- /dev/null +++ b/apub-rustcrypto/src/lib.rs @@ -0,0 +1,94 @@ +use rsa::{hash::Hash, PaddingScheme, RsaPrivateKey}; +use sha2::{Digest, Sha256}; +use std::fmt::Debug; + +#[derive(Debug, Clone)] +pub struct Sha256Digest { + digest: Sha256, +} + +pub struct RsaSigner { + private_key: RsaPrivateKey, +} + +pub struct Rustcrypto { + key_id: String, + private_key: RsaPrivateKey, +} + +impl Rustcrypto { + pub fn new(key_id: String, private_key: RsaPrivateKey) -> Self { + Self { + key_id, + private_key, + } + } +} + +impl apub_digest::Digest for Sha256Digest { + const NAME: &'static str = "SHA-256"; + + fn digest(mut self, input: &[u8]) -> String { + self.digest.update(input); + let bytes = self.digest.finalize(); + + base64::encode(&bytes) + } +} + +impl apub_signer::Sign for RsaSigner { + type Error = rsa::errors::Error; + + fn sign(&self, signing_string: &str) -> Result { + let hashed = Sha256::digest(signing_string.as_bytes()); + let bytes = self.private_key.sign( + PaddingScheme::PKCS1v15Sign { + hash: Some(Hash::SHA2_256), + }, + &hashed, + )?; + Ok(base64::encode(bytes)) + } +} + +impl apub_digest::DigestFactory for Rustcrypto { + type Digest = Sha256Digest; + + fn digest(&self) -> Self::Digest { + Sha256Digest { + digest: Sha256::new(), + } + } +} + +impl apub_signer::SignFactory for Rustcrypto { + type Signer = RsaSigner; + type KeyId = String; + + fn key_id(&self) -> Self::KeyId { + self.key_id.clone() + } + + fn signer(&self) -> Self::Signer { + RsaSigner { + private_key: self.private_key.clone(), + } + } +} + +impl Debug for RsaSigner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RsaSigner") + .field("private_key", &"hidden") + .finish() + } +} + +impl Debug for Rustcrypto { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Rustcrypto") + .field("key_id", &self.key_id) + .field("private_key", &"hidden") + .finish() + } +} diff --git a/apub-session/Cargo.toml b/apub-session/Cargo.toml new file mode 100644 index 0000000..6fbe33c --- /dev/null +++ b/apub-session/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "apub-session" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +url = "2" diff --git a/apub-session/src/lib.rs b/apub-session/src/lib.rs new file mode 100644 index 0000000..215bbc3 --- /dev/null +++ b/apub-session/src/lib.rs @@ -0,0 +1,277 @@ +use std::{ + rc::Rc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use url::Url; + +pub trait Session { + fn should_procede(&self, url: &Url) -> bool; + + fn mark_success(&self, url: &Url); + fn mark_failure(&self, url: &Url); +} + +pub struct RequestCountSession { + count: AtomicUsize, + max_requests: usize, +} + +impl RequestCountSession { + pub fn max(max_requests: usize) -> Self { + Self { + count: AtomicUsize::new(0), + max_requests, + } + } +} + +impl Session for RequestCountSession { + fn should_procede(&self, _: &Url) -> bool { + self.count.load(Ordering::Acquire) < self.max_requests + } + + fn mark_success(&self, _: &Url) { + self.count.fetch_add(1, Ordering::AcqRel); + } + + fn mark_failure(&self, _: &Url) { + self.count.fetch_add(1, Ordering::AcqRel); + } +} + +impl Session for Arc +where + T: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + T::should_procede(&self, url) + } + + fn mark_success(&self, url: &Url) { + T::mark_success(&self, url) + } + + fn mark_failure(&self, url: &Url) { + T::mark_failure(&self, url) + } +} + +impl Session for Rc +where + T: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + T::should_procede(&self, url) + } + + fn mark_success(&self, url: &Url) { + T::mark_success(&self, url) + } + + fn mark_failure(&self, url: &Url) { + T::mark_failure(&self, url) + } +} + +impl Session for Box +where + T: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + T::should_procede(&self, url) + } + + fn mark_success(&self, url: &Url) { + T::mark_success(&self, url) + } + + fn mark_failure(&self, url: &Url) { + T::mark_failure(&self, url) + } +} + +impl Session for (T, U) +where + T: Session, + U: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + self.0.should_procede(url) && self.1.should_procede(url) + } + + fn mark_success(&self, url: &Url) { + self.0.mark_success(url); + self.1.mark_success(url); + } + + fn mark_failure(&self, url: &Url) { + self.0.mark_failure(url); + self.1.mark_failure(url); + } +} + +impl Session for (T, U, V) +where + T: Session, + U: Session, + V: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + self.0.should_procede(url) && self.1.should_procede(url) && self.2.should_procede(url) + } + + fn mark_success(&self, url: &Url) { + self.0.mark_success(url); + self.1.mark_success(url); + self.2.mark_success(url); + } + + fn mark_failure(&self, url: &Url) { + self.0.mark_failure(url); + self.1.mark_failure(url); + self.2.mark_failure(url); + } +} + +impl Session for (T, U, V, W) +where + T: Session, + U: Session, + V: Session, + W: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + self.0.should_procede(url) + && self.1.should_procede(url) + && self.2.should_procede(url) + && self.3.should_procede(url) + } + + fn mark_success(&self, url: &Url) { + self.0.mark_success(url); + self.1.mark_success(url); + self.2.mark_success(url); + self.3.mark_success(url); + } + + fn mark_failure(&self, url: &Url) { + self.0.mark_failure(url); + self.1.mark_failure(url); + self.2.mark_failure(url); + self.3.mark_failure(url); + } +} + +impl Session for (T, U, V, W, X) +where + T: Session, + U: Session, + V: Session, + W: Session, + X: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + self.0.should_procede(url) + && self.1.should_procede(url) + && self.2.should_procede(url) + && self.3.should_procede(url) + && self.4.should_procede(url) + } + + fn mark_success(&self, url: &Url) { + self.0.mark_success(url); + self.1.mark_success(url); + self.2.mark_success(url); + self.3.mark_success(url); + self.4.mark_success(url); + } + + fn mark_failure(&self, url: &Url) { + self.0.mark_failure(url); + self.1.mark_failure(url); + self.2.mark_failure(url); + self.3.mark_failure(url); + self.4.mark_failure(url); + } +} + +impl Session for (T, U, V, W, X, Y) +where + T: Session, + U: Session, + V: Session, + W: Session, + X: Session, + Y: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + self.0.should_procede(url) + && self.1.should_procede(url) + && self.2.should_procede(url) + && self.3.should_procede(url) + && self.4.should_procede(url) + && self.5.should_procede(url) + } + + fn mark_success(&self, url: &Url) { + self.0.mark_success(url); + self.1.mark_success(url); + self.2.mark_success(url); + self.3.mark_success(url); + self.4.mark_success(url); + self.5.mark_success(url); + } + + fn mark_failure(&self, url: &Url) { + self.0.mark_failure(url); + self.1.mark_failure(url); + self.2.mark_failure(url); + self.3.mark_failure(url); + self.4.mark_failure(url); + self.5.mark_failure(url); + } +} + +impl Session for (T, U, V, W, X, Y, Z) +where + T: Session, + U: Session, + V: Session, + W: Session, + X: Session, + Y: Session, + Z: Session, +{ + fn should_procede(&self, url: &Url) -> bool { + self.0.should_procede(url) + && self.1.should_procede(url) + && self.2.should_procede(url) + && self.3.should_procede(url) + && self.4.should_procede(url) + && self.5.should_procede(url) + && self.6.should_procede(url) + } + + fn mark_success(&self, url: &Url) { + self.0.mark_success(url); + self.1.mark_success(url); + self.2.mark_success(url); + self.3.mark_success(url); + self.4.mark_success(url); + self.5.mark_success(url); + self.6.mark_success(url); + } + + fn mark_failure(&self, url: &Url) { + self.0.mark_failure(url); + self.1.mark_failure(url); + self.2.mark_failure(url); + self.3.mark_failure(url); + self.4.mark_failure(url); + self.5.mark_failure(url); + self.6.mark_failure(url); + } +} diff --git a/apub-signer/Cargo.toml b/apub-signer/Cargo.toml new file mode 100644 index 0000000..1b034b2 --- /dev/null +++ b/apub-signer/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "apub-signer" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/apub-signer/src/lib.rs b/apub-signer/src/lib.rs new file mode 100644 index 0000000..03533bc --- /dev/null +++ b/apub-signer/src/lib.rs @@ -0,0 +1,15 @@ +use std::fmt::Display; + +pub trait Sign { + type Error: std::error::Error + Send; + + fn sign(&self, signing_string: &str) -> Result; +} + +pub trait SignFactory { + type Signer: Sign + Send + 'static; + type KeyId: Display + 'static; + + fn key_id(&self) -> Self::KeyId; + fn signer(&self) -> Self::Signer; +} diff --git a/examples/awc-example/Cargo.toml b/examples/awc-example/Cargo.toml new file mode 100644 index 0000000..971e338 --- /dev/null +++ b/examples/awc-example/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "awc-example" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-rt = "2.4.0" +apub-awc = { version = "0.1.0", path = "../../apub-awc/" } +apub-breaker-session = { version = "0.1.0", path = "../../apub-breaker-session/" } +apub-session = { version = "0.1.0", path = "../../apub-session/" } +apub-rustcrypto = { version = "0.1.0", path = "../../apub-rustcrypto/" } +awc = { version = "3.0.0-beta.10", default-features = false, features = ["rustls"] } +example-types = { version = "0.1.0", path = "../example-types/" } +http-signature-normalization-actix = { version = "0.5.0-beta.12", default-features = false, features = ["client"] } +rand = "0.8.4" +rsa = "0.5.0" +serde = { version = "1", features = ["derive"] } diff --git a/examples/awc-example/src/main.rs b/examples/awc-example/src/main.rs new file mode 100644 index 0000000..e45f562 --- /dev/null +++ b/examples/awc-example/src/main.rs @@ -0,0 +1,35 @@ +use apub_awc::AwcClient; +use apub_breaker_session::BreakerSession; +use apub_rustcrypto::Rustcrypto; +use apub_session::RequestCountSession; +use example_types::{object_id, NoteType, ObjectId}; +use http_signature_normalization_actix::Config; +use rsa::RsaPrivateKey; +use std::time::Duration; + +#[actix_rt::main] +async fn main() -> Result<(), Box> { + let private_key = RsaPrivateKey::new(&mut rand::thread_rng(), 1024)?; + let crypto = Rustcrypto::new("key-id".to_string(), private_key); + let config = Config::default(); + + let breakers = BreakerSession::limit(10, Duration::from_secs(60 * 60)); + + let session = (RequestCountSession::max(30), breakers); + + let client = awc::Client::new(); + + let awc_client = AwcClient::new(&client, &session, config, crypto); + + let id: ObjectId = + object_id("https://masto.asonix.dog/users/asonix/statuses/107289461429162429".parse()?); + + if let Some(note) = id.dereference(&awc_client).await? { + println!("id: {}, content: {}", note.id, note.content); + + let inbox = "https://masto.asonix.dog/inbox".parse()?; + note.deliver(&inbox, &awc_client).await?; + } + + Ok(()) +} diff --git a/examples/example-types/Cargo.toml b/examples/example-types/Cargo.toml new file mode 100644 index 0000000..4a164e6 --- /dev/null +++ b/examples/example-types/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "example-types" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-deliver = { version = "0.1.0", path = "../../apub-deliver/" } +apub-deref = { version = "0.1.0", path = "../../apub-deref/" } +apub-objectid = { version = "0.1.0", path = "../../apub-objectid/" } +serde = { version = "1", features = ["derive"] } +url = { version = "2", features = ["serde"] } diff --git a/examples/example-types/src/lib.rs b/examples/example-types/src/lib.rs new file mode 100644 index 0000000..174c788 --- /dev/null +++ b/examples/example-types/src/lib.rs @@ -0,0 +1,89 @@ +use apub_deliver::Client; +use apub_deref::{Dereference, Repo}; +use std::{ + fmt::Display, + ops::{Deref, DerefMut}, +}; +use url::Url; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(transparent)] +pub struct ObjectId(apub_objectid::ObjectId); + +pub fn object_id(id: Url) -> ObjectId +where + ObjectId: Dereference, +{ + ObjectId(apub_objectid::ObjectId::new(id)) +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum NoteType { + Note, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct Note { + pub id: ObjectId, + + #[serde(rename = "type")] + pub kind: NoteType, + + pub content: String, +} + +impl Deref for ObjectId { + type Target = Url; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ObjectId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Display for ObjectId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl Dereference for ObjectId { + type Output = Note; + + fn url(&self) -> &Url { + &self + } +} + +impl ObjectId +where + Self: Dereference, +{ + pub async fn dereference<'a, R>( + &'a self, + repo: &'a R, + ) -> Result::Output>, Box> + where + R: Repo<'a, Self>, + { + Ok(repo.fetch(self).await?) + } +} + +impl Note { + pub async fn deliver<'a, C>( + &'a self, + inbox: &'a Url, + client: &'a C, + ) -> Result<(), Box> + where + C: Client<'a>, + { + Ok(client.deliver(inbox, self).await?) + } +} diff --git a/examples/reqwest-example/Cargo.toml b/examples/reqwest-example/Cargo.toml new file mode 100644 index 0000000..0ac207a --- /dev/null +++ b/examples/reqwest-example/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "reqwest-example" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +apub-breaker-session = { version = "0.1.0", path = "../../apub-breaker-session/" } +apub-session = { version = "0.1.0", path = "../../apub-session/" } +apub-reqwest = { version = "0.1.0", path = "../../apub-reqwest/" } +apub-openssl = { version = "0.1.0", path = "../../apub-openssl/" } +example-types = { version = "0.1.0", path = "../example-types/" } +http-signature-normalization-reqwest = "0.2.0" +openssl = "0.10.13" +reqwest = "0.11.6" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1.14.0", features = ["full"] } diff --git a/examples/reqwest-example/src/main.rs b/examples/reqwest-example/src/main.rs new file mode 100644 index 0000000..003acc9 --- /dev/null +++ b/examples/reqwest-example/src/main.rs @@ -0,0 +1,34 @@ +use apub_breaker_session::BreakerSession; +use apub_openssl::OpenSsl; +use apub_reqwest::ReqwestClient; +use apub_session::RequestCountSession; +use example_types::{object_id, NoteType, ObjectId}; +use http_signature_normalization_reqwest::Config; +use openssl::{pkey::PKey, rsa::Rsa}; +use std::time::Duration; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let private_key = PKey::from_rsa(Rsa::generate(1024)?)?; + let crypto = OpenSsl::new("key-id".to_string(), private_key); + let config = Config::default(); + + let breakers = BreakerSession::limit(10, Duration::from_secs(60 * 60)); + + let session = (RequestCountSession::max(30), breakers); + let client = reqwest::Client::new(); + + let reqwest_client = ReqwestClient::new(&client, &session, &config, crypto); + + let id: ObjectId = + object_id("https://masto.asonix.dog/users/asonix/statuses/107289461429162429".parse()?); + + if let Some(note) = id.dereference(&reqwest_client).await? { + println!("id: {}, content: {}", note.id, note.content); + + let inbox = "https://masto.asonix.dog/inbox".parse()?; + note.deliver(&inbox, &reqwest_client).await?; + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e69de29