From da876fd55320a4900436b1f80518d2c7d8dde3b1 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 14 Aug 2023 14:25:19 -0500 Subject: [PATCH] Make it compile --- src/concurrent_processor.rs | 15 +++--- src/formats.rs | 6 ++- src/generate.rs | 8 +-- src/ingest.rs | 37 +++++++------- src/ingest/hasher.rs | 42 +++++++++------- src/lib.rs | 9 ++-- src/migrate_store.rs | 56 +++++++-------------- src/queue.rs | 19 +++---- src/queue/cleanup.rs | 21 +++----- src/repo.rs | 2 +- src/repo/hash.rs | 98 +++++++++++++++++++++++++++++++++++-- src/repo/sled.rs | 22 +++++---- 12 files changed, 203 insertions(+), 132 deletions(-) diff --git a/src/concurrent_processor.rs b/src/concurrent_processor.rs index 06eeeff0..4bb10d8d 100644 --- a/src/concurrent_processor.rs +++ b/src/concurrent_processor.rs @@ -1,6 +1,7 @@ use crate::{ details::Details, error::{Error, UploadError}, + repo::Hash, }; use actix_web::web; use dashmap::{mapref::entry::Entry, DashMap}; @@ -16,7 +17,7 @@ use tracing::Span; type OutcomeReceiver = Receiver<(Details, web::Bytes)>; -type ProcessMapKey = (Vec, PathBuf); +type ProcessMapKey = (Hash, PathBuf); type ProcessMapInner = DashMap; @@ -32,14 +33,14 @@ impl ProcessMap { pub(super) async fn process( &self, - hash: &[u8], + hash: Hash, path: PathBuf, fut: Fut, ) -> Result<(Details, web::Bytes), Error> where Fut: Future>, { - let key = (hash.to_vec(), path.clone()); + let key = (hash.clone(), path.clone()); let (sender, receiver) = flume::bounded(1); @@ -51,8 +52,8 @@ impl ProcessMap { let span = tracing::info_span!( "Processing image", - hash = &tracing::field::debug(&hex::encode(hash)), - path = &tracing::field::debug(&path), + hash = ?hash, + path = ?path, completed = &tracing::field::Empty, ); @@ -63,8 +64,8 @@ impl ProcessMap { Entry::Occupied(receiver) => { let span = tracing::info_span!( "Waiting for processed image", - hash = &tracing::field::debug(&hex::encode(hash)), - path = &tracing::field::debug(&path), + hash = ?hash, + path = ?path, ); let receiver = receiver.get().clone().into_recv_async(); diff --git a/src/formats.rs b/src/formats.rs index 3247c311..3ea46e57 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -25,10 +25,12 @@ pub(crate) enum InputFile { Video(VideoFormat), } -#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] pub(crate) enum InternalFormat { - Image(ImageFormat), Animation(AnimationFormat), + Image(ImageFormat), Video(InternalVideoFormat), } diff --git a/src/generate.rs b/src/generate.rs index 3498c1b8..be122972 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -4,7 +4,7 @@ use crate::{ error::{Error, UploadError}, ffmpeg::ThumbnailFormat, formats::{InputProcessableFormat, InternalVideoFormat}, - repo::{Alias, FullRepo}, + repo::{Alias, FullRepo, Hash}, store::Store, }; use actix_web::web::Bytes; @@ -51,7 +51,7 @@ pub(crate) async fn generate( input_format: Option, thumbnail_format: Option, media: &crate::config::Media, - hash: R::Bytes, + hash: Hash, ) -> Result<(Details, Bytes), Error> { let process_fut = process( repo, @@ -67,7 +67,7 @@ pub(crate) async fn generate( ); let (details, bytes) = process_map - .process(hash.as_ref(), thumbnail_path, process_fut) + .process(hash, thumbnail_path, process_fut) .await?; Ok((details, bytes)) @@ -85,7 +85,7 @@ async fn process( input_format: Option, thumbnail_format: Option, media: &crate::config::Media, - hash: R::Bytes, + hash: Hash, ) -> Result<(Details, Bytes), Error> { let guard = MetricsGuard::guard(); let permit = crate::PROCESS_SEMAPHORE.acquire().await; diff --git a/src/ingest.rs b/src/ingest.rs index e797791b..714af5ef 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -3,12 +3,11 @@ use crate::{ either::Either, error::{Error, UploadError}, formats::{InternalFormat, Validations}, - repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo}, + repo::{Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo}, store::Store, }; use actix_web::web::Bytes; use futures_util::{Stream, StreamExt}; -use sha2::{Digest, Sha256}; use tracing::{Instrument, Span}; mod hasher; @@ -22,7 +21,7 @@ where { repo: R, delete_token: DeleteToken, - hash: Option>, + hash: Option, alias: Option, identifier: Option, } @@ -97,8 +96,8 @@ where Either::right(validated_reader) }; - let hasher_reader = Hasher::new(processed_reader, Sha256::new()); - let hasher = hasher_reader.hasher(); + let hasher_reader = Hasher::new(processed_reader); + let state = hasher_reader.state(); let identifier = store .save_async_read(hasher_reader, input_type.media_type()) @@ -114,14 +113,16 @@ where identifier: Some(identifier.clone()), }; - let hash = hasher.borrow_mut().finalize_reset().to_vec(); + let (hash, size) = state.borrow_mut().finalize_reset(); - save_upload(&mut session, repo, store, &hash, &identifier).await?; + let hash = Hash::new(hash, size, input_type); + + save_upload(&mut session, repo, store, hash.clone(), &identifier).await?; if let Some(alias) = declared_alias { - session.add_existing_alias(&hash, alias).await? + session.add_existing_alias(hash, alias).await? } else { - session.create_alias(&hash, input_type).await? + session.create_alias(hash, input_type).await? }; Ok(session) @@ -132,14 +133,14 @@ async fn save_upload( session: &mut Session, repo: &R, store: &S, - hash: &[u8], + hash: Hash, identifier: &S::Identifier, ) -> Result<(), Error> where S: Store, R: FullRepo, { - if HashRepo::create(repo, hash.to_vec().into(), identifier) + if HashRepo::create(repo, hash.clone(), identifier) .await? .is_err() { @@ -150,7 +151,7 @@ where } // Set hash after upload uniquness check so we don't clean existing files on failure - session.hash = Some(Vec::from(hash)); + session.hash = Some(hash); Ok(()) } @@ -177,10 +178,8 @@ where } #[tracing::instrument(skip(self, hash))] - async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> { - let hash: R::Bytes = hash.to_vec().into(); - - AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone()) + async fn add_existing_alias(&mut self, hash: Hash, alias: Alias) -> Result<(), Error> { + AliasRepo::create(&self.repo, &alias, &self.delete_token, hash) .await? .map_err(|_| UploadError::DuplicateAlias)?; @@ -190,9 +189,7 @@ where } #[tracing::instrument(level = "debug", skip(self, hash))] - async fn create_alias(&mut self, hash: &[u8], input_type: InternalFormat) -> Result<(), Error> { - let hash: R::Bytes = hash.to_vec().into(); - + async fn create_alias(&mut self, hash: Hash, input_type: InternalFormat) -> Result<(), Error> { loop { let alias = Alias::generate(input_type.file_extension().to_string()); @@ -232,7 +229,7 @@ where tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { actix_rt::spawn( async move { - let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; + let _ = crate::queue::cleanup_hash(&repo, hash).await; } .instrument(cleanup_span), ) diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs index 6123e57b..b5b809c8 100644 --- a/src/ingest/hasher.rs +++ b/src/ingest/hasher.rs @@ -1,4 +1,4 @@ -use sha2::{digest::FixedOutputReset, Digest}; +use sha2::{digest::FixedOutputReset, Digest, Sha256}; use std::{ cell::RefCell, pin::Pin, @@ -7,43 +7,47 @@ use std::{ }; use tokio::io::{AsyncRead, ReadBuf}; -struct State { - hasher: D, +pub(super) struct State { + hasher: Sha256, size: u64, } pin_project_lite::pin_project! { - pub(crate) struct Hasher { + pub(crate) struct Hasher { #[pin] inner: I, - state: Rc>>, + state: Rc>, } } -impl Hasher -where - D: Digest + FixedOutputReset + Send + 'static, -{ - pub(super) fn new(reader: I, digest: D) -> Self { +impl Hasher { + pub(super) fn new(reader: I) -> Self { Hasher { inner: reader, state: Rc::new(RefCell::new(State { - hasher: digest, + hasher: Sha256::new(), size: 0, })), } } - pub(super) fn state(&self) -> Rc>> { + pub(super) fn state(&self) -> Rc> { Rc::clone(&self.state) } } -impl AsyncRead for Hasher +impl State { + pub(super) fn finalize_reset(&mut self) -> ([u8; 32], u64) { + let arr = self.hasher.finalize_fixed_reset().into(); + + (arr, self.size) + } +} + +impl AsyncRead for Hasher where I: AsyncRead, - D: Digest, { fn poll_read( mut self: Pin<&mut Self>, @@ -94,24 +98,26 @@ mod test { #[test] fn hasher_works() { - let hash = test_on_arbiter!(async move { + let (hash, size) = test_on_arbiter!(async move { let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?; - let mut reader = Hasher::new(file1, Sha256::new()); + let mut reader = Hasher::new(file1); tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?; - Ok(reader.state().borrow_mut().hasher.finalize_reset().to_vec()) as std::io::Result<_> + Ok(reader.state().borrow_mut().finalize_reset()) as std::io::Result<_> }) .unwrap(); let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap(); let mut vec = Vec::new(); file.read_to_end(&mut vec).unwrap(); + let correct_size = vec.len() as u64; let mut hasher = Sha256::new(); hasher.update(vec); - let correct_hash = hasher.finalize_reset().to_vec(); + let correct_hash: [u8; 32] = hasher.finalize_reset().into(); assert_eq!(hash, correct_hash); + assert_eq!(size, correct_size); } } diff --git a/src/lib.rs b/src/lib.rs index 89818cab..d1adaab2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,8 +68,8 @@ use self::{ migrate_store::migrate_store, queue::queue_generate, repo::{ - sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, - Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo, + sled::SledRepo, Alias, AliasAccessRepo, DeleteToken, FullRepo, Hash, HashRepo, + IdentifierRepo, Repo, SettingsRepo, UploadId, UploadResult, VariantAccessRepo, }, serde_str::Serde, store::{ @@ -696,7 +696,7 @@ async fn process_details( Ok(HttpResponse::Ok().json(&details)) } -async fn not_found_hash(repo: &R) -> Result, Error> { +async fn not_found_hash(repo: &R) -> Result, Error> { let Some(not_found) = repo.get(NOT_FOUND_KEY).await? else { return Ok(None); }; @@ -1115,8 +1115,7 @@ async fn do_serve( let Some(identifier) = repo.identifier(hash.clone()).await? else { tracing::warn!( - "Original File identifier for hash {} is missing, queue cleanup task", - hex::encode(&hash) + "Original File identifier for hash {hash:?} is missing, queue cleanup task", ); crate::queue::cleanup_hash(&repo, hash).await?; return Ok(HttpResponse::NotFound().finish()); diff --git a/src/migrate_store.rs b/src/migrate_store.rs index a716b123..27846f1a 100644 --- a/src/migrate_store.rs +++ b/src/migrate_store.rs @@ -8,7 +8,7 @@ use std::{ use crate::{ details::Details, error::{Error, UploadError}, - repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo}, + repo::{Hash, HashRepo, IdentifierRepo, MigrationRepo, QueueRepo}, store::{Identifier, Store}, }; @@ -125,7 +125,7 @@ where let mut joinset = tokio::task::JoinSet::new(); while let Some(hash) = stream.next().await { - let hash = hash?.as_ref().to_vec(); + let hash = hash?; if joinset.len() >= 32 { if let Some(res) = joinset.join_next().await { @@ -149,11 +149,8 @@ where Ok(()) } -#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))] -async fn migrate_hash( - state: &MigrateState, - hash: Vec, -) -> Result<(), Error> +#[tracing::instrument(skip(state))] +async fn migrate_hash(state: &MigrateState, hash: Hash) -> Result<(), Error> where S1: Store, S2: Store, @@ -175,14 +172,13 @@ where let current_index = index.fetch_add(1, Ordering::Relaxed); - let original_identifier = match repo.identifier(hash.clone().into()).await { + let original_identifier = match repo.identifier(hash.clone()).await { Ok(Some(identifier)) => identifier, Ok(None) => { tracing::warn!( - "Original File identifier for hash {} is missing, queue cleanup task", - hex::encode(&hash) + "Original File identifier for hash {hash:?} is missing, queue cleanup task", ); - crate::queue::cleanup_hash(repo, hash.clone().into()).await?; + crate::queue::cleanup_hash(repo, hash.clone()).await?; return Ok(()); } Err(e) => return Err(e.into()), @@ -221,24 +217,21 @@ where } } - if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? { + if let Some(identifier) = repo.motion_identifier(hash.clone()).await? { if !repo.is_migrated(&identifier).await? { match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { migrate_details(repo, &identifier, &new_identifier).await?; - repo.relate_motion_identifier(hash.clone().into(), &new_identifier) + repo.relate_motion_identifier(hash.clone(), &new_identifier) .await?; repo.mark_migrated(&identifier, &new_identifier).await?; } Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { - tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); + tracing::warn!("Skipping motion file for hash {hash:?}"); } Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error generating details for motion file for hash {}", - hex::encode(&hash) - ); + tracing::warn!("Error generating details for motion file for hash {hash:?}"); return Err(e); } Err(MigrateError::From(e)) => { @@ -253,30 +246,22 @@ where } } - for (variant, identifier) in repo.variants(hash.clone().into()).await? { + for (variant, identifier) in repo.variants(hash.clone()).await? { if !repo.is_migrated(&identifier).await? { match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await { Ok(new_identifier) => { migrate_details(repo, &identifier, &new_identifier).await?; - repo.remove_variant(hash.clone().into(), variant.clone()) - .await?; - repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier) + repo.remove_variant(hash.clone(), variant.clone()).await?; + repo.relate_variant_identifier(hash.clone(), variant, &new_identifier) .await?; repo.mark_migrated(&identifier, &new_identifier).await?; } Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { - tracing::warn!( - "Skipping variant {} for hash {}", - variant, - hex::encode(&hash) - ); + tracing::warn!("Skipping variant {variant} for hash {hash:?}",); } Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error generating details for motion file for hash {}", - hex::encode(&hash) - ); + tracing::warn!("Error generating details for motion file for hash {hash:?}",); return Err(e); } Err(MigrateError::From(e)) => { @@ -303,19 +288,16 @@ where { Ok(new_identifier) => { migrate_details(repo, &original_identifier, &new_identifier).await?; - repo.update_identifier(hash.clone().into(), &new_identifier) + repo.update_identifier(hash.clone(), &new_identifier) .await?; repo.mark_migrated(&original_identifier, &new_identifier) .await?; } Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { - tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); + tracing::warn!("Skipping original file for hash {hash:?}"); } Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error generating details for motion file for hash {}", - hex::encode(&hash) - ); + tracing::warn!("Error generating details for motion file for hash {hash:?}",); return Err(e); } Err(MigrateError::From(e)) => { diff --git a/src/queue.rs b/src/queue.rs index ba932ce8..509c191f 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -4,7 +4,7 @@ use crate::{ error::Error, formats::InputProcessableFormat, repo::{ - Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, JobId, QueueRepo, + Alias, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, JobId, QueueRepo, UploadId, }, serde_str::Serde, @@ -54,7 +54,7 @@ const PROCESS_QUEUE: &str = "process"; #[derive(Debug, serde::Deserialize, serde::Serialize)] enum Cleanup { Hash { - hash: Base64Bytes, + hash: Hash, }, Identifier { identifier: Base64Bytes, @@ -64,7 +64,7 @@ enum Cleanup { token: Serde, }, Variant { - hash: Base64Bytes, + hash: Hash, #[serde(skip_serializing_if = "Option::is_none")] variant: Option, }, @@ -101,10 +101,8 @@ pub(crate) async fn cleanup_alias( Ok(()) } -pub(crate) async fn cleanup_hash(repo: &R, hash: R::Bytes) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::Hash { - hash: Base64Bytes(hash.as_ref().to_vec()), - })?; +pub(crate) async fn cleanup_hash(repo: &R, hash: Hash) -> Result<(), Error> { + let job = serde_json::to_vec(&Cleanup::Hash { hash })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } @@ -122,13 +120,10 @@ pub(crate) async fn cleanup_identifier( async fn cleanup_variants( repo: &R, - hash: R::Bytes, + hash: Hash, variant: Option, ) -> Result<(), Error> { - let job = serde_json::to_vec(&Cleanup::Variant { - hash: Base64Bytes(hash.as_ref().to_vec()), - variant, - })?; + let job = serde_json::to_vec(&Cleanup::Variant { hash, variant })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) } diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs index 3bb051c0..df955df5 100644 --- a/src/queue/cleanup.rs +++ b/src/queue/cleanup.rs @@ -3,7 +3,7 @@ use crate::{ error::{Error, UploadError}, queue::{Base64Bytes, Cleanup, LocalBoxFuture}, repo::{ - Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo, + Alias, AliasAccessRepo, AliasRepo, DeleteToken, FullRepo, Hash, HashRepo, IdentifierRepo, VariantAccessRepo, }, serde_str::Serde, @@ -24,9 +24,7 @@ where Box::pin(async move { match serde_json::from_slice(job) { Ok(job) => match job { - Cleanup::Hash { - hash: Base64Bytes(in_hash), - } => hash::(repo, in_hash).await?, + Cleanup::Hash { hash: in_hash } => hash::(repo, in_hash).await?, Cleanup::Identifier { identifier: Base64Bytes(in_identifier), } => identifier(repo, store, in_identifier).await?, @@ -41,10 +39,9 @@ where ) .await? } - Cleanup::Variant { - hash: Base64Bytes(hash), - variant, - } => hash_variant::(repo, hash, variant).await?, + Cleanup::Variant { hash, variant } => { + hash_variant::(repo, hash, variant).await? + } Cleanup::AllVariants => all_variants::(repo).await?, Cleanup::OutdatedVariants => outdated_variants::(repo, configuration).await?, Cleanup::OutdatedProxies => outdated_proxies::(repo, configuration).await?, @@ -89,13 +86,11 @@ where } #[tracing::instrument(skip_all)] -async fn hash(repo: &R, hash: Vec) -> Result<(), Error> +async fn hash(repo: &R, hash: Hash) -> Result<(), Error> where R: FullRepo, S: Store, { - let hash: R::Bytes = hash.into(); - let aliases = repo.for_hash(hash.clone()).await?; if !aliases.is_empty() { @@ -221,15 +216,13 @@ where #[tracing::instrument(skip_all)] async fn hash_variant( repo: &R, - hash: Vec, + hash: Hash, target_variant: Option, ) -> Result<(), Error> where R: FullRepo, S: Store, { - let hash: R::Bytes = hash.into(); - if let Some(target_variant) = target_variant { if let Some(identifier) = repo .variant_identifier::(hash.clone(), target_variant.clone()) diff --git a/src/repo.rs b/src/repo.rs index 3640f57c..8ef7c146 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -208,7 +208,7 @@ where #[async_trait::async_trait(?Send)] pub(crate) trait VariantAccessRepo: BaseRepo { - type VariantAccessStream: Stream>; + type VariantAccessStream: Stream>; async fn accessed(&self, hash: Hash, variant: String) -> Result<(), RepoError>; diff --git a/src/repo/hash.rs b/src/repo/hash.rs index 91c12e36..12c7550f 100644 --- a/src/repo/hash.rs +++ b/src/repo/hash.rs @@ -1,7 +1,7 @@ use crate::formats::InternalFormat; use std::sync::Arc; -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct Hash { hash: Arc<[u8; 32]>, size: u64, @@ -9,8 +9,21 @@ pub(crate) struct Hash { } impl Hash { - pub(crate) fn new(hash: Arc<[u8; 32]>, size: u64, format: InternalFormat) -> Self { - Self { hash, format, size } + pub(crate) fn new(hash: [u8; 32], size: u64, format: InternalFormat) -> Self { + Self { + hash: Arc::new(hash), + format, + size, + } + } + + #[cfg(test)] + pub(crate) fn test_value() -> Self { + Self { + hash: Arc::new([0u8; 32]), + format: InternalFormat::Image(crate::formats::ImageFormat::Jxl), + size: 1234, + } } pub(super) fn to_bytes(&self) -> Vec { @@ -63,3 +76,82 @@ impl std::fmt::Debug for Hash { .finish() } } + +#[derive(serde::Deserialize, serde::Serialize)] +struct SerdeHash { + hash: String, + size: u64, + format: InternalFormat, +} + +impl serde::Serialize for Hash { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let hash = hex::encode(&self.hash[..]); + + SerdeHash { + hash, + size: self.size, + format: self.format, + } + .serialize(serializer) + } +} + +impl<'de> serde::Deserialize<'de> for Hash { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + use serde::de::Error; + + let SerdeHash { hash, size, format } = SerdeHash::deserialize(deserializer)?; + let hash = hex::decode(hash) + .map_err(D::Error::custom)? + .try_into() + .map_err(|_| D::Error::custom("Invalid hash size"))?; + + Ok(Hash::new(hash, size, format)) + } +} + +#[cfg(test)] +mod tests { + use super::Hash; + + #[test] + fn round_trip() { + let hashes = [ + Hash { + hash: std::sync::Arc::from([0u8; 32]), + size: 1234, + format: crate::formats::InternalFormat::Image(crate::formats::ImageFormat::Jxl), + }, + Hash { + hash: std::sync::Arc::from([255u8; 32]), + size: 1234, + format: crate::formats::InternalFormat::Animation( + crate::formats::AnimationFormat::Avif, + ), + }, + Hash { + hash: std::sync::Arc::from([99u8; 32]), + size: 1234, + format: crate::formats::InternalFormat::Video( + crate::formats::InternalVideoFormat::Mp4, + ), + }, + ]; + + for hash in hashes { + let bytes = hash.to_bytes(); + let new_hash = Hash::from_bytes(&bytes).expect("From bytes"); + let new_bytes = new_hash.to_bytes(); + + assert_eq!(hash, new_hash, "Hash mismatch"); + assert_eq!(bytes, new_bytes, "Bytes mismatch"); + } + } +} diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 10c851f7..636105d0 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1,9 +1,10 @@ use crate::{ details::MaybeHumanDate, repo::{ - hash::Hash, Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo, - HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, MigrationRepo, QueueRepo, - SettingsRepo, UploadId, UploadRepo, UploadResult, + hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, + Details, FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, JobId, + MigrationRepo, ProxyRepo, QueueRepo, RepoError, SettingsRepo, UploadId, UploadRepo, + UploadResult, VariantAccessRepo, }, serde_str::Serde, store::StoreError, @@ -24,8 +25,6 @@ use std::{ use tokio::{sync::Notify, task::JoinHandle}; use url::Url; -use super::{AliasAccessRepo, ProxyRepo, RepoError, VariantAccessRepo}; - macro_rules! b { ($self:ident.$ident:ident, $expr:expr) => {{ let $ident = $self.$ident.clone(); @@ -317,7 +316,7 @@ impl futures_util::Stream for AliasAccessStream { } impl futures_util::Stream for VariantAccessStream { - type Item = Result<(IVec, String), RepoError>; + type Item = Result<(Hash, String), RepoError>; fn poll_next( mut self: Pin<&mut Self>, @@ -888,9 +887,12 @@ pub(crate) enum VariantKeyError { #[error("Invalid utf8 in Variant")] Utf8, + + #[error("Hash format is invalid")] + InvalidHash, } -fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyError> { +fn parse_variant_access_key(bytes: IVec) -> Result<(Hash, String), VariantKeyError> { if bytes.len() < 8 { return Err(VariantKeyError::TooShort); } @@ -904,6 +906,8 @@ fn parse_variant_access_key(bytes: IVec) -> Result<(IVec, String), VariantKeyErr let hash = bytes.subslice(8, hash_len); + let hash = Hash::from_ivec(hash).ok_or(VariantKeyError::InvalidHash)?; + let variant_len = bytes.len().saturating_sub(8).saturating_sub(hash_len); if variant_len == 0 { @@ -1421,10 +1425,10 @@ impl From for SledError { mod tests { #[test] fn round_trip() { - let hash = sled::IVec::from(b"some hash value"); + let hash = crate::repo::Hash::test_value(); let variant = String::from("some string value"); - let key = super::variant_access_key(&hash, &variant); + let key = super::variant_access_key(&hash.to_bytes(), &variant); let (out_hash, out_variant) = super::parse_variant_access_key(sled::IVec::from(key)).expect("Parsed bytes");