From 3ecefcb64e6308aeb54812dd81e5ad46d5a1f395 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 2 Mar 2024 13:27:58 -0600 Subject: [PATCH] Save blurhashes in repo, improve some error responses, simplify extracting aliases --- src/error.rs | 6 +- src/error_code.rs | 3 + src/init_metrics.rs | 10 + src/lib.rs | 260 +++++++----------- src/repo.rs | 11 + src/repo/postgres.rs | 45 +++ .../V0012__add_blurhash_to_hashes.rs | 15 + src/repo/postgres/schema.rs | 1 + src/repo/sled.rs | 19 ++ 9 files changed, 210 insertions(+), 160 deletions(-) create mode 100644 src/repo/postgres/migrations/V0012__add_blurhash_to_hashes.rs diff --git a/src/error.rs b/src/error.rs index d107a9c..f2bb7b3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -123,6 +123,9 @@ pub(crate) enum UploadError { #[error("No files present in upload")] NoFiles, + #[error("Media has not been proxied")] + MissingProxy, + #[error("Requested a file that doesn't exist")] MissingAlias, @@ -186,6 +189,7 @@ impl UploadError { Self::Semaphore => ErrorCode::PROCESS_SEMAPHORE_CLOSED, Self::Canceled => ErrorCode::PANIC, Self::NoFiles => ErrorCode::VALIDATE_NO_FILES, + Self::MissingProxy => ErrorCode::PROXY_NOT_FOUND, Self::MissingAlias => ErrorCode::ALIAS_NOT_FOUND, Self::MissingIdentifier => ErrorCode::LOST_FILE, Self::InvalidToken => ErrorCode::INVALID_DELETE_TOKEN, @@ -256,7 +260,7 @@ impl ResponseError for Error { Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Exiftool(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, Some(UploadError::Process(e)) if e.is_client_error() => StatusCode::BAD_REQUEST, - Some(UploadError::MissingAlias) => StatusCode::NOT_FOUND, + Some(UploadError::MissingProxy | UploadError::MissingAlias) => StatusCode::NOT_FOUND, Some(UploadError::Ffmpeg(e)) if e.is_not_found() => StatusCode::NOT_FOUND, Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN, Some(UploadError::Range) => StatusCode::RANGE_NOT_SATISFIABLE, diff --git a/src/error_code.rs b/src/error_code.rs index b1e0d06..62059e2 100644 --- a/src/error_code.rs +++ b/src/error_code.rs @@ -119,6 +119,9 @@ impl ErrorCode { pub(crate) const VALIDATE_NO_FILES: ErrorCode = ErrorCode { code: "validate-no-files", }; + pub(crate) const PROXY_NOT_FOUND: ErrorCode = ErrorCode { + code: "proxy-not-found", + }; pub(crate) const ALIAS_NOT_FOUND: ErrorCode = ErrorCode { code: "alias-not-found", }; diff --git a/src/init_metrics.rs b/src/init_metrics.rs index 18cd2d3..a699fa7 100644 --- a/src/init_metrics.rs +++ b/src/init_metrics.rs @@ -255,6 +255,14 @@ fn describe_postgres() { POSTGRES_VARIANTS_REMOVE, "Timings for removing a variant for a provided hash" ); + metrics::describe_histogram!( + POSTGRES_HASHES_RELATE_BLURHASH, + "Timings for relating a blurhash with a provided hash" + ); + metrics::describe_histogram!( + POSTGRES_HASHES_BLURHASH, + "Timings for fetching a blurhash for a provided hash" + ); metrics::describe_histogram!( POSTGRES_HASHES_RELATE_MOTION_IDENTIFIER, "Timings for relating a still image identifier for a provided hash representing a video" @@ -438,6 +446,8 @@ pub(crate) const POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER: &str = pub(crate) const POSTGRES_VARIANTS_IDENTIFIER: &str = "pict-rs.postgres.variants.identifier"; pub(crate) const POSTGRES_VARIANTS_FOR_HASH: &str = "pict-rs.postgres.variants.for-hash"; pub(crate) const POSTGRES_VARIANTS_REMOVE: &str = "pict-rs.postgres.variants.remove"; +pub(crate) const POSTGRES_HASHES_RELATE_BLURHASH: &str = "pict-rs.postgres.hashes.relate-blurhash"; +pub(crate) const POSTGRES_HASHES_BLURHASH: &str = "pict-rs.postgres.hashes.blurhash"; pub(crate) const POSTGRES_HASHES_RELATE_MOTION_IDENTIFIER: &str = "pict-rs.postgres.hashes.relate-motion-identifier"; pub(crate) const POSTGRES_HASHES_MOTION_IDENTIFIER: &str = diff --git a/src/lib.rs b/src/lib.rs index 70b1359..927abd9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -713,28 +713,15 @@ async fn process_details( ext: web::Path, state: web::Data>, ) -> Result { - let alias = match source { - ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => { - Serde::into_inner(alias) - } - ProcessSource::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().json(&serde_json::json!({ - "msg": "No images associated with provided proxy url" - }))); - }; - alias - } - }; + let alias = alias_from_query(source.into(), &state).await?; let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?; - let Some(hash) = state.repo.hash(&alias).await? else { - // Invalid alias - return Ok(HttpResponse::NotFound().json(&serde_json::json!({ - "msg": "No images associated with provided alias", - }))); - }; + let hash = state + .repo + .hash(&alias) + .await? + .ok_or(UploadError::MissingAlias)?; let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); @@ -785,32 +772,7 @@ async fn process( state: web::Data>, process_map: web::Data, ) -> Result { - let alias = match source { - ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => { - Serde::into_inner(alias) - } - ProcessSource::Proxy { proxy } => { - let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? { - alias - } else if !state.config.server.read_only { - let stream = download_stream(proxy.as_str(), &state).await?; - - let (alias, _, _) = ingest_inline(stream, &state).await?; - - state.repo.relate_url(proxy, alias.clone()).await?; - - alias - } else { - return Err(UploadError::ReadOnly.into()); - }; - - if !state.config.server.read_only { - state.repo.accessed_alias(alias.clone()).await?; - } - - alias - } - }; + let alias = proxy_alias_from_query(source.into(), &state).await?; let (format, thumbnail_path, thumbnail_args) = prepare_process(&state.config, operations, ext.as_str())?; @@ -981,20 +943,10 @@ async fn process_backgrounded( /// Fetch file details #[tracing::instrument(name = "Fetching query details", skip(state))] async fn details_query( - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().json(&serde_json::json!({ - "msg": "Provided proxy URL has not been cached", - }))); - }; - alias - } - }; + let alias = alias_from_query(query, &state).await?; let details = ensure_details(&state, &alias).await?; @@ -1016,33 +968,10 @@ async fn details( #[tracing::instrument(name = "Serving file query", skip(state))] async fn serve_query( range: Option>, - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? { - alias - } else if !state.config.server.read_only { - let stream = download_stream(proxy.as_str(), &state).await?; - - let (alias, _, _) = ingest_inline(stream, &state).await?; - - state.repo.relate_url(proxy, alias.clone()).await?; - - alias - } else { - return Err(UploadError::ReadOnly.into()); - }; - - if !state.config.server.read_only { - state.repo.accessed_alias(alias.clone()).await?; - } - - alias - } - }; + let alias = proxy_alias_from_query(query, &state).await?; do_serve(range, alias, state).await } @@ -1092,18 +1021,10 @@ async fn do_serve( #[tracing::instrument(name = "Serving query file headers", skip(state))] async fn serve_query_head( range: Option>, - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().finish()); - }; - alias - } - }; + let alias = alias_from_query(query, &state).await?; do_serve_head(range, alias, state).await } @@ -1275,12 +1196,13 @@ fn srv_head( builder } -async fn blurhash( - web::Query(alias_query): web::Query, - state: web::Data>, -) -> Result { - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), +#[tracing::instrument(level = "DEBUG", skip(state))] +async fn proxy_alias_from_query( + alias_query: AliasQuery, + state: &State, +) -> Result { + match alias_query { + AliasQuery::Alias { alias } => Ok(Serde::into_inner(alias)), AliasQuery::Proxy { proxy } => { let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? { alias @@ -1300,16 +1222,37 @@ async fn blurhash( state.repo.accessed_alias(alias.clone()).await?; } - alias + Ok(alias) } - }; + } +} - let details = ensure_details(&state, &alias).await?; - let blurhash = blurhash::generate(&state, &alias, &details).await?; +async fn blurhash( + web::Query(query): web::Query, + state: web::Data>, +) -> Result { + let alias = proxy_alias_from_query(query, &state).await?; + + let hash = state + .repo + .hash(&alias) + .await? + .ok_or(UploadError::MissingAlias)?; + + let blurhash = if let Some(blurhash) = state.repo.blurhash(hash.clone()).await? { + blurhash + } else { + let details = ensure_details(&state, &alias).await?; + let blurhash = blurhash::generate(&state, &alias, &details).await?; + let blurhash: Arc = Arc::from(blurhash); + state.repo.relate_blurhash(hash, blurhash.clone()).await?; + + blurhash + }; Ok(HttpResponse::Ok().json(serde_json::json!({ "msg": "ok", - "blurhash": blurhash, + "blurhash": blurhash.as_ref(), }))) } @@ -1378,6 +1321,17 @@ enum AliasQuery { Alias { alias: Serde }, } +impl From for AliasQuery { + fn from(value: ProcessSource) -> Self { + match value { + ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => { + Self::Alias { alias } + } + ProcessSource::Proxy { proxy } => Self::Proxy { proxy }, + } + } +} + #[tracing::instrument(name = "Setting 404 Image", skip(state))] async fn set_not_found( json: web::Json, @@ -1392,15 +1346,16 @@ async fn set_not_found( AliasQuery::Proxy { .. } => { return Ok(HttpResponse::BadRequest().json(serde_json::json!({ "msg": "Cannot use proxied media as Not Found image", + "code": "proxy-not-allowed", }))); } }; - if state.repo.hash(&alias).await?.is_none() { - return Ok(HttpResponse::BadRequest().json(serde_json::json!({ - "msg": "No hash associated with provided alias" - }))); - } + state + .repo + .hash(&alias) + .await? + .ok_or(UploadError::MissingAlias)?; state .repo @@ -1412,32 +1367,44 @@ async fn set_not_found( }))) } +async fn alias_from_query(alias_query: AliasQuery, state: &State) -> Result { + match alias_query { + AliasQuery::Alias { alias } => Ok(Serde::into_inner(alias)), + AliasQuery::Proxy { proxy } => { + let alias = state + .repo + .related(proxy) + .await? + .ok_or(UploadError::MissingProxy)?; + + if !state.config.server.read_only { + state.repo.accessed_alias(alias.clone()).await?; + } + + Ok(alias) + } + } +} + #[tracing::instrument(name = "Purging file", skip(state))] async fn purge( - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().finish()); - }; - alias - } - }; + let alias = alias_from_query(query, &state).await?; let aliases = state.repo.aliases_from_alias(&alias).await?; - let Some(hash) = state.repo.hash(&alias).await? else { - return Ok(HttpResponse::BadRequest().json(&serde_json::json!({ - "msg": "No images associated with provided alias", - }))); - }; + let hash = state + .repo + .hash(&alias) + .await? + .ok_or(UploadError::MissingAlias)?; + queue::cleanup_hash(&state.repo, hash).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ @@ -1448,22 +1415,14 @@ async fn purge( #[tracing::instrument(name = "Deleting alias", skip(state))] async fn delete_alias( - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().finish()); - }; - alias - } - }; + let alias = alias_from_query(query, &state).await?; if let Some(token) = state.repo.delete_token(&alias).await? { queue::cleanup_alias(&state.repo, alias, token).await?; @@ -1478,18 +1437,10 @@ async fn delete_alias( #[tracing::instrument(name = "Fetching aliases", skip(state))] async fn aliases( - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().finish()); - }; - alias - } - }; + let alias = alias_from_query(query, &state).await?; let aliases = state.repo.aliases_from_alias(&alias).await?; @@ -1501,25 +1452,16 @@ async fn aliases( #[tracing::instrument(name = "Fetching identifier", skip(state))] async fn identifier( - web::Query(alias_query): web::Query, + web::Query(query): web::Query, state: web::Data>, ) -> Result { - let alias = match alias_query { - AliasQuery::Alias { alias } => Serde::into_inner(alias), - AliasQuery::Proxy { proxy } => { - let Some(alias) = state.repo.related(proxy).await? else { - return Ok(HttpResponse::NotFound().finish()); - }; - alias - } - }; + let alias = alias_from_query(query, &state).await?; - let Some(identifier) = state.repo.identifier_from_alias(&alias).await? else { - // Invalid alias - return Ok(HttpResponse::NotFound().json(serde_json::json!({ - "msg": "No identifiers associated with provided alias" - }))); - }; + let identifier = state + .repo + .identifier_from_alias(&alias) + .await? + .ok_or(UploadError::MissingAlias)?; Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", diff --git a/src/repo.rs b/src/repo.rs index aaf556b..48c4ebc 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -658,6 +658,9 @@ pub(crate) trait HashRepo: BaseRepo { async fn variants(&self, hash: Hash) -> Result)>, RepoError>; async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; + async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError>; + async fn blurhash(&self, hash: Hash) -> Result>, RepoError>; + async fn relate_motion_identifier( &self, hash: Hash, @@ -739,6 +742,14 @@ where T::remove_variant(self, hash, variant).await } + async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError> { + T::relate_blurhash(self, hash, blurhash).await + } + + async fn blurhash(&self, hash: Hash) -> Result>, RepoError> { + T::blurhash(self, hash).await + } + async fn relate_motion_identifier( &self, hash: Hash, diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 3714b0f..02ee3fd 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -930,6 +930,51 @@ impl HashRepo for PostgresRepo { Ok(()) } + #[tracing::instrument(level = "debug", skip(self))] + async fn relate_blurhash( + &self, + input_hash: Hash, + input_blurhash: Arc, + ) -> Result<(), RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.get_connection().await?; + + diesel::update(hashes) + .filter(hash.eq(&input_hash)) + .set(blurhash.eq(input_blurhash.as_ref())) + .execute(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_HASHES_RELATE_BLURHASH) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .map_err(PostgresError::Diesel)?; + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn blurhash(&self, input_hash: Hash) -> Result>, RepoError> { + use schema::hashes::dsl::*; + + let mut conn = self.get_connection().await?; + + let opt = hashes + .select(blurhash) + .filter(hash.eq(&input_hash)) + .get_result::>(&mut conn) + .with_metrics(crate::init_metrics::POSTGRES_HASHES_BLURHASH) + .with_timeout(Duration::from_secs(5)) + .await + .map_err(|_| PostgresError::DbTimeout)? + .optional() + .map_err(PostgresError::Diesel)? + .flatten() + .map(Arc::from); + + Ok(opt) + } + #[tracing::instrument(level = "debug", skip(self))] async fn relate_motion_identifier( &self, diff --git a/src/repo/postgres/migrations/V0012__add_blurhash_to_hashes.rs b/src/repo/postgres/migrations/V0012__add_blurhash_to_hashes.rs new file mode 100644 index 0000000..fa70797 --- /dev/null +++ b/src/repo/postgres/migrations/V0012__add_blurhash_to_hashes.rs @@ -0,0 +1,15 @@ +use barrel::backend::Pg; +use barrel::{types, Migration}; + +pub(crate) fn migration() -> String { + let mut m = Migration::new(); + + m.change_table("hashes", |t| { + t.add_column( + "blurhash", + types::text().size(60).nullable(true).unique(false), + ); + }); + + m.make::().to_string() +} diff --git a/src/repo/postgres/schema.rs b/src/repo/postgres/schema.rs index 78f8951..ec1d7c3 100644 --- a/src/repo/postgres/schema.rs +++ b/src/repo/postgres/schema.rs @@ -27,6 +27,7 @@ diesel::table! { identifier -> Text, motion_identifier -> Nullable, created_at -> Timestamp, + blurhash -> Nullable, } } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 96a8ba0..f16b01b 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -91,6 +91,7 @@ pub(crate) struct SledRepo { hash_identifiers: Tree, hash_variant_identifiers: Tree, hash_motion_identifiers: Tree, + hash_blurhashes: Tree, aliases: Tree, alias_hashes: Tree, alias_delete_tokens: Tree, @@ -132,6 +133,7 @@ impl SledRepo { hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?, hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?, hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?, + hash_blurhashes: db.open_tree("pict-rs-hash-blurhashes-tree")?, aliases: db.open_tree("pict-rs-aliases-tree")?, alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?, alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?, @@ -1354,6 +1356,23 @@ impl HashRepo for SledRepo { Ok(()) } + #[tracing::instrument(level = "trace", skip(self))] + async fn relate_blurhash(&self, hash: Hash, blurhash: Arc) -> Result<(), RepoError> { + b!( + self.hash_blurhashes, + hash_blurhashes.insert(hash.to_bytes(), blurhash.as_bytes()) + ); + + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn blurhash(&self, hash: Hash) -> Result>, RepoError> { + let opt = b!(self.hash_blurhashes, hash_blurhashes.get(hash.to_ivec())); + + Ok(opt.map(try_into_arc_str).transpose()?) + } + #[tracing::instrument(level = "trace", skip(self))] async fn relate_motion_identifier( &self,