pict-rs-aggregator/src/connection.rs

239 lines
7.2 KiB
Rust

use std::{sync::Arc, time::Duration};
use crate::{
pict::{Details, Extension, Images, Upload, Uploads},
Error,
};
use actix_web::{body::BodyStream, http::StatusCode, web, HttpRequest, HttpResponse};
use awc::Client;
use url::Url;
/// Sizes accepted by `Connection::thumbnail`; any other requested size is rejected with
/// `UploadError::Size` before an upstream request is made.
pub(crate) static VALID_SIZES: &[u16] = &[80, 160, 320, 640, 1080, 2160];
/// Pairs an upstream base URL with the HTTP client used to reach it.
///
/// Every request helper clones `upstream`, overrides its path (and, for some endpoints, its
/// query) and sends the request through `client`.
pub(crate) struct Connection {
/// Base URL of the upstream server; only its path and query are overridden per request.
upstream: Url,
/// `awc` client used for every upstream request.
client: Client,
}
/// Manual `Debug` implementation: prints the upstream URL as a string and a fixed
/// `"Client"` placeholder in place of the client field.
impl std::fmt::Debug for Connection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Connection");
        out.field("upstream", &self.upstream.as_str());
        out.field("client", &"Client");
        out.finish()
    }
}
impl Connection {
pub(crate) fn new(upstream: Url, client: Client) -> Self {
Connection { upstream, client }
}
#[tracing::instrument(skip_all)]
pub(crate) async fn claim(&self, upload: Upload) -> Result<Images, Error> {
let mut attempts = 0;
const CLAIM_ATTEMPT_LIMIT: usize = 10;
loop {
match self.client.get(self.claim_url(&upload)).send().await {
Ok(mut res) => {
match res.status() {
StatusCode::OK => {
return res
.json::<Images>()
.await
.map_err(|_| UploadError::Json.into());
}
StatusCode::NO_CONTENT => {
// continue
}
_ => {
let images =
res.json::<Images>().await.map_err(|_| UploadError::Json)?;
let code = images.code().unwrap_or("uknown-error").to_string();
let msg = images.msg().to_string();
return Err(UploadError::UploadFailure(code, msg).into());
}
}
}
Err(_) => {
attempts += 1;
if attempts > CLAIM_ATTEMPT_LIMIT {
return Err(UploadError::Status.into());
}
tokio::time::sleep(Duration::from_secs(1)).await;
// continue
}
}
}
}
/// Proxies the upstream processed (resized) rendition of `file` for the incoming `req`.
///
/// # Errors
///
/// Returns [`UploadError::Size`] when `size` is not listed in [`VALID_SIZES`]; otherwise
/// any error produced by `proxy` is passed through.
#[tracing::instrument(skip_all)]
pub(crate) async fn thumbnail(
    &self,
    size: u16,
    file: &str,
    extension: Extension,
    req: &HttpRequest,
) -> Result<HttpResponse, Error> {
    let size_allowed = VALID_SIZES.iter().any(|allowed| *allowed == size);
    if size_allowed {
        let url = self.thumbnail_url(size, file, extension);
        self.proxy(url, req).await
    } else {
        Err(UploadError::Size(size).into())
    }
}
/// Proxies the upstream original of `file` for the incoming `req`.
///
/// # Errors
///
/// Passes through any error produced by `proxy`.
#[tracing::instrument(skip_all)]
pub(crate) async fn image(&self, file: &str, req: &HttpRequest) -> Result<HttpResponse, Error> {
    let url = self.image_url(file);
    self.proxy(url, req).await
}
/// Fetches the upstream details record for `file` and parses it as [`Details`].
///
/// # Errors
///
/// - [`UploadError::Request`] when the request cannot be sent.
/// - [`UploadError::Status`] when upstream answers with a non-success status.
/// - [`UploadError::Json`] when the body cannot be parsed.
#[tracing::instrument(skip_all)]
pub(crate) async fn details(&self, file: &str) -> Result<Details, Error> {
    let request = self.client.get(self.details_url(file));
    let mut res = match request.send().await {
        Ok(res) => res,
        Err(e) => return Err(UploadError::Request(Arc::from(e.to_string())).into()),
    };
    if res.status().is_success() {
        res.json().await.map_err(|_| UploadError::Json.into())
    } else {
        Err(UploadError::Status.into())
    }
}
/// Streams the incoming request `body` to the upstream upload endpoint and parses the
/// reply as [`Uploads`].
///
/// The outgoing request is derived from the incoming request head. The peer address,
/// when known, is appended as `X-Forwarded-For`, and `Accept-Encoding` is removed before
/// sending. The response status is not inspected here; only the body is parsed.
///
/// # Errors
///
/// - [`UploadError::Request`] when the request cannot be sent.
/// - [`UploadError::Json`] when the body cannot be parsed.
#[tracing::instrument(skip_all)]
pub(crate) async fn upload(
    &self,
    req: &HttpRequest,
    body: web::Payload,
) -> Result<Uploads, Error> {
    let mut forwarded = self.client.request_from(self.upload_url(), req.head());
    if let Some(peer) = req.head().peer_addr {
        forwarded = forwarded.append_header(("X-Forwarded-For", peer.to_string()));
    }
    forwarded.headers_mut().remove("Accept-Encoding");

    let mut res = match forwarded.send_stream(body).await {
        Ok(res) => res,
        Err(e) => return Err(UploadError::Request(Arc::from(e.to_string())).into()),
    };
    res.json::<Uploads>()
        .await
        .map_err(|_| UploadError::Json.into())
}
/// Sends a DELETE for `file`, authorised by `token`, to the upstream delete endpoint.
///
/// # Errors
///
/// - [`UploadError::Request`] when the request cannot be sent.
/// - [`UploadError::Status`] when upstream answers with a non-success status.
#[tracing::instrument(skip_all)]
pub(crate) async fn delete(&self, file: &str, token: &str) -> Result<(), Error> {
    let url = self.delete_url(file, token);
    let res = self
        .client
        .delete(url)
        .send()
        .await
        .map_err(|e| UploadError::Request(Arc::from(e.to_string())))?;
    if res.status().is_success() {
        Ok(())
    } else {
        Err(UploadError::Status.into())
    }
}
/// Builds the upstream URL used to claim `upload`: path `/image/backgrounded/claim` with
/// the upload id passed as the `upload_id` query parameter.
fn claim_url(&self, upload: &Upload) -> String {
    let query = format!("upload_id={}", upload.id());
    let mut url = self.upstream.clone();
    url.set_path("/image/backgrounded/claim");
    url.set_query(Some(query.as_str()));
    url.as_str().to_owned()
}
/// Builds the upstream upload URL: the base URL with its path set to `/image/backgrounded`.
fn upload_url(&self) -> String {
    let mut url = self.upstream.clone();
    url.set_path("/image/backgrounded");
    url.as_str().to_owned()
}
/// Builds the upstream URL that serves `file` resized to `size` in the given `extension`:
/// path `/image/process.{extension}` with query parameters `src` and `resize`.
///
/// The query is assembled with the URL's form-urlencoded serializer rather than string
/// formatting, so characters such as `&`, `=` or `%` inside `file` are escaped and cannot
/// add or alter upstream query parameters (for example a second `resize=` that would
/// sidestep the size check performed by the caller). Any query already present on the
/// base URL is discarded, matching the previous `set_query` behaviour.
fn thumbnail_url(&self, size: u16, file: &str, extension: Extension) -> String {
    let mut url = self.upstream.clone();
    url.set_path(&format!("/image/process.{extension}"));
    url.query_pairs_mut()
        .clear()
        .append_pair("src", file)
        .append_pair("resize", &size.to_string());
    url.to_string()
}
/// Builds the upstream URL of the original `file`: path `/image/original/{file}`.
///
/// NOTE(review): `file` is interpolated into the path as-is — confirm callers only pass
/// trusted or validated names.
fn image_url(&self, file: &str) -> String {
    let path = format!("/image/original/{file}");
    let mut url = self.upstream.clone();
    url.set_path(path.as_str());
    url.as_str().to_owned()
}
/// Builds the upstream URL of the details record for `file`:
/// path `/image/details/original/{file}`.
///
/// NOTE(review): `file` is interpolated into the path as-is — confirm callers only pass
/// trusted or validated names.
fn details_url(&self, file: &str) -> String {
    let path = format!("/image/details/original/{file}");
    let mut url = self.upstream.clone();
    url.set_path(path.as_str());
    url.as_str().to_owned()
}
/// Builds the upstream URL that deletes `file` using `token`:
/// path `/image/delete/{token}/{file}`.
///
/// NOTE(review): `token` and `file` are interpolated into the path as-is — confirm callers
/// only pass trusted or validated values.
fn delete_url(&self, file: &str, token: &str) -> String {
    let path = format!("/image/delete/{token}/{file}");
    let mut url = self.upstream.clone();
    url.set_path(path.as_str());
    url.as_str().to_owned()
}
/// Forwards the incoming `req` to `url` and relays the upstream answer to the caller.
///
/// The outgoing request is derived from the incoming request head, with the peer address
/// (when known) appended as `X-Forwarded-For`. Response decompression is disabled on the
/// outgoing request. The returned response reuses the upstream status, copies every
/// upstream header except `connection`, and streams the upstream body.
///
/// # Errors
///
/// Returns [`UploadError::Request`] when the upstream request cannot be sent.
#[tracing::instrument(skip_all)]
async fn proxy(&self, url: String, req: &HttpRequest) -> Result<HttpResponse, Error> {
    let mut upstream_req = self.client.request_from(url, req.head());
    if let Some(peer) = req.head().peer_addr {
        upstream_req = upstream_req.append_header(("X-Forwarded-For", peer.to_string()));
    }

    let upstream_res = match upstream_req.no_decompress().send().await {
        Ok(res) => res,
        Err(e) => return Err(UploadError::Request(Arc::from(e.to_string())).into()),
    };

    let mut builder = HttpResponse::build(upstream_res.status());
    for (name, value) in upstream_res.headers().iter() {
        // The `connection` header is deliberately not relayed to the caller.
        if name != "connection" {
            builder.append_header((name.clone(), value.clone()));
        }
    }
    Ok(builder.body(BodyStream::new(upstream_res)))
}
}
/// Failures that can occur while talking to the upstream server.
#[derive(Clone, Debug, thiserror::Error)]
pub(crate) enum UploadError {
/// The upstream request could not be sent; holds the transport error rendered as text.
#[error("There was an error making the upstream request: {0}")]
Request(Arc<str>),
/// An upstream response body could not be parsed as the expected JSON type.
#[error("There was an error parsing the upstream response")]
Json,
/// The upstream request did not yield an acceptable HTTP status (e.g. a non-success
/// status from the details or delete endpoints).
#[error("Request returned bad HTTP status")]
Status,
/// Upstream reported a failure while claiming an upload; holds the error code and the
/// message taken from the upstream response body, in that order.
#[error("Request failed with {0}: {1}")]
UploadFailure(String, String),
/// The requested thumbnail size is not one of `VALID_SIZES`; holds the rejected size.
#[error("Requested size {0} is invalid")]
Size(u16),
}