// need this for ructe #![allow(clippy::needless_borrow)] use actix_web::{ body::BodyStream, http::{ header::{CacheControl, CacheDirective, ContentType, LastModified, LOCATION}, StatusCode, }, middleware::NormalizePath, web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, ResponseError, }; use awc::Client; use clap::Parser; use console_subscriber::ConsoleLayer; use once_cell::sync::Lazy; use opentelemetry::{ sdk::{propagation::TraceContextPropagator, Resource}, KeyValue, }; use opentelemetry_otlp::WithExportConfig; use std::{ io::Cursor, net::SocketAddr, time::{Duration, SystemTime}, }; use tracing_actix_web::TracingLogger; use tracing_awc::Tracing; use tracing_error::{ErrorLayer, SpanTrace}; use tracing_log::LogTracer; use tracing_subscriber::{ filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, registry::LookupSpan, Layer, Registry, }; use url::Url; include!(concat!(env!("OUT_DIR"), "/templates.rs")); const HOURS: u32 = 60 * 60; const DAYS: u32 = 24 * HOURS; /// Simple proxy service to demonstrate pict-rs's functionality #[derive(Clone, Debug, Parser)] #[command(author, version, about, long_about = None)] struct Config { #[arg( short, long, env = "PICTRS_PROXY_ADDR", default_value = "0.0.0.0:8081", help = "The address and port the server binds to" )] addr: SocketAddr, #[arg( short, long, env = "PICTRS_PROXY_UPSTREAM", default_value = "http://localhost:8080", help = "The url of the upstream pict-rs server" )] upstream: Url, #[arg( short, long, env = "PICTRS_PROXY_DOMAIN", default_value = "http://localhost:8081", help = "The scheme, domain, and optional port of the pict-rs proxy server" )] domain: Url, #[arg( long, env = "PICTRS_PROXY_CONSOLE_BUFFER_SIZE", help = "Number of events to buffer for the console subscriber. When unset, console will be disabled" )] console_event_buffer_size: Option, #[arg( short, long, env = "PICTRS_PROXY_OPENTELEMETRY_URL", help = "URL of OpenTelemetry Collector" )] opentelemetry_url: Option, } impl Config { fn domain(&self) -> Option<&str> { CONFIG.domain.domain() } fn upstream_upload_url(&self) -> String { let mut url = self.upstream.clone(); url.set_path("image/backgrounded"); url.to_string() } fn upstream_claim_url(&self, upload_id: &str) -> String { let mut url = self.upstream.clone(); url.set_path("image/backgrounded/claim"); url.set_query(Some(&format!("upload_id={}", upload_id))); url.to_string() } fn upstream_details_url(&self, name: &str) -> String { let mut url = self.upstream.clone(); url.set_path(&format!("image/details/original/{}", name)); url.to_string() } fn upstream_image_url(&self, name: &str) -> String { let mut url = self.upstream.clone(); url.set_path(&format!("image/original/{}", name)); url.to_string() } fn upstream_thumbnail_url(&self, size: u64, name: &str, filetype: FileType) -> String { let mut url = self.upstream.clone(); url.set_path(&format!("image/process.{}", filetype.as_str())); url.set_query(Some(&format!("src={}&thumbnail={}", name, size))); url.to_string() } fn upstream_delete_url(&self, token: &str, name: &str) -> String { let mut url = self.upstream.clone(); url.set_path(&format!("image/delete/{}/{}", token, name)); url.to_string() } fn image_url(&self, name: &str) -> String { let mut url = self.domain.clone(); url.set_path(&format!("image/{}", name)); url.to_string() } fn thumbnail_url(&self, size: u64, name: &str, filetype: FileType) -> String { let mut url = self.domain.clone(); url.set_path(&format!("thumb/{}/{}/{}", size, filetype.as_str(), name)); url.to_string() } fn view_url(&self, size: Option, name: &str) -> String { let mut url = self.domain.clone(); if let Some(size) = size { url.set_path(&format!("view/{}/{}", size, name)); } else { url.set_path(&format!("view/{}", name)); } url.to_string() } fn thumbnails_url(&self, name: &str) -> String { let mut url = self.domain.clone(); url.set_path("/thumbnails"); url.set_query(Some(&format!("image={}", name))); url.to_string() } fn delete_url(&self, token: &str, name: &str) -> String { let mut url = self.domain.clone(); url.set_path("delete"); url.set_query(Some(&format!("file={}&token={}", name, token))); url.to_string() } fn confirm_delete_url(&self, token: &str, name: &str) -> String { let mut url = self.domain.clone(); url.set_path("delete"); url.set_query(Some(&format!("file={}&token={}&confirm=true", name, token))); url.to_string() } } static CONFIG: Lazy = Lazy::new(Config::parse); pub enum UploadResult<'a> { Image(Image), UploadId(&'a str), Error(Error), } #[derive(Debug, serde::Deserialize)] enum FileType { #[serde(rename = "avif")] Avif, #[serde(rename = "jpg")] Jpg, #[serde(rename = "webp")] Webp, } impl FileType { fn as_str(&self) -> &'static str { match self { Self::Avif => "avif", Self::Jpg => "jpg", Self::Webp => "webp", } } } #[derive(Debug, serde::Deserialize)] pub struct Images { msg: String, files: Option>, } #[derive(Debug, serde::Deserialize)] pub struct Upload { // This is technically a UUID, but we don't care in this program upload_id: String, } #[derive(Debug, serde::Deserialize)] pub struct Uploads { msg: String, uploads: Option>, } #[derive(Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize)] pub struct UploadQuery { #[serde(default)] uploads: Vec, #[serde(default)] files: Vec<(String, String)>, } impl UploadQuery { fn try_from_uploads(uploads: Uploads) -> Result { if let Some(uploads) = uploads.uploads { Ok(UploadQuery { uploads: uploads.into_iter().map(|u| u.upload_id).collect(), files: vec![], }) } else { Err(uploads.msg) } } } impl Images { fn files(&self) -> Option<&[Image]> { self.files.as_ref().map(|v| v.as_ref()) } fn msg(&self) -> &str { &self.msg } fn is_ok(&self) -> bool { self.files().is_some() } fn message(&self) -> &'static str { if self.is_ok() { "Images Uploaded" } else { "Image Upload Failed" } } } #[derive(Debug, serde::Deserialize)] pub struct Details { height: usize, width: usize, content_type: String, } #[derive(Debug, serde::Deserialize)] pub struct Image { file: String, delete_token: String, details: Details, } impl Image { fn filename(&self) -> &str { &self.file } fn is_video(&self) -> bool { self.details.content_type.starts_with("video") } fn width(&self) -> usize { self.details.width } fn height(&self) -> usize { self.details.height } fn mime(&self) -> &str { &self.details.content_type } fn link(&self) -> String { CONFIG.image_url(&self.file) } fn thumbnails(&self) -> String { CONFIG.thumbnails_url(&self.file) } fn view(&self, size: Option) -> String { CONFIG.view_url(size, &self.file) } fn thumb(&self, size: u64, filetype: FileType) -> String { CONFIG.thumbnail_url(size, &self.file, filetype) } fn delete(&self) -> String { CONFIG.delete_url(&self.delete_token, &self.file) } fn confirm_delete(&self) -> String { CONFIG.confirm_delete_url(&self.delete_token, &self.file) } } fn statics(file: &str) -> String { format!("/static/{}", file) } #[derive(Debug)] pub struct Error { context: SpanTrace, kind: ErrorKind, } impl Error { pub(crate) fn upstream_error(&self) -> Option<&str> { match self.kind { ErrorKind::UploadFailed(ref msg) => Some(msg), _ => None, } } } impl From for Error where ErrorKind: From, { fn from(error: T) -> Self { Error { context: SpanTrace::capture(), kind: error.into(), } } } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut source: Option<&dyn std::error::Error> = Some(&self.kind); while let Some(error) = source { writeln!(f, "{}", error)?; source = std::error::Error::source(error); } std::fmt::Display::fmt(&self.context, f) } } impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { self.kind.source() } } impl ResponseError for Error { fn status_code(&self) -> StatusCode { StatusCode::INTERNAL_SERVER_ERROR } fn error_response(&self) -> HttpResponse { match render(HttpResponse::build(self.status_code()), |cursor| { self::templates::error_html(cursor, &self.kind.to_string()) }) { Ok(res) => res, Err(_) => HttpResponse::build(self.status_code()) .content_type(mime::TEXT_PLAIN.essence_str()) .body(self.kind.to_string()), } } } #[derive(Debug, thiserror::Error)] enum ErrorKind { #[error("Upload failed: {0}")] UploadFailed(String), #[error("Error transfering data")] Io(#[from] std::io::Error), #[error("Upstream request failed")] SendRequest(#[from] awc::error::SendRequestError), #[error("Could not parse response")] JsonPayload(#[from] awc::error::JsonPayloadError), #[error("Invalid query string")] Query(#[from] serde_qs::Error), #[error("Operation panicked")] Canceled(#[from] actix_rt::task::JoinError), } #[tracing::instrument(name = "Upload Page")] async fn index() -> Result { render(HttpResponse::Ok(), |cursor| { self::templates::index_html(cursor, "/upload", "images[]") }) } #[tracing::instrument(name = "List Uploads", skip(client))] async fn list_uploads( query: serde_qs::actix::QsQuery, client: web::Data, ) -> Result { let query = query.into_inner(); let mut upload_handles = Vec::new(); let mut details_handles = Vec::new(); for upload_id in &query.uploads { let claim_url = CONFIG.upstream_claim_url(upload_id.as_str()); let client = client.clone(); upload_handles.push(actix_rt::spawn(async move { let mut res = client.get(claim_url).send().await?; if res.status() == 204 { return Ok(None) as Result, Error>; } let result = res.json::().await?; match result.files { Some(files) if files.len() == 1 => Ok(files.into_iter().next()), Some(_) => Err(ErrorKind::UploadFailed("Bad response".into()).into()), None => Err(ErrorKind::UploadFailed(result.msg).into()), } })); } for (file, delete_token) in &query.files { let details_url = CONFIG.upstream_details_url(file); let file = file.clone(); let delete_token = delete_token.clone(); let client = client.clone(); details_handles.push(actix_rt::spawn(async move { let mut res = client.get(details_url).send().await?; let details = res.json::
().await?; Ok(Image { file, delete_token, details, }) })) } let mut results = Vec::new(); let mut any_incomplete = false; for handle in details_handles { let res = handle.await.map_err(Error::from).and_then(|res| res); match res { Ok(image) => { results.push(UploadResult::Image(image)); } Err(e) => { results.push(UploadResult::Error(e)); } } } for (handle, upload_id) in upload_handles.into_iter().zip(&query.uploads) { let res = handle.await.map_err(Error::from).and_then(|res| res); match res { Ok(Some(image)) => { results.push(UploadResult::Image(image)); } Ok(None) => { any_incomplete = true; results.push(UploadResult::UploadId(upload_id)); } Err(e) => { results.push(UploadResult::Error(e)); } } } if any_incomplete { let new_query = results .into_iter() .fold(UploadQuery::default(), |mut query, res| match res { UploadResult::Image(img) => { query.files.push((img.file, img.delete_token)); query } UploadResult::UploadId(id) => { query.uploads.push(id.to_owned()); query } _ => query, }); if new_query != query { let query = serde_qs::to_string(&new_query)?; return Ok(HttpResponse::SeeOther() .insert_header((LOCATION, format!("/upload?{}", query))) .finish()); } return render(HttpResponse::Ok(), |cursor| { self::templates::uploads_html(cursor) }); } render(HttpResponse::Ok(), |cursor| { self::templates::finished_uploads_html(cursor, results) }) } #[tracing::instrument(name = "Upload", skip(req, body, client))] async fn upload( req: HttpRequest, body: web::Payload, client: web::Data, ) -> Result { let client_request = client.request_from(CONFIG.upstream_upload_url(), req.head()); let client_request = if let Some(addr) = req.head().peer_addr { client_request.append_header(("X-Forwarded-For", addr.to_string())) } else { client_request }; let mut res = client_request.send_stream(body).await?; let uploads = res.json::().await?; let query = UploadQuery::try_from_uploads(uploads).map_err(ErrorKind::UploadFailed)?; let query = serde_qs::to_string(&query)?; Ok(HttpResponse::SeeOther() .insert_header((LOCATION, format!("/upload?{}", query))) .finish()) } const THUMBNAIL_SIZES: &[u64] = &[40, 50, 80, 100, 200, 400, 800, 1200]; #[derive(Debug, serde::Deserialize)] struct ThumbnailQuery { image: String, } #[tracing::instrument(name = "Thumbs", skip(client))] async fn thumbs( query: web::Query, client: web::Data, ) -> Result { let file = query.into_inner().image; let url = CONFIG.upstream_details_url(&file); let mut res = client.get(url).send().await?; if res.status() == StatusCode::NOT_FOUND { return Ok(to_404()); } let details: Details = res.json().await?; let image = Image { file, delete_token: String::new(), details, }; render(HttpResponse::Ok(), |cursor| { self::templates::thumbnails_html(cursor, image, THUMBNAIL_SIZES) }) } #[tracing::instrument(name = "Image", skip(req, client))] async fn image( url: String, req: HttpRequest, client: web::Data, ) -> Result { let client_request = client.request_from(url, req.head()); let client_request = if let Some(addr) = req.head().peer_addr { client_request.insert_header(("X-Forwarded-For", addr.to_string())) } else { client_request }; let res = client_request.no_decompress().send().await?; if res.status() == StatusCode::NOT_FOUND { return Ok(to_404()); } let mut client_res = HttpResponse::build(res.status()); for (name, value) in res.headers().iter().filter(|(h, _)| *h != "connection") { client_res.insert_header((name.clone(), value.clone())); } Ok(client_res.body(BodyStream::new(res))) } #[tracing::instrument(name = "View original", skip(client))] async fn view_original( file: web::Path, client: web::Data, ) -> Result { let file = file.into_inner(); let url = CONFIG.upstream_details_url(&file); let mut res = client.get(url).send().await?; if res.status() == StatusCode::NOT_FOUND { return Ok(to_404()); } let details: Details = res.json().await?; let image = Image { file, delete_token: String::new(), details, }; render(HttpResponse::Ok(), |cursor| { self::templates::view_html(cursor, image, None, THUMBNAIL_SIZES.last()) }) } #[tracing::instrument(name = "View", skip(client))] async fn view( parts: web::Path<(u64, String)>, client: web::Data, ) -> Result { let (size, file) = parts.into_inner(); if !valid_thumbnail_size(size) { return Ok(to_404()); } let url = CONFIG.upstream_details_url(&file); let mut res = client.get(url).send().await?; if res.status() == StatusCode::NOT_FOUND { return Ok(to_404()); } let details: Details = res.json().await?; let image = Image { file, delete_token: String::new(), details, }; render(HttpResponse::Ok(), |cursor| { self::templates::view_html(cursor, image, Some(size), THUMBNAIL_SIZES.last()) }) } #[tracing::instrument(name = "Thumbnail", skip(req, client))] async fn thumbnail( parts: web::Path<(u64, FileType, String)>, req: HttpRequest, client: web::Data, ) -> Result { let (size, filetype, file) = parts.into_inner(); if valid_thumbnail_size(size) { let url = CONFIG.upstream_thumbnail_url(size, &file, filetype); return image(url, req, client).await; } Ok(to_404()) } fn valid_thumbnail_size(size: u64) -> bool { THUMBNAIL_SIZES.contains(&size) } #[tracing::instrument(name = "Full resolution", skip(req, client))] async fn full_res( filename: web::Path, req: HttpRequest, client: web::Data, ) -> Result { let url = CONFIG.upstream_image_url(&filename.into_inner()); image(url, req, client).await } #[allow(clippy::async_yields_async)] #[tracing::instrument(name = "Static files")] async fn static_files(filename: web::Path) -> HttpResponse { let filename = filename.into_inner(); if let Some(data) = self::templates::statics::StaticFile::get(&filename) { return HttpResponse::Ok() .insert_header(LastModified(SystemTime::now().into())) .insert_header(CacheControl(vec![ CacheDirective::Public, CacheDirective::MaxAge(365 * DAYS), CacheDirective::Extension("immutable".to_owned(), None), ])) .insert_header(ContentType(data.mime.clone())) .body(data.content); } to_404() } #[derive(Debug, serde::Deserialize)] struct DeleteQuery { token: String, file: String, #[serde(default)] confirm: bool, } #[tracing::instrument(name = "Delete", skip(client))] async fn delete( query: web::Query, client: web::Data, ) -> Result { let DeleteQuery { token, file, confirm, } = query.into_inner(); let url = CONFIG.upstream_details_url(&file); let mut res = client.get(url).send().await?; if res.status() == StatusCode::NOT_FOUND { return Ok(to_404()); } if confirm { let url = CONFIG.upstream_delete_url(&token, &file); client.delete(url).send().await?; render(HttpResponse::Ok(), |cursor| { self::templates::deleted_html(cursor, &file) }) } else { let details: Details = res.json().await?; render(HttpResponse::Ok(), move |cursor| { self::templates::confirm_delete_html( cursor, &Image { file, delete_token: token, details, }, ) }) } } fn to_404() -> HttpResponse { HttpResponse::TemporaryRedirect() .insert_header((LOCATION, "/404")) .finish() } #[tracing::instrument(name = "Not Found")] async fn not_found() -> Result { render(HttpResponse::NotFound(), |cursor| { self::templates::not_found_html(cursor) }) } async fn go_home() -> HttpResponse { HttpResponse::TemporaryRedirect() .insert_header((LOCATION, "/")) .finish() } #[tracing::instrument(name = "Render", skip(builder, f))] fn render( mut builder: HttpResponseBuilder, f: impl FnOnce(&mut Cursor<&mut Vec>) -> Result<(), std::io::Error>, ) -> Result { let min = { let mut bytes = vec![]; (f)(&mut Cursor::new(&mut bytes))?; minify_html::minify(&bytes, &minify_html::Cfg::spec_compliant()) }; Ok(builder .content_type(mime::TEXT_HTML.essence_str()) .body(min)) } fn init_tracing( service_name: &'static str, opentelemetry_url: Option<&Url>, console_event_buffer_size: Option, ) -> Result<(), anyhow::Error> { opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); LogTracer::init()?; let targets: Targets = std::env::var("RUST_LOG") .unwrap_or_else(|_| "info".into()) .parse()?; let format_layer = tracing_subscriber::fmt::layer() .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) .with_filter(targets.clone()); let subscriber = Registry::default() .with(format_layer) .with(ErrorLayer::default()); if let Some(buffer_size) = console_event_buffer_size { let console_layer = ConsoleLayer::builder() .with_default_env() .server_addr(([0, 0, 0, 0], 6669)) .event_buffer_capacity(buffer_size) .spawn(); let subscriber = subscriber.with(console_layer); init_subscriber(subscriber, targets, opentelemetry_url, service_name) } else { init_subscriber(subscriber, targets, opentelemetry_url, service_name) } } fn init_subscriber( subscriber: S, targets: Targets, opentelemetry_url: Option<&Url>, service_name: &'static str, ) -> anyhow::Result<()> where S: SubscriberExt + Send + Sync, for<'a> S: LookupSpan<'a>, { if let Some(url) = opentelemetry_url { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_trace_config(opentelemetry::sdk::trace::config().with_resource( Resource::new(vec![KeyValue::new("service.name", service_name)]), )) .with_exporter( opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(url.as_str()), ) .install_batch(opentelemetry::runtime::Tokio)?; let otel_layer = tracing_opentelemetry::layer() .with_tracer(tracer) .with_filter(targets); let subscriber = subscriber.with(otel_layer); tracing::subscriber::set_global_default(subscriber)?; } else { tracing::subscriber::set_global_default(subscriber)?; } Ok(()) } #[actix_rt::main] async fn main() -> Result<(), anyhow::Error> { dotenv::dotenv().ok(); init_tracing( "pict-rs-proxy", CONFIG.opentelemetry_url.as_ref(), CONFIG.console_event_buffer_size, )?; HttpServer::new(move || { let client = Client::builder() .wrap(Tracing) .add_default_header(("User-Agent", "pict-rs-frontend, v0.5.0-alpha.1")) .timeout(Duration::from_secs(30)) .disable_redirects() .finish(); App::new() .app_data(web::Data::new(client)) .wrap(NormalizePath::trim()) .wrap(TracingLogger::default()) .service(web::resource("/").route(web::get().to(index))) .service( web::resource("/upload") .route(web::post().to(upload)) .route(web::get().to(list_uploads)), ) .service(web::resource("/image/{filename}").route(web::get().to(full_res))) .service(web::resource("/thumbnails").route(web::get().to(thumbs))) .service(web::resource("/view/{size}/{filename}").route(web::get().to(view))) .service(web::resource("/view/{filename}").route(web::get().to(view_original))) .service( web::resource("/thumb/{size}/{filetype}/{filename}") .route(web::get().to(thumbnail)), ) .service(web::resource("/static/{filename}").route(web::get().to(static_files))) .service(web::resource("/delete").route(web::get().to(delete))) .service(web::resource("/404").route(web::get().to(not_found))) .default_service(web::get().to(go_home)) }) .bind(CONFIG.addr)? .run() .await?; Ok(()) }