From 1559d57f0a84fa700e55404473261f9bbe2cc673 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 16 Aug 2023 15:12:16 -0500 Subject: [PATCH] Don't overwrite existing variants --- src/generate.rs | 20 +++++++++++++------- src/migrate_store.rs | 3 ++- src/repo.rs | 6 ++++-- src/repo/migrate.rs | 2 +- src/repo/sled.rs | 20 +++++++++++++------- 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/generate.rs b/src/generate.rs index c09500ff..19760123 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -4,7 +4,7 @@ use crate::{ error::{Error, UploadError}, ffmpeg::ThumbnailFormat, formats::{InputProcessableFormat, InternalVideoFormat}, - repo::{Alias, ArcRepo, Hash}, + repo::{Alias, ArcRepo, Hash, VariantAlreadyExists}, store::{Identifier, Store}, }; use actix_web::web::Bytes; @@ -167,13 +167,19 @@ async fn process( let identifier = store .save_bytes(bytes.clone(), details.media_type()) .await?; + + if let Err(VariantAlreadyExists) = repo + .relate_variant_identifier( + hash, + thumbnail_path.to_string_lossy().to_string(), + &identifier, + ) + .await? + { + store.remove(&identifier).await?; + } + repo.relate_details(&identifier, &details).await?; - repo.relate_variant_identifier( - hash, - thumbnail_path.to_string_lossy().to_string(), - &identifier, - ) - .await?; guard.disarm(); diff --git a/src/migrate_store.rs b/src/migrate_store.rs index 1805c261..15080680 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -253,7 +253,8 @@ where Ok(new_identifier) => { migrate_details(repo, &identifier, &new_identifier).await?; repo.remove_variant(hash.clone(), variant.clone()).await?; - repo.relate_variant_identifier(hash.clone(), variant, &new_identifier) + let _ = repo + .relate_variant_identifier(hash.clone(), variant, &new_identifier) .await?; repo.mark_migrated(&identifier, &new_identifier).await?; diff --git a/src/repo.rs b/src/repo.rs index fd1ee070..45c8eb69 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -43,6 +43,8 @@ pub(crate) struct DeleteToken { pub(crate) struct HashAlreadyExists; #[derive(Debug)] pub(crate) struct AliasAlreadyExists; +#[derive(Debug)] +pub(crate) struct VariantAlreadyExists; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct UploadId { @@ -474,7 +476,7 @@ pub(crate) trait HashRepo: BaseRepo { hash: Hash, variant: String, identifier: &dyn Identifier, - ) -> Result<(), StoreError>; + ) -> Result, StoreError>; async fn variant_identifier( &self, hash: Hash, @@ -531,7 +533,7 @@ where hash: Hash, variant: String, identifier: &dyn Identifier, - ) -> Result<(), StoreError> { + ) -> Result, StoreError> { T::relate_variant_identifier(self, hash, variant, identifier).await } diff --git a/src/repo/migrate.rs b/src/repo/migrate.rs index eb226fd1..63812ff8 100644 --- a/src/repo/migrate.rs +++ b/src/repo/migrate.rs @@ -173,7 +173,7 @@ async fn do_migrate_hash_04( } for (variant, identifier) in variants { - new_repo + let _ = new_repo .relate_variant_identifier(hash.clone(), variant.clone(), &identifier) .await?; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 2ba7d33f..9c2a16bc 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -4,7 +4,7 @@ use crate::{ hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, - UploadResult, VariantAccessRepo, + UploadResult, VariantAccessRepo, VariantAlreadyExists, }, serde_str::Serde, store::StoreError, @@ -1053,18 +1053,24 @@ impl HashRepo for SledRepo { hash: Hash, variant: String, identifier: &dyn Identifier, - ) -> Result<(), StoreError> { + ) -> Result, StoreError> { let hash = hash.to_bytes(); let key = variant_key(&hash, &variant); let value = identifier.to_bytes()?; - b!( - self.hash_variant_identifiers, - hash_variant_identifiers.insert(key, value) - ); + let hash_variant_identifiers = self.hash_variant_identifiers.clone(); - Ok(()) + actix_rt::task::spawn_blocking(move || { + hash_variant_identifiers + .compare_and_swap(key, Option::<&[u8]>::None, Some(value)) + .map(|res| res.map_err(|_| VariantAlreadyExists)) + }) + .await + .map_err(|_| RepoError::Canceled)? + .map_err(SledError::from) + .map_err(RepoError::from) + .map_err(StoreError::from) } #[tracing::instrument(level = "trace", skip(self))]