diff --git a/Cargo.lock b/Cargo.lock index 03701f0..34fbfa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,9 +20,9 @@ dependencies = [ [[package]] name = "actix-form-data" -version = "0.6.0-beta.5" +version = "0.6.0-beta.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d6edb0d647fc6899a8dafc38c4c492f33f5cce500776b16c744f9be40c9fc70" +checksum = "c6552c90f3283caa08a8114d49f82cb3eacd6038168bc0ffb199b9304f615be2" dependencies = [ "actix-multipart", "actix-rt", diff --git a/src/main.rs b/src/main.rs index c8bc42e..ba9a705 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,7 +45,7 @@ use self::{ config::{Config, Format}, error::UploadError, middleware::{Deadline, Internal, Tracing}, - upload_manager::{Details, UploadManager}, + upload_manager::{Details, UploadManager, UploadManagerSession}, validate::{image_webp, video_mp4}, }; @@ -252,7 +252,7 @@ fn to_ext(mime: mime::Mime) -> Result<&'static str, UploadError> { /// Handle responding to succesful uploads #[instrument(skip(value, manager))] async fn upload( - value: Value, + value: Value, manager: web::Data, ) -> Result { let images = value @@ -262,15 +262,14 @@ async fn upload( .ok_or(UploadError::NoFiles)?; let mut files = Vec::new(); - for image in images.into_iter().filter_map(|i| i.file()) { - if let Some(alias) = image - .saved_as - .as_ref() - .and_then(|s| s.file_name()) - .and_then(|s| s.to_str()) - { + let images = images + .into_iter() + .filter_map(|i| i.file()) + .collect::>(); + for image in &images { + if let Some(alias) = image.result.alias() { info!("Uploaded {} as {:?}", image.filename, alias); - let delete_token = manager.delete_token(alias.to_owned()).await?; + let delete_token = image.result.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; let mut path = manager.image_dir(); @@ -300,6 +299,9 @@ async fn upload( } } + for image in images { + image.result.succeed(); + } Ok(HttpResponse::Created().json(&serde_json::json!({ "msg": "ok", "files": files @@ -329,9 +331,10 @@ async fn download( let stream = Box::pin(once(fut)); let permit = PROCESS_SEMAPHORE.acquire().await?; - let alias = manager.upload(stream).await?; + let session = manager.session().upload(stream).await?; + let alias = session.alias().unwrap().to_owned(); drop(permit); - let delete_token = manager.delete_token(alias.clone()).await?; + let delete_token = session.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; let mut path = manager.image_dir(); @@ -349,6 +352,7 @@ async fn download( new_details }; + session.succeed(); Ok(HttpResponse::Created().json(&serde_json::json!({ "msg": "ok", "files": [{ @@ -802,11 +806,7 @@ async fn main() -> Result<(), anyhow::Error> { let permit = PROCESS_SEMAPHORE.acquire().await?; - let res = manager.upload(stream).await.map(|alias| { - let mut path = PathBuf::new(); - path.push(alias); - Some(path) - }); + let res = manager.session().upload(stream).await; drop(permit); drop(entered); @@ -836,13 +836,9 @@ async fn main() -> Result<(), anyhow::Error> { let permit = PROCESS_SEMAPHORE.acquire().await?; let res = manager + .session() .import(filename, content_type, validate_imports, stream) - .await - .map(|alias| { - let mut path = PathBuf::new(); - path.push(alias); - Some(path) - }); + .await; drop(permit); drop(entered); diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 95ad1a5..8922bf6 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -33,6 +33,55 @@ pub struct UploadManager { inner: Arc, } +pub struct UploadManagerSession { + manager: UploadManager, + alias: Option, + finished: bool, +} + +impl UploadManagerSession { + pub(crate) fn succeed(mut self) { + self.finished = true; + } + + pub(crate) fn alias(&self) -> Option<&str> { + self.alias.as_deref() + } +} + +impl Drop for UploadManagerSession { + fn drop(&mut self) { + if self.finished { + return; + } + + if let Some(alias) = self.alias.take() { + let manager = self.manager.clone(); + let span = Span::current(); + actix_rt::spawn(async move { + let entered = span.entered(); + // undo alias -> hash mapping + debug!("Remove alias -> hash mapping"); + if let Ok(Some(hash)) = manager.inner.alias_tree.remove(&alias) { + // undo alias -> id mapping + debug!("Remove alias -> id mapping"); + let key = alias_id_key(&alias); + if let Ok(Some(id)) = manager.inner.alias_tree.remove(&key) { + // undo hash/id -> alias mapping + debug!("Remove hash/id -> alias mapping"); + let id = String::from_utf8_lossy(&id); + let key = alias_key(&hash, &id); + let _ = manager.inner.main_tree.remove(&key); + } + + let _ = manager.check_delete_files(hash).await; + } + drop(entered); + }); + } + } +} + pub struct Hasher { inner: I, hasher: D, @@ -446,6 +495,10 @@ impl UploadManager { }) .await??; + self.check_delete_files(hash).await + } + + async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), UploadError> { // -- CHECK IF ANY OTHER ALIASES EXIST -- let main_tree = self.inner.main_tree.clone(); let (start, end) = alias_key_bounds(&hash); @@ -491,123 +544,6 @@ impl UploadManager { Ok(()) } - /// Generate a delete token for an alias - #[instrument(skip(self))] - pub(crate) async fn delete_token(&self, alias: String) -> Result { - debug!("Generating delete token"); - use rand::distributions::{Alphanumeric, Distribution}; - let rng = rand::thread_rng(); - let s: String = Alphanumeric - .sample_iter(rng) - .take(10) - .map(char::from) - .collect(); - let delete_token = s.clone(); - - debug!("Saving delete token"); - let alias_tree = self.inner.alias_tree.clone(); - let key = delete_key(&alias); - let res = web::block(move || { - alias_tree.compare_and_swap( - key.as_bytes(), - None as Option, - Some(s.as_bytes()), - ) - }) - .await??; - - if let Err(sled::CompareAndSwapError { - current: Some(ivec), - .. - }) = res - { - let s = String::from_utf8(ivec.to_vec())?; - - debug!("Returning existing delete token, {}", s); - return Ok(s); - } - - debug!("Returning new delete token, {}", delete_token); - Ok(delete_token) - } - - /// Upload the file while preserving the filename, optionally validating the uploaded image - #[instrument(skip(self, stream))] - pub(crate) async fn import( - &self, - alias: String, - content_type: mime::Mime, - validate: bool, - mut stream: UploadStream, - ) -> Result - where - UploadError: From, - E: Unpin + 'static, - { - let mut bytes_mut = actix_web::web::BytesMut::new(); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); - let (content_type, validated_reader) = - crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone()) - .await?; - - let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone()); - - let tmpfile = crate::tmp_file(); - safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; - let hash = hasher_reader.finalize_reset().await?; - - debug!("Storing alias"); - self.add_existing_alias(&hash, &alias).await?; - - debug!("Saving file"); - self.save_upload(tmpfile, hash, content_type).await?; - - // Return alias to file - Ok(alias) - } - - /// Upload the file, discarding bytes if it's already present, or saving if it's new - #[instrument(skip(self, stream))] - pub(crate) async fn upload(&self, mut stream: UploadStream) -> Result - where - UploadError: From, - { - let mut bytes_mut = actix_web::web::BytesMut::new(); - - debug!("Reading stream to memory"); - while let Some(res) = stream.next().await { - let bytes = res?; - bytes_mut.extend_from_slice(&bytes); - } - - debug!("Validating bytes"); - let (content_type, validated_reader) = - crate::validate::validate_image_bytes(bytes_mut.freeze(), self.inner.format.clone()) - .await?; - - let mut hasher_reader = Hasher::new(validated_reader, self.inner.hasher.clone()); - - let tmpfile = crate::tmp_file(); - safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; - let hash = hasher_reader.finalize_reset().await?; - - debug!("Adding alias"); - let alias = self.add_alias(&hash, content_type.clone()).await?; - - debug!("Saving file"); - self.save_upload(tmpfile, hash, content_type).await?; - - // Return alias to file - Ok(alias) - } - /// Fetch the real on-disk filename given an alias #[instrument(skip(self))] pub(crate) async fn from_alias(&self, alias: String) -> Result { @@ -628,6 +564,14 @@ impl UploadManager { Ok(filename) } + pub(crate) fn session(&self) -> UploadManagerSession { + UploadManagerSession { + manager: self.clone(), + alias: None, + finished: false, + } + } + // Find image variants and remove them from the DB and the disk #[instrument(skip(self))] async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), UploadError> { @@ -682,6 +626,138 @@ impl UploadManager { } Ok(()) } +} + +impl UploadManagerSession { + /// Generate a delete token for an alias + #[instrument(skip(self))] + pub(crate) async fn delete_token(&self) -> Result { + let alias = self + .alias + .clone() + .ok_or_else(|| UploadError::MissingAlias)?; + + debug!("Generating delete token"); + use rand::distributions::{Alphanumeric, Distribution}; + let rng = rand::thread_rng(); + let s: String = Alphanumeric + .sample_iter(rng) + .take(10) + .map(char::from) + .collect(); + let delete_token = s.clone(); + + debug!("Saving delete token"); + let alias_tree = self.manager.inner.alias_tree.clone(); + let key = delete_key(&alias); + let res = web::block(move || { + alias_tree.compare_and_swap( + key.as_bytes(), + None as Option, + Some(s.as_bytes()), + ) + }) + .await??; + + if let Err(sled::CompareAndSwapError { + current: Some(ivec), + .. + }) = res + { + let s = String::from_utf8(ivec.to_vec())?; + + debug!("Returning existing delete token, {}", s); + return Ok(s); + } + + debug!("Returning new delete token, {}", delete_token); + Ok(delete_token) + } + + /// Upload the file while preserving the filename, optionally validating the uploaded image + #[instrument(skip(self, stream))] + pub(crate) async fn import( + mut self, + alias: String, + content_type: mime::Mime, + validate: bool, + mut stream: UploadStream, + ) -> Result + where + UploadError: From, + E: Unpin + 'static, + { + let mut bytes_mut = actix_web::web::BytesMut::new(); + + debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + bytes_mut.extend_from_slice(&bytes); + } + + debug!("Validating bytes"); + let (content_type, validated_reader) = crate::validate::validate_image_bytes( + bytes_mut.freeze(), + self.manager.inner.format.clone(), + ) + .await?; + + let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); + + let tmpfile = crate::tmp_file(); + safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let hash = hasher_reader.finalize_reset().await?; + + debug!("Storing alias"); + self.alias = Some(alias.clone()); + self.add_existing_alias(&hash, &alias).await?; + + debug!("Saving file"); + self.save_upload(tmpfile, hash, content_type).await?; + + // Return alias to file + Ok(self) + } + + /// Upload the file, discarding bytes if it's already present, or saving if it's new + #[instrument(skip(self, stream))] + pub(crate) async fn upload( + mut self, + mut stream: UploadStream, + ) -> Result + where + UploadError: From, + { + let mut bytes_mut = actix_web::web::BytesMut::new(); + + debug!("Reading stream to memory"); + while let Some(res) = stream.next().await { + let bytes = res?; + bytes_mut.extend_from_slice(&bytes); + } + + debug!("Validating bytes"); + let (content_type, validated_reader) = crate::validate::validate_image_bytes( + bytes_mut.freeze(), + self.manager.inner.format.clone(), + ) + .await?; + + let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); + + let tmpfile = crate::tmp_file(); + safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?; + let hash = hasher_reader.finalize_reset().await?; + + debug!("Adding alias"); + self.add_alias(&hash, content_type.clone()).await?; + + debug!("Saving file"); + self.save_upload(tmpfile, hash, content_type).await?; + + // Return alias to file + Ok(self) + } // check duplicates & store image if new async fn save_upload( @@ -699,7 +775,7 @@ impl UploadManager { } // -- WRITE NEW FILE -- - let mut real_path = self.image_dir(); + let mut real_path = self.manager.image_dir(); real_path.push(name); crate::safe_move_file(tmpfile, real_path).await?; @@ -714,7 +790,7 @@ impl UploadManager { hash: Hash, content_type: mime::Mime, ) -> Result<(Dup, String), UploadError> { - let main_tree = self.inner.main_tree.clone(); + let main_tree = self.manager.inner.main_tree.clone(); let filename = self.next_file(content_type).await?; let filename2 = filename.clone(); @@ -739,7 +815,7 @@ impl UploadManager { return Ok((Dup::Exists, name)); } - let fname_tree = self.inner.filename_tree.clone(); + let fname_tree = self.manager.inner.filename_tree.clone(); let filename2 = filename.clone(); debug!("Saving filename -> hash relation"); web::block(move || fname_tree.insert(filename2, hash.inner)).await??; @@ -750,7 +826,7 @@ impl UploadManager { // generate a short filename that isn't already in-use #[instrument(skip(self, content_type))] async fn next_file(&self, content_type: mime::Mime) -> Result { - let image_dir = self.image_dir(); + let image_dir = self.manager.image_dir(); use rand::distributions::{Alphanumeric, Distribution}; let mut limit: usize = 10; let mut rng = rand::thread_rng(); @@ -783,9 +859,9 @@ impl UploadManager { #[instrument(skip(self, hash, alias))] async fn add_existing_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { - self.save_alias(hash, alias).await??; + self.save_alias_hash_mapping(hash, alias).await??; - self.store_alias(hash, alias).await?; + self.store_hash_id_alias_mapping(hash, alias).await?; Ok(()) } @@ -795,30 +871,40 @@ impl UploadManager { // This will help if multiple 'users' upload the same file, and one of them wants to delete it #[instrument(skip(self, hash, content_type))] async fn add_alias( - &self, + &mut self, hash: &Hash, content_type: mime::Mime, - ) -> Result { + ) -> Result<(), UploadError> { let alias = self.next_alias(hash, content_type).await?; - self.store_alias(hash, &alias).await?; + self.store_hash_id_alias_mapping(hash, &alias).await?; - Ok(alias) + Ok(()) } // Add a pre-defined alias to an existin file // // DANGER: this can cause BAD BAD BAD conflicts if the same alias is used for multiple files #[instrument(skip(self, hash))] - async fn store_alias(&self, hash: &Hash, alias: &str) -> Result<(), UploadError> { + async fn store_hash_id_alias_mapping( + &self, + hash: &Hash, + alias: &str, + ) -> Result<(), UploadError> { let alias = alias.to_string(); loop { debug!("hash -> alias save loop"); - let db = self.inner.db.clone(); + let db = self.manager.inner.db.clone(); let id = web::block(move || db.generate_id()).await??.to_string(); + let alias_tree = self.manager.inner.alias_tree.clone(); + let key = alias_id_key(&alias); + let id2 = id.clone(); + debug!("Saving alias -> id mapping"); + web::block(move || alias_tree.insert(key.as_bytes(), id2.as_bytes())).await??; + let key = alias_key(&hash.inner, &id); - let main_tree = self.inner.main_tree.clone(); + let main_tree = self.manager.inner.main_tree.clone(); let alias2 = alias.clone(); debug!("Saving hash/id -> alias mapping"); let res = web::block(move || { @@ -827,11 +913,6 @@ impl UploadManager { .await??; if res.is_ok() { - let alias_tree = self.inner.alias_tree.clone(); - let key = alias_id_key(&alias); - debug!("Saving alias -> id mapping"); - web::block(move || alias_tree.insert(key.as_bytes(), id.as_bytes())).await??; - break; } @@ -844,7 +925,7 @@ impl UploadManager { // Generate an alias to the file #[instrument(skip(self, hash, content_type))] async fn next_alias( - &self, + &mut self, hash: &Hash, content_type: mime::Mime, ) -> Result { @@ -859,8 +940,9 @@ impl UploadManager { .map(char::from) .collect(); let alias = file_name(s, content_type.clone())?; + self.alias = Some(alias.clone()); - let res = self.save_alias(hash, &alias).await?; + let res = self.save_alias_hash_mapping(hash, &alias).await?; if res.is_ok() { return Ok(alias); @@ -873,16 +955,16 @@ impl UploadManager { // Save an alias to the database #[instrument(skip(self, hash))] - async fn save_alias( + async fn save_alias_hash_mapping( &self, hash: &Hash, alias: &str, ) -> Result, UploadError> { - let tree = self.inner.alias_tree.clone(); + let tree = self.manager.inner.alias_tree.clone(); let vec = hash.inner.clone(); let alias = alias.to_string(); - debug!("Saving alias"); + debug!("Saving alias -> hash mapping"); let res = web::block(move || { tree.compare_and_swap(alias.as_bytes(), None as Option, Some(vec)) })