diff --git a/src/data/actor.rs b/src/data/actor.rs index 9c446f7..2966222 100644 --- a/src/data/actor.rs +++ b/src/data/actor.rs @@ -2,7 +2,7 @@ use crate::{ apub::AcceptedActors, db::{Actor, Db}, error::{Error, ErrorKind}, - requests::Requests, + requests::{BreakerStrategy, Requests}, }; use activitystreams::{iri_string::types::IriString, prelude::*}; use std::time::{Duration, SystemTime}; @@ -71,7 +71,9 @@ impl ActorCache { id: &IriString, requests: &Requests, ) -> Result { - let accepted_actor = requests.fetch::(id).await?; + let accepted_actor = requests + .fetch::(id, BreakerStrategy::Require2XX) + .await?; let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?; let accepted_actor_id = accepted_actor diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index f59affe..c3c3765 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -2,6 +2,7 @@ use crate::{ apub::AcceptedActors, error::{Error, ErrorKind}, jobs::JobState, + requests::BreakerStrategy, }; use activitystreams::{iri_string::types::IriString, object::Image, prelude::*}; use background_jobs::ActixJob; @@ -44,7 +45,7 @@ impl QueryContact { let contact = match state .state .requests - .fetch::(&self.contact_id) + .fetch::(&self.contact_id, BreakerStrategy::Allow404AndBelow) .await { Ok(contact) => contact, diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 10c5fce..0f936e4 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -1,6 +1,7 @@ use crate::{ error::Error, jobs::{debug_object, JobState}, + requests::BreakerStrategy, }; use activitystreams::iri_string::types::IriString; use background_jobs::{ActixJob, Backoff}; @@ -35,7 +36,12 @@ impl Deliver { #[tracing::instrument(name = "Deliver", skip(state))] async fn permform(self, state: JobState) -> Result<(), Error> { - if let Err(e) = state.state.requests.deliver(&self.to, &self.data).await { + if let Err(e) = state + .state + .requests + .deliver(&self.to, &self.data, BreakerStrategy::Allow401AndBelow) + .await + { if e.is_breaker() { tracing::debug!("Not trying due to failed breaker"); return Ok(()); diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 2b44e60..a0f3da1 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -2,6 +2,7 @@ use crate::{ config::UrlKind, error::{Error, ErrorKind}, jobs::{Boolish, JobState}, + requests::BreakerStrategy, }; use activitystreams::{iri, iri_string::types::IriString}; use background_jobs::ActixJob; @@ -42,7 +43,10 @@ impl QueryInstance { state .state .requests - .fetch_json::(&mastodon_instance_uri) + .fetch_json::( + &mastodon_instance_uri, + BreakerStrategy::Allow404AndBelow, + ) .await } InstanceApiType::Misskey => { @@ -50,7 +54,10 @@ impl QueryInstance { state .state .requests - .fetch_json_msky::(&msky_meta_uri) + .fetch_json_msky::( + &msky_meta_uri, + BreakerStrategy::Allow404AndBelow, + ) .await .map(|res| res.into()) } diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index 60f41ce..f9d78a3 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -1,6 +1,7 @@ use crate::{ error::{Error, ErrorKind}, jobs::{Boolish, JobState, QueryContact}, + requests::BreakerStrategy, }; use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany}; use background_jobs::ActixJob; @@ -45,7 +46,7 @@ impl QueryNodeinfo { let well_known = match state .state .requests - .fetch_json::(&well_known_uri) + .fetch_json::(&well_known_uri, BreakerStrategy::Allow404AndBelow) .await { Ok(well_known) => well_known, @@ -62,7 +63,12 @@ impl QueryNodeinfo { return Ok(()); }; - let nodeinfo = match state.state.requests.fetch_json::(&href).await { + let nodeinfo = match state + .state + .requests + .fetch_json::(&href, BreakerStrategy::Require2XX) + .await + { Ok(nodeinfo) => nodeinfo, Err(e) if e.is_breaker() => { tracing::debug!("Not retrying due to failed breaker"); diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index 8fa8bb0..91debe5 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -2,7 +2,7 @@ use crate::{ apub::AcceptedActors, data::{ActorCache, State}, error::{Error, ErrorKind}, - requests::Requests, + requests::{BreakerStrategy, Requests}, spawner::Spawner, }; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; @@ -70,7 +70,11 @@ impl MyVerify { actor_id } else { - match self.0.fetch::(&public_key_id).await { + match self + .0 + .fetch::(&public_key_id, BreakerStrategy::Require2XX) + .await + { Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId), Err(e) => { if e.is_gone() { diff --git a/src/requests.rs b/src/requests.rs index 1921f27..08c1002 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -24,6 +24,16 @@ const ONE_MINUTE: u64 = 60 * ONE_SECOND; const ONE_HOUR: u64 = 60 * ONE_MINUTE; const ONE_DAY: u64 = 24 * ONE_HOUR; +#[derive(Debug)] +pub(crate) enum BreakerStrategy { + // Requires a successful response + Require2XX, + // Allows HTTP 2xx-401 + Allow401AndBelow, + // Allows HTTP 2xx-404 + Allow404AndBelow, +} + #[derive(Clone)] pub(crate) struct Breakers { inner: Arc>, @@ -193,6 +203,7 @@ impl Requests { async fn check_response( &self, parsed_url: &IriString, + strategy: BreakerStrategy, res: Result, ) -> Result { if res.is_err() { @@ -203,7 +214,13 @@ impl Requests { let status = res.status(); - if status.is_server_error() { + let success = match strategy { + BreakerStrategy::Require2XX => status.is_success(), + BreakerStrategy::Allow401AndBelow => (200..=401).contains(&status.as_u16()), + BreakerStrategy::Allow404AndBelow => (200..=404).contains(&status.as_u16()), + }; + + if !success { self.breakers.fail(&parsed_url); if let Ok(s) = res.text().await { @@ -215,22 +232,33 @@ impl Requests { return Err(ErrorKind::Status(parsed_url.to_string(), status).into()); } - self.last_online.mark_seen(&parsed_url); - self.breakers.succeed(&parsed_url); + // only actually succeed a breaker on 2xx response + if status.is_success() { + self.last_online.mark_seen(&parsed_url); + self.breakers.succeed(&parsed_url); + } Ok(res) } #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] - pub(crate) async fn fetch_json(&self, url: &IriString) -> Result + pub(crate) async fn fetch_json( + &self, + url: &IriString, + strategy: BreakerStrategy, + ) -> Result where T: serde::de::DeserializeOwned, { - self.do_fetch(url, "application/json").await + self.do_fetch(url, "application/json", strategy).await } #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] - pub(crate) async fn fetch_json_msky(&self, url: &IriString) -> Result + pub(crate) async fn fetch_json_msky( + &self, + url: &IriString, + strategy: BreakerStrategy, + ) -> Result where T: serde::de::DeserializeOwned, { @@ -240,6 +268,7 @@ impl Requests { &serde_json::json!({}), "application/json", "application/json", + strategy, ) .await? .bytes() @@ -249,31 +278,50 @@ impl Requests { } #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] - pub(crate) async fn fetch(&self, url: &IriString) -> Result + pub(crate) async fn fetch( + &self, + url: &IriString, + strategy: BreakerStrategy, + ) -> Result where T: serde::de::DeserializeOwned, { - self.do_fetch(url, "application/activity+json").await + self.do_fetch(url, "application/activity+json", strategy) + .await } - async fn do_fetch(&self, url: &IriString, accept: &str) -> Result + async fn do_fetch( + &self, + url: &IriString, + accept: &str, + strategy: BreakerStrategy, + ) -> Result where T: serde::de::DeserializeOwned, { - let body = self.do_fetch_response(url, accept).await?.bytes().await?; + let body = self + .do_fetch_response(url, accept, strategy) + .await? + .bytes() + .await?; Ok(serde_json::from_slice(&body)?) } #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] - pub(crate) async fn fetch_response(&self, url: &IriString) -> Result { - self.do_fetch_response(url, "*/*").await + pub(crate) async fn fetch_response( + &self, + url: &IriString, + strategy: BreakerStrategy, + ) -> Result { + self.do_fetch_response(url, "*/*", strategy).await } pub(crate) async fn do_fetch_response( &self, url: &IriString, accept: &str, + strategy: BreakerStrategy, ) -> Result { if !self.breakers.should_try(url) { return Err(ErrorKind::Breaker.into()); @@ -295,7 +343,7 @@ impl Requests { let res = self.client.execute(request).await; - let res = self.check_response(url, res).await?; + let res = self.check_response(url, strategy, res).await?; Ok(res) } @@ -305,7 +353,12 @@ impl Requests { skip_all, fields(inbox = inbox.to_string().as_str(), signing_string) )] - pub(crate) async fn deliver(&self, inbox: &IriString, item: &T) -> Result<(), Error> + pub(crate) async fn deliver( + &self, + inbox: &IriString, + item: &T, + strategy: BreakerStrategy, + ) -> Result<(), Error> where T: serde::ser::Serialize + std::fmt::Debug, { @@ -314,6 +367,7 @@ impl Requests { item, "application/activity+json", "application/activity+json", + strategy, ) .await?; Ok(()) @@ -325,6 +379,7 @@ impl Requests { item: &T, content_type: &str, accept: &str, + strategy: BreakerStrategy, ) -> Result where T: serde::ser::Serialize + std::fmt::Debug, @@ -357,7 +412,7 @@ impl Requests { let res = self.client.execute(request).await; - let res = self.check_response(inbox, res).await?; + let res = self.check_response(inbox, strategy, res).await?; Ok(res) } diff --git a/src/routes/media.rs b/src/routes/media.rs index b99caef..4c9b260 100644 --- a/src/routes/media.rs +++ b/src/routes/media.rs @@ -1,4 +1,8 @@ -use crate::{data::MediaCache, error::Error, requests::Requests}; +use crate::{ + data::MediaCache, + error::Error, + requests::{BreakerStrategy, Requests}, +}; use actix_web::{body::BodyStream, web, HttpResponse}; use uuid::Uuid; @@ -11,7 +15,9 @@ pub(crate) async fn route( let uuid = uuid.into_inner(); if let Some(url) = media.get_url(uuid).await? { - let res = requests.fetch_response(&url).await?; + let res = requests + .fetch_response(&url, BreakerStrategy::Allow404AndBelow) + .await?; let mut response = HttpResponse::build(res.status());