apub/apub-reqwest/src/lib.rs
2021-11-17 22:17:36 -06:00

215 lines
5.9 KiB
Rust

use apub_deref::{Dereference, Repo};
use apub_digest::{Digest, DigestFactory};
use apub_session::Session;
use apub_signer::{Sign, SignFactory};
use http_signature_normalization_reqwest::{
digest::{DigestCreate, SignExt},
prelude::{Config, Sign as _, SignError},
};
use reqwest::{
header::{ACCEPT, CONTENT_TYPE, DATE},
Client,
};
use std::{future::Future, pin::Pin, time::SystemTime};
use url::Url;
/// HTTP client bundling a borrowed `reqwest` [`Client`] with the session,
/// signature configuration and crypto provider used when issuing requests.
///
/// `client`, `session` and `config` are borrowed for `'a`; `crypto` is owned.
pub struct ReqwestClient<'a, CurrentSession, Crypto> {
    /// Underlying `reqwest` client used to build every request.
    client: &'a Client,
    /// Consulted before each request and informed of the outcome afterwards.
    session: &'a CurrentSession,
    /// HTTP-signature configuration handed to the signing calls.
    config: &'a Config,
    /// Source of the key id, the signer and (for deliveries) the digest.
    crypto: Crypto,
}
/// Errors that can arise while signing an outgoing request.
///
/// `E` is the error type of the signer in use.
#[derive(Debug, thiserror::Error)]
pub enum SignatureError<E>
where
    E: std::error::Error + Send,
{
    /// A [`SignError`] from the HTTP-signature library.
    #[error(transparent)]
    Sign(#[from] SignError),

    /// A `reqwest` error.
    #[error(transparent)]
    Reqweset(#[from] reqwest::Error),

    /// The signer itself failed to sign the signing string.
    #[error(transparent)]
    Signer(E),
}
/// Errors returned by [`ReqwestClient`] when fetching or delivering.
///
/// `E` is the error type of the signer in use.
#[derive(Debug, thiserror::Error)]
pub enum ReqwestError<E>
where
    E: std::error::Error + Send,
{
    /// The session's `should_procede` check returned `false`, so no request was made.
    #[error("Session indicated request should not procede")]
    Session,

    /// A `reqwest` error.
    #[error(transparent)]
    Reqweset(#[from] reqwest::Error),

    /// A `serde_json` error.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// The remote server answered with a non-success status code (the payload is that code).
    #[error("Invalid response code: {0}")]
    Status(u16),

    /// Signing the request failed.
    #[error(transparent)]
    SignatureError(#[from] SignatureError<E>),
}
/// Shorthand for the error type of the signer produced by the [`SignFactory`] `S`.
type SignTraitError<S> = <<S as SignFactory>::Signer as Sign>::Error;
/// Adapter that exposes a [`Digest`] implementation through the
/// [`DigestCreate`] trait expected by the HTTP-signature library.
struct DigestWrapper<D>(D);

impl<D: Digest + Clone> DigestCreate for DigestWrapper<D> {
    /// Forwards the name declared by the wrapped digest type.
    const NAME: &'static str = <D as Digest>::NAME;

    /// Digests `input` with a clone of the wrapped value, so the wrapped
    /// digest itself is not modified by this call.
    fn compute(&mut self, input: &[u8]) -> String {
        self.0.clone().digest(input)
    }
}
impl<'a, CurrentSession, Crypto> ReqwestClient<'a, CurrentSession, Crypto>
where
    CurrentSession: Session,
    Crypto: SignFactory + 'static,
{
    /// Creates a client from a borrowed `reqwest` client, session and
    /// signature configuration, plus an owned crypto provider.
    pub fn new(
        client: &'a Client,
        session: &'a CurrentSession,
        config: &'a Config,
        crypto: Crypto,
    ) -> Self {
        Self {
            client,
            session,
            config,
            crypto,
        }
    }

    /// Issues a signed `GET` for `id.url()` and deserializes the JSON body.
    ///
    /// On success the value is always wrapped in `Some`; this function never
    /// yields `Ok(None)`.
    ///
    /// # Errors
    ///
    /// * `ReqwestError::SignatureError` if signing the request fails.
    /// * `ReqwestError::Reqweset` if sending the request or decoding the body fails.
    /// * `ReqwestError::Status` if the server answers with a non-success status.
    async fn do_fetch<Id>(
        &self,
        id: &Id,
    ) -> Result<Option<<Id as Dereference>::Output>, ReqwestError<SignTraitError<Crypto>>>
    where
        Id: Dereference,
    {
        let response = self
            .client
            .get(id.url().as_str())
            .header(ACCEPT, "application/activity+json")
            .header(DATE, httpdate::fmt_http_date(SystemTime::now()))
            .signature(self.config, self.crypto.key_id(), {
                let sign = self.crypto.signer();
                move |signing_string| sign.sign(signing_string).map_err(SignatureError::Signer)
            })?
            .send()
            .await?;

        // Reject error responses before touching the body, mirroring
        // `do_deliver`. Otherwise an error payload that happens to be valid
        // JSON could be deserialized and reported as a fetched object.
        if !response.status().is_success() {
            return Err(ReqwestError::Status(response.status().as_u16()));
        }

        Ok(Some(response.json().await?))
    }

    /// Serializes `activity` to JSON and `POST`s it to `url`, passing the
    /// serialized body and a digest wrapper to `signature_with_digest`.
    ///
    /// # Errors
    ///
    /// * `ReqwestError::Json` if `activity` cannot be serialized.
    /// * `ReqwestError::SignatureError` if `signature_with_digest` fails.
    /// * `ReqwestError::Status` if the server answers with a non-success status.
    async fn do_deliver<T: serde::ser::Serialize>(
        &self,
        url: &Url,
        activity: T,
    ) -> Result<(), ReqwestError<SignTraitError<Crypto>>>
    where
        Crypto: DigestFactory,
        <Crypto as DigestFactory>::Digest: Clone,
    {
        let activity_string = serde_json::to_string(&activity)?;

        let response = self
            .client
            .post(url.as_str())
            .header(CONTENT_TYPE, "application/activity+json")
            .header(ACCEPT, "application/activity+json")
            .header(DATE, httpdate::fmt_http_date(SystemTime::now()))
            .signature_with_digest(
                self.config.clone(),
                self.crypto.key_id(),
                DigestWrapper(self.crypto.digest()),
                activity_string,
                {
                    let sign = self.crypto.signer();
                    move |signing_string| sign.sign(signing_string).map_err(SignatureError::Signer)
                },
            )
            .await?;

        if !response.status().is_success() {
            return Err(ReqwestError::Status(response.status().as_u16()));
        }

        Ok(())
    }
}
impl<'a, Id, CurrentSession, Crypto> Repo<'a, Id> for ReqwestClient<'a, CurrentSession, Crypto>
where
    Id: Dereference + Send + Sync,
    <Id as Dereference>::Output: 'static,
    CurrentSession: Session + Send + Sync,
    Crypto: SignFactory + Send + Sync + 'static,
    <Crypto as SignFactory>::KeyId: Send,
{
    type Error = ReqwestError<SignTraitError<Crypto>>;
    type Future = Pin<
        Box<
            dyn Future<Output = Result<Option<<Id as Dereference>::Output>, Self::Error>>
                + Send
                + 'a,
        >,
    >;

    /// Fetches the object behind `id`, gated and tracked by the session.
    ///
    /// The session is asked whether the request may go ahead; if not, the
    /// future resolves to `ReqwestError::Session` without contacting the
    /// server. Otherwise the outcome of the fetch is reported to the session
    /// as a success or failure and then returned unchanged.
    fn fetch(&'a self, id: &'a Id) -> Self::Future {
        Box::pin(async move {
            if !self.session.should_procede(id.url()) {
                return Err(ReqwestError::Session);
            }

            let outcome = self.do_fetch(id).await;

            // Tell the session how the request went before handing the
            // result back to the caller untouched.
            if outcome.is_ok() {
                self.session.mark_success(id.url());
            } else {
                self.session.mark_failure(id.url());
            }

            outcome
        })
    }
}
impl<'a, CurrentSession, Crypto> apub_deliver::Client<'a>
    for ReqwestClient<'a, CurrentSession, Crypto>
where
    CurrentSession: Session + Send + Sync,
    Crypto: DigestFactory + SignFactory + Send + Sync + 'static,
    <Crypto as SignFactory>::KeyId: Send,
    <Crypto as DigestFactory>::Digest: Clone,
{
    type Error = ReqwestError<SignTraitError<Crypto>>;
    type Future = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + 'a>>;

    /// Delivers `activity` to `url`, gated and tracked by the session.
    ///
    /// The session is asked whether the request may go ahead; if not, the
    /// future resolves to `ReqwestError::Session` without contacting the
    /// server. Otherwise the outcome of the delivery is reported to the
    /// session as a success or failure and then returned unchanged.
    fn deliver<T: serde::ser::Serialize + 'a>(&'a self, url: &'a Url, activity: T) -> Self::Future {
        Box::pin(async move {
            if !self.session.should_procede(url) {
                return Err(ReqwestError::Session);
            }

            let outcome = self.do_deliver(url, activity).await;

            // Record the result with the session, then pass it through as-is.
            if outcome.is_ok() {
                self.session.mark_success(url);
            } else {
                self.session.mark_failure(url);
            }

            outcome
        })
    }
}