From 9fc507210a33474f11f045afa4c3a9626425176f Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 17 Mar 2020 12:15:16 -0500 Subject: [PATCH] Move requests to requests module, respond with 200 --- src/inbox.rs | 124 ++++-------------------------------------------- src/main.rs | 16 ++++++- src/requests.rs | 108 +++++++++++++++++++++++++++++++++++++++++ src/verifier.rs | 4 +- 4 files changed, 132 insertions(+), 120 deletions(-) create mode 100644 src/requests.rs diff --git a/src/inbox.rs b/src/inbox.rs index 0ceaa5c..6b2c0fd 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -1,7 +1,10 @@ use crate::{ apub::{AcceptedActors, AcceptedObjects, ValidTypes}, + db::{add_listener, remove_listener}, db_actor::{DbActor, DbQuery, Pool}, error::MyError, + requests::{deliver, deliver_many, fetch_actor}, + response, state::{State, UrlKind}, }; use activitystreams::{ @@ -41,15 +44,6 @@ pub async fn inbox( } } -pub fn response(item: T) -> HttpResponse -where - T: serde::ser::Serialize, -{ - HttpResponse::Accepted() - .content_type("application/activity+json") - .json(item) -} - async fn handle_undo( db_actor: web::Data>, state: web::Data, @@ -69,12 +63,10 @@ async fn handle_undo( async move { let conn = pool.get().await?; - crate::db::remove_listener(&conn, &inbox) - .await - .map_err(|e| { - error!("Error removing listener, {}", e); - e - }) + remove_listener(&conn, &inbox).await.map_err(|e| { + error!("Error removing listener, {}", e); + e + }) } })); @@ -189,7 +181,7 @@ async fn handle_follow( async move { let conn = pool.get().await?; - crate::db::add_listener(&conn, &inbox).await.map_err(|e| { + add_listener(&conn, &inbox).await.map_err(|e| { error!("Error adding listener, {}", e); e }) @@ -225,106 +217,6 @@ async fn handle_follow( Ok(response(accept)) } -pub async fn fetch_actor( - state: std::sync::Arc, - client: std::sync::Arc, - actor_id: &XsdAnyUri, -) -> Result { - if let Some(actor) = state.get_actor(actor_id).await { - return Ok(actor); - } - - let actor: AcceptedActors = client - .get(actor_id.as_str()) - .header("Accept", "application/activity+json") - .send() - .await - .map_err(|e| { - error!("Couldn't send request to {} for actor, {}", actor_id, e); - MyError::SendRequest - })? - .json() - .await - .map_err(|e| { - error!("Coudn't fetch actor from {}, {}", actor_id, e); - MyError::ReceiveResponse - })?; - - state.cache_actor(actor_id.to_owned(), actor.clone()).await; - - Ok(actor) -} - -fn deliver_many( - state: web::Data, - client: web::Data, - inboxes: Vec, - item: T, -) where - T: serde::ser::Serialize + 'static, -{ - let client = client.into_inner(); - let state = state.into_inner(); - - actix::Arbiter::spawn(async move { - use futures::stream::StreamExt; - - let mut unordered = futures::stream::FuturesUnordered::new(); - - for inbox in inboxes { - unordered.push(deliver(&state, &client, inbox, &item)); - } - - while let Some(_) = unordered.next().await {} - }); -} - -async fn deliver( - state: &std::sync::Arc, - client: &std::sync::Arc, - inbox: XsdAnyUri, - item: &T, -) -> Result<(), MyError> -where - T: serde::ser::Serialize, -{ - use http_signature_normalization_actix::prelude::*; - use sha2::{Digest, Sha256}; - - let config = Config::default(); - let mut digest = Sha256::new(); - - let key_id = state.generate_url(UrlKind::Actor); - - let item_string = serde_json::to_string(item)?; - - let res = client - .post(inbox.as_str()) - .header("Accept", "application/activity+json") - .header("Content-Type", "application/activity+json") - .header("User-Agent", "Aode Relay v0.1.0") - .signature_with_digest( - &config, - &key_id, - &mut digest, - item_string, - |signing_string| state.sign(signing_string), - )? - .send() - .await - .map_err(|e| { - error!("Couldn't send deliver request to {}, {}", inbox, e); - MyError::SendRequest - })?; - - if !res.status().is_success() { - error!("Invalid response status from {}, {}", inbox, res.status()); - return Err(MyError::Status); - } - - Ok(()) -} - async fn get_inboxes( state: &web::Data, actor: &AcceptedActors, diff --git a/src/main.rs b/src/main.rs index b805e34..69542d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties}; -use actix_web::{client::Client, middleware::Logger, web, App, HttpServer, Responder}; +use actix_web::{ + client::Client, middleware::Logger, web, App, HttpResponse, HttpServer, Responder, +}; use bb8_postgres::tokio_postgres; use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use rsa_pem::KeyExt; @@ -12,6 +14,7 @@ mod error; mod inbox; mod label; mod notify; +mod requests; mod state; mod verifier; mod webfinger; @@ -26,6 +29,15 @@ use self::{ webfinger::RelayResolver, }; +pub fn response(item: T) -> HttpResponse +where + T: serde::ser::Serialize, +{ + HttpResponse::Ok() + .content_type("application/activity+json") + .json(item) +} + async fn index() -> impl Responder { "hewwo, mr obama" } @@ -59,7 +71,7 @@ async fn actor_route(state: web::Data) -> Result public_key_pem: state.settings.public_key.to_pem_pkcs8()?, }; - Ok(inbox::response(public_key.extend(application))) + Ok(response(public_key.extend(application))) } #[actix_rt::main] diff --git a/src/requests.rs b/src/requests.rs new file mode 100644 index 0000000..36865ae --- /dev/null +++ b/src/requests.rs @@ -0,0 +1,108 @@ +use crate::{ + apub::AcceptedActors, + error::MyError, + state::{State, UrlKind}, +}; +use activitystreams::primitives::XsdAnyUri; +use actix_web::{client::Client, web}; +use log::error; + +pub async fn fetch_actor( + state: std::sync::Arc, + client: std::sync::Arc, + actor_id: &XsdAnyUri, +) -> Result { + if let Some(actor) = state.get_actor(actor_id).await { + return Ok(actor); + } + + let actor: AcceptedActors = client + .get(actor_id.as_str()) + .header("Accept", "application/activity+json") + .send() + .await + .map_err(|e| { + error!("Couldn't send request to {} for actor, {}", actor_id, e); + MyError::SendRequest + })? + .json() + .await + .map_err(|e| { + error!("Coudn't fetch actor from {}, {}", actor_id, e); + MyError::ReceiveResponse + })?; + + state.cache_actor(actor_id.to_owned(), actor.clone()).await; + + Ok(actor) +} + +pub fn deliver_many( + state: web::Data, + client: web::Data, + inboxes: Vec, + item: T, +) where + T: serde::ser::Serialize + 'static, +{ + let client = client.into_inner(); + let state = state.into_inner(); + + actix::Arbiter::spawn(async move { + use futures::stream::StreamExt; + + let mut unordered = futures::stream::FuturesUnordered::new(); + + for inbox in inboxes { + unordered.push(deliver(&state, &client, inbox, &item)); + } + + while let Some(_) = unordered.next().await {} + }); +} + +pub async fn deliver( + state: &std::sync::Arc, + client: &std::sync::Arc, + inbox: XsdAnyUri, + item: &T, +) -> Result<(), MyError> +where + T: serde::ser::Serialize, +{ + use http_signature_normalization_actix::prelude::*; + use sha2::{Digest, Sha256}; + + let config = Config::default(); + let mut digest = Sha256::new(); + + let key_id = state.generate_url(UrlKind::Actor); + + let item_string = serde_json::to_string(item)?; + + let res = client + .post(inbox.as_str()) + .header("Accept", "application/activity+json") + .header("Content-Type", "application/activity+json") + .header("User-Agent", "Aode Relay v0.1.0") + .signature_with_digest( + &config, + &key_id, + &mut digest, + item_string, + |signing_string| state.sign(signing_string), + )? + .send() + .await + .map_err(|e| { + error!("Couldn't send deliver request to {}, {}", inbox, e); + MyError::SendRequest + })?; + + if !res.status().is_success() { + error!("Invalid response status from {}, {}", inbox, res.status()); + return Err(MyError::Status); + } + + Ok(()) +} diff --git a/src/verifier.rs b/src/verifier.rs index dd707cd..582b844 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -1,4 +1,4 @@ -use crate::{error::MyError, state::State}; +use crate::{error::MyError, requests::fetch_actor, state::State}; use actix_web::client::Client; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey}; @@ -28,7 +28,7 @@ impl SignatureVerify for MyVerify { let client = Arc::new(self.1.clone()); Box::pin(async move { - let actor = crate::inbox::fetch_actor(state, client, &key_id.parse()?).await?; + let actor = fetch_actor(state, client, &key_id.parse()?).await?; let public_key = actor.public_key.ok_or(MyError::MissingKey)?;