From a1ea5d676c63298e52d6c345a728e60fd185a79e Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 25 Feb 2023 15:02:16 -0600 Subject: [PATCH] Rework misskey fetch to reuse deliver plumbing Only count server errors towards failed breakers --- src/data/actor.rs | 2 +- src/jobs/contact.rs | 2 +- src/jobs/deliver.rs | 2 +- src/jobs/instance.rs | 4 +- src/jobs/nodeinfo.rs | 4 +- src/middleware/verifier.rs | 6 +- src/requests.rs | 123 +++++++++++++++++-------------------- src/routes/media.rs | 2 +- 8 files changed, 67 insertions(+), 78 deletions(-) diff --git a/src/data/actor.rs b/src/data/actor.rs index 784e99f..af6fb78 100644 --- a/src/data/actor.rs +++ b/src/data/actor.rs @@ -71,7 +71,7 @@ impl ActorCache { id: &IriString, requests: &Requests, ) -> Result { - let accepted_actor = requests.fetch::(id.as_str()).await?; + let accepted_actor = requests.fetch::(id).await?; let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?; let accepted_actor_id = accepted_actor diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index a98ac8f..3c880af 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -42,7 +42,7 @@ impl QueryContact { let contact = match state .requests - .fetch::(self.contact_id.as_str()) + .fetch::(&self.contact_id) .await { Ok(contact) => contact, diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 223608e..72f4aec 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -35,7 +35,7 @@ impl Deliver { #[tracing::instrument(name = "Deliver", skip(state))] async fn permform(self, state: JobState) -> Result<(), Error> { - if let Err(e) = state.requests.deliver(self.to, &self.data).await { + if let Err(e) = state.requests.deliver(&self.to, &self.data).await { if e.is_breaker() { tracing::debug!("Not trying due to failed breaker"); return Ok(()); diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 4713525..59826b5 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -41,14 +41,14 @@ impl QueryInstance { let mastodon_instance_uri = iri!(format!("{scheme}://{authority}/api/v1/instance")); state .requests - .fetch_json::(mastodon_instance_uri.as_str()) + .fetch_json::(&mastodon_instance_uri) .await } InstanceApiType::Misskey => { let msky_meta_uri = iri!(format!("{scheme}://{authority}/api/meta")); state .requests - .fetch_json_msky::(msky_meta_uri.as_str()) + .fetch_json_msky::(&msky_meta_uri) .await .map(|res| res.into()) } diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index 271b5a1..fa4812f 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -43,7 +43,7 @@ impl QueryNodeinfo { let well_known = match state .requests - .fetch_json::(well_known_uri.as_str()) + .fetch_json::(&well_known_uri) .await { Ok(well_known) => well_known, @@ -55,7 +55,7 @@ impl QueryNodeinfo { }; let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) { - link.href + iri!(&link.href) } else { return Ok(()); }; diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index 2ae9779..4e1e364 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -67,11 +67,7 @@ impl MyVerify { actor_id } else { - match self - .0 - .fetch::(public_key_id.as_str()) - .await - { + match self.0.fetch::(&public_key_id).await { Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId), Err(e) => { if e.is_gone() { diff --git a/src/requests.rs b/src/requests.rs index 304d0a6..06c7450 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -229,7 +229,7 @@ impl Requests { self.reset_err(); - if !res.status().is_success() { + if res.status().is_server_error() { self.breakers.fail(&parsed_url); if let Ok(bytes) = res.body().await { @@ -250,7 +250,7 @@ impl Requests { } #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] - pub(crate) async fn fetch_json(&self, url: &str) -> Result + pub(crate) async fn fetch_json(&self, url: &IriString) -> Result where T: serde::de::DeserializeOwned, { @@ -258,75 +258,40 @@ impl Requests { } #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] - pub(crate) async fn fetch_json_msky(&self, url: &str) -> Result + pub(crate) async fn fetch_json_msky(&self, url: &IriString) -> Result where T: serde::de::DeserializeOwned, { - self.do_fetch_msky(url, "application/json").await + let mut res = self + .do_deliver( + url, + &serde_json::json!({}), + "application/json", + "application/json", + ) + .await?; + + let body = res + .body() + .await + .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?; + + Ok(serde_json::from_slice(body.as_ref())?) } #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] - pub(crate) async fn fetch(&self, url: &str) -> Result + pub(crate) async fn fetch(&self, url: &IriString) -> Result where T: serde::de::DeserializeOwned, { self.do_fetch(url, "application/activity+json").await } - async fn do_fetch(&self, url: &str, accept: &str) -> Result + async fn do_fetch(&self, url: &IriString, accept: &str) -> Result where T: serde::de::DeserializeOwned, { - self.do_fetch_inner(url, accept, false).await - } - - async fn do_fetch_msky(&self, url: &str, accept: &str) -> Result - where - T: serde::de::DeserializeOwned, - { - self.do_fetch_inner(url, accept, true).await - } - - async fn do_fetch_inner(&self, url: &str, accept: &str, use_post: bool) -> Result - where - T: serde::de::DeserializeOwned, - { - let parsed_url = url.parse::()?; - - if !self.breakers.should_try(&parsed_url) { - return Err(ErrorKind::Breaker.into()); - } - - let signer = self.signer(); - let span = tracing::Span::current(); - - let client: Client = self.client.borrow().clone(); - let client_req = match use_post { - true => client.post(url), - false => client.get(url), - }; - let client_signed = client_req - .insert_header(("Accept", accept)) - .insert_header(Date(SystemTime::now().into())) - .signature( - self.config.clone(), - self.key_id.clone(), - move |signing_string| { - span.record("signing_string", signing_string); - span.in_scope(|| signer.sign(signing_string)) - }, - ) - .await?; - let res = match use_post { - true => { - let dummy = serde_json::json!({}); - client_signed.send_json(&dummy) - } - false => client_signed.send(), - } - .await; - - let mut res = self.check_response(&parsed_url, res).await?; + let mut res = self.do_fetch_response(url, accept).await?; let body = res .body() @@ -337,8 +302,16 @@ impl Requests { } #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] - pub(crate) async fn fetch_response(&self, url: IriString) -> Result { - if !self.breakers.should_try(&url) { + pub(crate) async fn fetch_response(&self, url: &IriString) -> Result { + self.do_fetch_response(url, "*/*").await + } + + pub(crate) async fn do_fetch_response( + &self, + url: &IriString, + accept: &str, + ) -> Result { + if !self.breakers.should_try(url) { return Err(ErrorKind::Breaker.into()); } @@ -348,7 +321,7 @@ impl Requests { let client: Client = self.client.borrow().clone(); let res = client .get(url.as_str()) - .insert_header(("Accept", "*/*")) + .insert_header(("Accept", accept)) .insert_header(Date(SystemTime::now().into())) .no_decompress() .signature( @@ -363,7 +336,7 @@ impl Requests { .send() .await; - let res = self.check_response(&url, res).await?; + let res = self.check_response(url, res).await?; Ok(res) } @@ -373,7 +346,27 @@ impl Requests { skip_all, fields(inbox = inbox.to_string().as_str(), signing_string) )] - pub(crate) async fn deliver(&self, inbox: IriString, item: &T) -> Result<(), Error> + pub(crate) async fn deliver(&self, inbox: &IriString, item: &T) -> Result<(), Error> + where + T: serde::ser::Serialize + std::fmt::Debug, + { + self.do_deliver( + inbox, + item, + "application/activity+json", + "application/activity+json", + ) + .await?; + Ok(()) + } + + async fn do_deliver( + &self, + inbox: &IriString, + item: &T, + content_type: &str, + accept: &str, + ) -> Result where T: serde::ser::Serialize + std::fmt::Debug, { @@ -388,8 +381,8 @@ impl Requests { let client: Client = self.client.borrow().clone(); let (req, body) = client .post(inbox.as_str()) - .insert_header(("Accept", "application/activity+json")) - .insert_header(("Content-Type", "application/activity+json")) + .insert_header(("Accept", accept)) + .insert_header(("Content-Type", content_type)) .insert_header(Date(SystemTime::now().into())) .signature_with_digest( self.config.clone(), @@ -406,9 +399,9 @@ impl Requests { let res = req.send_body(body).await; - self.check_response(&inbox, res).await?; + let res = self.check_response(inbox, res).await?; - Ok(()) + Ok(res) } fn signer(&self) -> Signer { diff --git a/src/routes/media.rs b/src/routes/media.rs index 8a9de62..7cc3ed9 100644 --- a/src/routes/media.rs +++ b/src/routes/media.rs @@ -11,7 +11,7 @@ pub(crate) async fn route( let uuid = uuid.into_inner(); if let Some(url) = media.get_url(uuid).await? { - let res = requests.fetch_response(url).await?; + let res = requests.fetch_response(&url).await?; let mut response = HttpResponse::build(res.status());