diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..02e1a31 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,64 @@ +use actix_web::{http::StatusCode, HttpResponse, ResponseError}; + +#[derive(Debug, thiserror::Error)] +pub enum UploadError { + #[error("Invalid content type provided, {0}")] + ContentType(mime::Mime), + + #[error("Couln't upload file, {0}")] + Upload(String), + + #[error("Couldn't save file, {0}")] + Save(#[from] actix_fs::Error), + + #[error("Error in DB, {0}")] + Db(#[from] sled::Error), + + #[error("Error parsing string, {0}")] + ParseString(#[from] std::string::FromUtf8Error), + + #[error("Error processing image, {0}")] + Image(#[from] image::error::ImageError), + + #[error("Panic in blocking operation")] + Canceled, + + #[error("No files present in upload")] + NoFiles, + + #[error("Uploaded image could not be served, extension is missing")] + MissingExtension, +} + +impl From for UploadError { + fn from(e: actix_form_data::Error) -> Self { + UploadError::Upload(e.to_string()) + } +} + +impl From> for UploadError +where + T: Into + std::fmt::Debug, +{ + fn from(e: actix_web::error::BlockingError) -> Self { + match e { + actix_web::error::BlockingError::Error(e) => e.into(), + _ => UploadError::Canceled, + } + } +} + +impl ResponseError for UploadError { + fn status_code(&self) -> StatusCode { + match self { + UploadError::NoFiles | UploadError::ContentType(_) | UploadError::Upload(_) => { + StatusCode::BAD_REQUEST + } + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()).json(serde_json::json!({ "msg": self.to_string() })) + } +} diff --git a/src/main.rs b/src/main.rs index eeece96..be23b13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,18 @@ use actix_form_data::{Field, Form, Value}; use actix_web::{ guard, - http::{ - header::{CacheControl, CacheDirective}, - StatusCode, - }, + http::header::{CacheControl, CacheDirective}, 
middleware::Logger, - web, App, HttpResponse, HttpServer, ResponseError, + web, App, HttpResponse, HttpServer, }; -use futures::stream::{Stream, StreamExt, TryStreamExt}; +use futures::stream::{Stream, TryStreamExt}; use log::{error, info}; -use sha2::Digest; -use std::{path::PathBuf, pin::Pin, sync::Arc}; +use std::path::PathBuf; + +mod error; +mod upload_manager; + +use self::{error::UploadError, upload_manager::UploadManager}; const ACCEPTED_MIMES: &[mime::Mime] = &[ mime::IMAGE_BMP, @@ -23,216 +24,34 @@ const ACCEPTED_MIMES: &[mime::Mime] = &[ const MEGABYTES: usize = 1024 * 1024; const HOURS: u32 = 60 * 60; -#[derive(Clone)] -struct UploadManager { - inner: Arc, -} - -struct UploadManagerInner { - hasher: sha2::Sha256, - base_dir: PathBuf, - db: sled::Db, -} - -#[derive(Debug, thiserror::Error)] -enum UploadError { - #[error("Invalid content type provided, {0}")] - ContentType(mime::Mime), - - #[error("Couln't upload file, {0}")] - Upload(String), - - #[error("Couldn't save file, {0}")] - Save(#[from] actix_fs::Error), - - #[error("Error in DB, {0}")] - Db(#[from] sled::Error), - - #[error("Error parsing string, {0}")] - ParseString(#[from] std::string::FromUtf8Error), - - #[error("Error processing image, {0}")] - Image(#[from] image::error::ImageError), - - #[error("Panic in blocking operation")] - Canceled, - - #[error("No files present in upload")] - NoFiles, - - #[error("Uploaded image could not be served, extension is missing")] - MissingExtension, -} - -impl From for UploadError { - fn from(e: actix_form_data::Error) -> Self { - UploadError::Upload(e.to_string()) - } -} - -impl From> for UploadError -where - T: Into + std::fmt::Debug, -{ - fn from(e: actix_web::error::BlockingError) -> Self { - match e { - actix_web::error::BlockingError::Error(e) => e.into(), - _ => UploadError::Canceled, - } - } -} - -impl ResponseError for UploadError { - fn status_code(&self) -> StatusCode { - match self { - UploadError::ContentType(_) | UploadError::Upload(_) => 
StatusCode::BAD_REQUEST, - _ => StatusCode::INTERNAL_SERVER_ERROR, - } +// Try writing to a file +async fn safe_save_file(path: PathBuf, bytes: bytes::Bytes) -> Result<(), UploadError> { + if let Some(path) = path.parent() { + // create the directory for the file + actix_fs::create_dir_all(path.to_owned()).await?; } - fn error_response(&self) -> HttpResponse { - HttpResponse::build(self.status_code()).json(serde_json::json!({ "msg": self.to_string() })) - } -} - -type UploadStream = Pin>>>; - -enum Dup { - Exists, - New, -} - -impl Dup { - fn exists(&self) -> bool { - match self { - Dup::Exists => true, - _ => false, - } - } -} - -impl UploadManager { - async fn new(mut root_dir: PathBuf) -> Result { - let mut sled_dir = root_dir.clone(); - sled_dir.push("db"); - let db = sled::open(sled_dir)?; - - root_dir.push("files"); - - actix_fs::create_dir_all(root_dir.clone()).await?; - - Ok(UploadManager { - inner: Arc::new(UploadManagerInner { - hasher: sha2::Sha256::new(), - base_dir: root_dir, - db, - }), - }) - } - - async fn upload( - &self, - _filename: String, - content_type: mime::Mime, - mut stream: UploadStream, - ) -> Result, UploadError> { - if ACCEPTED_MIMES.iter().all(|valid| *valid != content_type) { - return Err(UploadError::ContentType(content_type)); - } - - let mut bytes = bytes::BytesMut::new(); - - while let Some(res) = stream.next().await { - bytes.extend(res?); - } - - let bytes = bytes.freeze(); - - let hash = self.hash(bytes.clone()).await?; - - let (dup, path) = self.check_duplicate(hash, content_type).await?; - - if dup.exists() { - return Ok(Some(path)); - } - - let file = actix_fs::file::create(path.clone()).await?; - - if let Err(e) = actix_fs::file::write(file, bytes).await { - error!("Error saving file, {}", e); - actix_fs::remove_file(path).await?; + // Only write the file if it doesn't already exist + if let Err(e) = actix_fs::metadata(path.clone()).await { + if e.kind() != Some(std::io::ErrorKind::NotFound) { return Err(e.into()); } - - 
Ok(Some(path)) + } else { + return Ok(()); } - async fn hash(&self, bytes: bytes::Bytes) -> Result, UploadError> { - let mut hasher = self.inner.hasher.clone(); - let hash = web::block(move || { - hasher.input(&bytes); - Ok(hasher.result().to_vec()) as Result<_, UploadError> - }) - .await?; + // Open the file for writing + let file = actix_fs::file::create(path.clone()).await?; - Ok(hash) + // try writing + if let Err(e) = actix_fs::file::write(file, bytes).await { + error!("Error writing file, {}", e); + // remove file if writing failed before completion + actix_fs::remove_file(path).await?; + return Err(e.into()); } - async fn check_duplicate( - &self, - hash: Vec, - content_type: mime::Mime, - ) -> Result<(Dup, PathBuf), UploadError> { - let mut path = self.inner.base_dir.clone(); - let db = self.inner.db.clone(); - - let filename = self.next_file(content_type).await?; - - let filename2 = filename.clone(); - let res = web::block(move || { - db.compare_and_swap(hash, None as Option, Some(filename2.as_bytes())) - }) - .await?; - - if let Err(sled::CompareAndSwapError { - current: Some(ivec), - .. 
- }) = res - { - let name = String::from_utf8(ivec.to_vec())?; - path.push(name); - - return Ok((Dup::Exists, path)); - } - - path.push(filename); - - Ok((Dup::New, path)) - } - - async fn next_file(&self, content_type: mime::Mime) -> Result { - let base_dir = self.inner.base_dir.clone(); - use rand::distributions::{Alphanumeric, Distribution}; - let mut limit: usize = 10; - let rng = rand::thread_rng(); - loop { - let mut path = base_dir.clone(); - let s: String = Alphanumeric.sample_iter(rng).take(limit).collect(); - - let filename = format!("{}{}", s, to_ext(content_type.clone())); - - path.push(filename.clone()); - - if let Err(e) = actix_fs::metadata(path).await { - if e.kind() == Some(std::io::ErrorKind::NotFound) { - return Ok(filename); - } - return Err(e.into()); - } - - limit += 1; - } - } + Ok(()) } fn to_ext(mime: mime::Mime) -> &'static str { @@ -256,6 +75,7 @@ fn from_ext(ext: std::ffi::OsString) -> mime::Mime { } } +/// Handle responding to successful uploads async fn upload(value: Value) -> Result { let images = value .map() @@ -279,12 +99,14 @@ async fn upload(value: Value) -> Result { Ok(HttpResponse::Created().json(serde_json::json!({ "msg": "ok", "files": files }))) } +/// Serve original files async fn serve( manager: web::Data, filename: web::Path, ) -> Result { - let mut path = manager.inner.base_dir.clone(); + let mut path = manager.image_dir(); path.push(filename.into_inner()); + let ext = path .extension() .ok_or(UploadError::MissingExtension)? @@ -296,47 +118,54 @@ async fn serve( Ok(srv_response(stream, ext)) } +/// Serve resized files async fn serve_resized( manager: web::Data, filename: web::Path<(u32, String)>, ) -> Result { use image::GenericImageView; - let mut path = manager.inner.base_dir.clone(); + let mut path = manager.image_dir(); let (size, name) = filename.into_inner(); path.push(size.to_string()); path.push(name.clone()); + let ext = path .extension() .ok_or(UploadError::MissingExtension)? 
.to_owned(); let ext = from_ext(ext); + // If the thumbnail doesn't exist, we need to create it if let Err(e) = actix_fs::metadata(path.clone()).await { if e.kind() != Some(std::io::ErrorKind::NotFound) { error!("Error looking up thumbnail, {}", e); return Err(e.into()); } - let mut original_path = manager.inner.base_dir.clone(); + let mut original_path = manager.image_dir(); original_path.push(name.clone()); + // Read the image file & produce a DynamicImage + // + // Drop bytes so we don't keep it around in memory longer than we need to let (img, format) = { let bytes = actix_fs::read(original_path.clone()).await?; let format = image::guess_format(&bytes)?; - let img = image::load_from_memory(&bytes)?; + let img = web::block(move || image::load_from_memory(&bytes)).await?; (img, format) }; + // return original image if resize target is larger if !img.in_bounds(size, size) { - // return original image if resize target is larger drop(img); let stream = actix_fs::read_to_stream(original_path).await?; return Ok(srv_response(stream, ext)); } + // perform thumbnail operation in a blocking thread let img_bytes: bytes::Bytes = web::block(move || { let mut bytes = std::io::Cursor::new(vec![]); img.thumbnail(size, size).write_to(&mut bytes, format)?; @@ -347,21 +176,10 @@ async fn serve_resized( let path2 = path.clone(); let img_bytes2 = img_bytes.clone(); + // Save the file in another task, we want to return the thumbnail now actix_rt::spawn(async move { - if let Some(path) = path2.parent() { - if let Err(e) = actix_fs::create_dir_all(path.to_owned()).await { - error!("Couldn't create directory for thumbnail, {}", e); - } - } - - if let Err(e) = actix_fs::metadata(path2.clone()).await { - if e.kind() == Some(std::io::ErrorKind::NotFound) { - if let Err(e) = actix_fs::write(path2, img_bytes2).await { - error!("Error saving image, {}", e); - } - } else { - error!("Error checking image, {}", e); - } + if let Err(e) = safe_save_file(path2, img_bytes2).await { + error!("Error 
saving file, {}", e); } }); @@ -378,6 +196,7 @@ async fn serve_resized( Ok(srv_response(stream, ext)) } +// A helper method to produce responses with proper cache headers fn srv_response(stream: S, ext: mime::Mime) -> HttpResponse where S: Stream> + Unpin + 'static, @@ -398,6 +217,9 @@ async fn main() -> Result<(), anyhow::Error> { env_logger::init(); let manager = UploadManager::new("data/".to_string().into()).await?; + // Create a new Multipart Form validator + // + // This form is expecting a single array field, 'images' with at most 10 files in it let manager2 = manager.clone(); let form = Form::new() .max_files(10) diff --git a/src/upload_manager.rs b/src/upload_manager.rs new file mode 100644 index 0000000..792f9c2 --- /dev/null +++ b/src/upload_manager.rs @@ -0,0 +1,173 @@ +use crate::{error::UploadError, safe_save_file, to_ext, ACCEPTED_MIMES}; +use actix_web::web; +use futures::stream::{Stream, StreamExt}; +use sha2::Digest; +use std::{path::PathBuf, pin::Pin, sync::Arc}; + +#[derive(Clone)] +pub struct UploadManager { + inner: Arc, +} + +struct UploadManagerInner { + hasher: sha2::Sha256, + image_dir: PathBuf, + db: sled::Db, +} + +type UploadStream = Pin>>>; + +enum Dup { + Exists, + New, +} + +impl Dup { + fn exists(&self) -> bool { + match self { + Dup::Exists => true, + _ => false, + } + } +} + +impl UploadManager { + /// Get the image directory + pub(crate) fn image_dir(&self) -> PathBuf { + self.inner.image_dir.clone() + } + + /// Create a new UploadManager + pub(crate) async fn new(mut root_dir: PathBuf) -> Result { + let mut sled_dir = root_dir.clone(); + sled_dir.push("db"); + // sled automatically creates its own directories + // + // This is technically a blocking operation but it's fine because it happens before we + // start handling requests + let db = sled::open(sled_dir)?; + + root_dir.push("files"); + + // Ensure file dir exists + actix_fs::create_dir_all(root_dir.clone()).await?; + + Ok(UploadManager { + inner: 
Arc::new(UploadManagerInner { + hasher: sha2::Sha256::new(), + image_dir: root_dir, + db, + }), + }) + } + + /// Upload the file, discarding bytes if it's already present, or saving if it's new + pub(crate) async fn upload( + &self, + _filename: String, + content_type: mime::Mime, + mut stream: UploadStream, + ) -> Result, UploadError> { + if ACCEPTED_MIMES.iter().all(|valid| *valid != content_type) { + return Err(UploadError::ContentType(content_type)); + } + + // -- READ IN BYTES FROM CLIENT -- + let mut bytes = bytes::BytesMut::new(); + + while let Some(res) = stream.next().await { + bytes.extend(res?); + } + + let bytes = bytes.freeze(); + + // -- DUPLICATE CHECKS -- + + // Cloning bytes is fine because it's actually a pointer + let hash = self.hash(bytes.clone()).await?; + + let (dup, path) = self.check_duplicate(hash, content_type).await?; + + // bail early with path to existing file if this is a duplicate + if dup.exists() { + return Ok(Some(path)); + } + + // TODO: validate image before saving + + // -- WRITE NEW FILE -- + safe_save_file(path.clone(), bytes).await?; + + Ok(Some(path)) + } + + // produce a sha256sum of the uploaded file + async fn hash(&self, bytes: bytes::Bytes) -> Result, UploadError> { + let mut hasher = self.inner.hasher.clone(); + let hash = web::block(move || { + hasher.input(&bytes); + Ok(hasher.result().to_vec()) as Result<_, UploadError> + }) + .await?; + + Ok(hash) + } + + // check for an already-uploaded image with this hash, returning the path to the target file + async fn check_duplicate( + &self, + hash: Vec, + content_type: mime::Mime, + ) -> Result<(Dup, PathBuf), UploadError> { + let mut path = self.inner.image_dir.clone(); + let db = self.inner.db.clone(); + + let filename = self.next_file(content_type).await?; + + let filename2 = filename.clone(); + let res = web::block(move || { + db.compare_and_swap(hash, None as Option, Some(filename2.as_bytes())) + }) + .await?; + + if let Err(sled::CompareAndSwapError { + current: 
Some(ivec), + .. + }) = res + { + let name = String::from_utf8(ivec.to_vec())?; + path.push(name); + + return Ok((Dup::Exists, path)); + } + + path.push(filename); + + Ok((Dup::New, path)) + } + + // generate a short filename that isn't already in-use + async fn next_file(&self, content_type: mime::Mime) -> Result { + let image_dir = self.inner.image_dir.clone(); + use rand::distributions::{Alphanumeric, Distribution}; + let mut limit: usize = 10; + let rng = rand::thread_rng(); + loop { + let mut path = image_dir.clone(); + let s: String = Alphanumeric.sample_iter(rng).take(limit).collect(); + + let filename = format!("{}{}", s, to_ext(content_type.clone())); + + path.push(filename.clone()); + + if let Err(e) = actix_fs::metadata(path).await { + if e.kind() == Some(std::io::ErrorKind::NotFound) { + return Ok(filename); + } + return Err(e.into()); + } + + limit += 1; + } + } +}