From 669b3fb86f4b2820566ae3512b42dd6dd072a0ba Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 1 Oct 2022 22:47:52 -0500
Subject: [PATCH] Fix webp metadata stripping, more tracing cleanup
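
Webp uploads previously fell through to the exiftool path for metadata
stripping, which left webp metadata in place. Route webp through
imagemagick's convert instead, the same way mismatched image formats are
transcoded, so the file is rebuilt without its metadata.

The rest is more tracing cleanup:
- skip repo, store, and stream arguments in instrumented functions
  instead of Debug-printing them into every span
- record hashes hex-encoded instead of as raw byte arrays
- demote noisy debug! logging to trace!
- only spawn Drop-time cleanup tasks, under a shared detached parent
  span, when a Backgrounded or Session actually has something to clean
  up
- serialize byte payloads in queue jobs as base64 strings via a new
  Base64Bytes wrapper, stop embedding the raw payload in the
  "Running Job" span, and record the job under the correct field name
  in the sled repo's push span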
---
 src/backgrounded.rs         |  55 +++++++++---------
 src/concurrent_processor.rs |   4 +-
 src/ffmpeg.rs               |   2 +-
 src/generate.rs             |   4 +-
 src/ingest.rs               | 109 ++++++++++++++++++------------------
 src/main.rs                 |  67 +++++++++++-----------
 src/process.rs              |   4 +-
 src/queue.rs                |  43 +++++++++++---
 src/queue/cleanup.rs        |  17 ++++--
 src/queue/process.rs        |   7 ++-
 src/repo/sled.rs            |  10 ++--
 src/store/object_store.rs   |  12 ++--
 src/validate.rs             |  18 ++++--
 13 files changed, 195 insertions(+), 157 deletions(-)
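
A few notes on the less obvious parts of this patch:

Base64Bytes (src/queue.rs below) exists because queue jobs are
serialized with serde_json, which renders a plain Vec<u8> as a JSON
array of integers; hashes and identifiers made job payloads long and
unreadable in logs. The wrapper keeps the same bytes but serializes
them as a single base64 string. A minimal sketch of the round-trip it
enables (the ExampleJob struct here is hypothetical and only for
illustration; the real payloads are the Cleanup and Process enums in
the diff):

    #[derive(serde::Serialize, serde::Deserialize)]
    struct ExampleJob {
        hash: Base64Bytes,
    }

    fn round_trip() -> Result<(), Box<dyn std::error::Error>> {
        let job = ExampleJob {
            hash: Base64Bytes(vec![0xde, 0xad, 0xbe, 0xef]),
        };
        // serializes as {"hash":"3q2+7w=="} rather than {"hash":[222,173,190,239]}
        let json = serde_json::to_string(&job)?;
        let parsed: ExampleJob = serde_json::from_str(&json)?;
        assert_eq!(parsed.hash.0, vec![0xde, 0xad, 0xbe, 0xef]);
        Ok(())
    }

The Drop impls in src/backgrounded.rs and src/ingest.rs share a span
pattern: the cleanup parent span is created with parent: None, so it
roots a fresh trace instead of growing the request's trace, and
follows_from(Span::current()) records a link back to the request that
dropped the value. The per-resource cleanup spans then parent
themselves to that shared span, and none of it is created at all when
there is nothing to clean up.

In src/validate.rs, the Either nesting changes shape to make room for
the new webp arm: transcoded video and imagemagick-converted images now
share the Either::right(Either::left(..)) branch, exiftool output moves
up a level, and webp always takes the imagemagick branch so its
metadata is actually dropped.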
"Dropped session cleanup"); - cleanup_parent_span.follows_from(Span::current()); + if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() { + let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup"); + cleanup_parent_span.follows_from(Span::current()); - if let Some(hash) = self.hash.take() { - let repo = self.repo.clone(); + if let Some(hash) = self.hash.take() { + let repo = self.repo.clone(); - let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash); + let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; - } - .instrument(cleanup_span), - ) - }); - } + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn( + async move { + let _ = crate::queue::cleanup_hash(&repo, hash.into()).await; + } + .instrument(cleanup_span), + ) + }); + } - if let Some(alias) = self.alias.take() { - let repo = self.repo.clone(); + if let Some(alias) = self.alias.take() { + let repo = self.repo.clone(); - let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup alias", alias = ?alias); + let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup alias", alias = ?alias); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - if let Ok(token) = repo.delete_token(&alias).await { - let _ = crate::queue::cleanup_alias(&repo, alias, token).await; - } else { - let token = DeleteToken::generate(); - if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await { + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn( + async move { + if let Ok(token) = repo.delete_token(&alias).await { let _ = crate::queue::cleanup_alias(&repo, alias, token).await; + } else { + let token = DeleteToken::generate(); + if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await { + let _ = crate::queue::cleanup_alias(&repo, alias, token).await; + } } } - } - .instrument(cleanup_span), - ) - }); - } + .instrument(cleanup_span), + ) + }); + } - if let Some(identifier) = self.identifier.take() { - let repo = self.repo.clone(); + if let Some(identifier) = self.identifier.take() { + let repo = self.repo.clone(); - let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier); + let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier); - tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { - actix_rt::spawn( - async move { - let _ = crate::queue::cleanup_identifier(&repo, identifier).await; - } - .instrument(cleanup_span), - ) - }); + tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { + actix_rt::spawn( + async move { + let _ = crate::queue::cleanup_identifier(&repo, identifier).await; + } + .instrument(cleanup_span), + ) + }); + } } } } diff --git a/src/main.rs b/src/main.rs index 346de4d..788e3ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,6 @@ use std::{ time::{Duration, SystemTime}, }; use tokio::sync::Semaphore; -use tracing::{debug, info, instrument}; use tracing_actix_web::TracingLogger; use tracing_awc::Tracing; use tracing_futures::Instrument; @@ -96,15 +95,15 @@ async fn 
diff --git a/src/main.rs b/src/main.rs
index 346de4d..788e3ae 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,7 +18,6 @@ use std::{
     time::{Duration, SystemTime},
 };
 use tokio::sync::Semaphore;
-use tracing::{debug, info, instrument};
 use tracing_actix_web::TracingLogger;
 use tracing_awc::Tracing;
 use tracing_futures::Instrument;
@@ -96,15 +95,15 @@ async fn ensure_details(
     let details = repo.details(&identifier).await?;

     if let Some(details) = details {
-        debug!("details exist");
+        tracing::debug!("details exist");
         Ok(details)
     } else {
-        debug!("generating new details from {:?}", identifier);
+        tracing::debug!("generating new details from {:?}", identifier);
         let hint = details_hint(alias);
         let new_details = Details::from_store(store.clone(), identifier.clone(), hint).await?;
-        debug!("storing details for {:?}", identifier);
+        tracing::debug!("storing details for {:?}", identifier);
         repo.relate_details(&identifier, &new_details).await?;
-        debug!("stored");
+        tracing::debug!("stored");
         Ok(new_details)
     }
 }
@@ -217,7 +216,7 @@ impl FormData for Import {
 }

 /// Handle responding to successful uploads
-#[instrument(name = "Uploaded files", skip(value))]
+#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))]
 async fn upload<R: FullRepo, S: Store + 'static>(
     Multipart(Upload(value)): Multipart<Upload<R, S>>,
     repo: web::Data<R>,
@@ -227,7 +226,7 @@ async fn upload(
 }

 /// Handle responding to successful uploads
-#[instrument(name = "Imported files", skip(value))]
+#[tracing::instrument(name = "Imported files", skip(value, repo, store))]
 async fn import<R: FullRepo, S: Store + 'static>(
     Multipart(Import(value)): Multipart<Import<R, S>>,
     repo: web::Data<R>,
@@ -237,7 +236,7 @@ async fn import(
 }

 /// Handle responding to successful uploads
-#[instrument(name = "Uploaded files", skip(value))]
+#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))]
 async fn handle_upload<R: FullRepo, S: Store + 'static>(
     value: Value<Session<R, S>>,
     repo: web::Data<R>,
@@ -257,7 +256,7 @@ async fn handle_upload(

     for image in &images {
         if let Some(alias) = image.result.alias() {
-            info!("Uploaded {} as {:?}", image.filename, alias);
+            tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
             let delete_token = image.result.delete_token().await?;

             let details = ensure_details(&repo, &store, alias).await?;
@@ -329,7 +328,7 @@ impl FormData for BackgroundedUpload {
     }
 }

-#[instrument(name = "Uploaded files", skip(value))]
+#[tracing::instrument(name = "Uploaded files", skip(value, repo))]
 async fn upload_backgrounded<R: FullRepo, S: Store + 'static>(
     Multipart(BackgroundedUpload(value)): Multipart<BackgroundedUpload<R, S>>,
     repo: web::Data<R>,
@@ -377,7 +376,7 @@ struct ClaimQuery {
 }

 /// Claim a backgrounded upload
-#[instrument(name = "Waiting on upload", skip(repo))]
+#[tracing::instrument(name = "Waiting on upload", skip_all)]
 async fn claim_upload<R: FullRepo, S: Store + 'static>(
     repo: web::Data<R>,
     store: web::Data<S>,
@@ -426,7 +425,7 @@ struct UrlQuery {
 }

 /// download an image from a URL
-#[instrument(name = "Downloading file", skip(client, repo))]
+#[tracing::instrument(name = "Downloading file", skip(client, repo, store))]
 async fn download<R: FullRepo, S: Store + 'static>(
     client: web::Data<Client>,
     repo: web::Data<R>,
@@ -450,7 +449,7 @@ async fn download(
     }
 }

-#[instrument(name = "Downloading file inline", skip(stream))]
+#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store))]
 async fn do_download_inline<R: FullRepo, S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<R>,
@@ -476,7 +475,7 @@ async fn do_download_inline(
     })))
 }

-#[instrument(name = "Downloading file in background", skip(stream))]
+#[tracing::instrument(name = "Downloading file in background", skip(stream, repo, store))]
 async fn do_download_backgrounded<R: FullRepo, S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<R>,
@@ -504,7 +503,7 @@ async fn do_download_backgrounded(
 }

 /// Delete aliases and files
-#[instrument(name = "Deleting file", skip(repo))]
+#[tracing::instrument(name = "Deleting file", skip(repo))]
 async fn delete<R: FullRepo>(
     repo: web::Data<R>,
     path_entries: web::Path<(String, String)>,
@@ -560,7 +559,7 @@ fn prepare_process(
     Ok((format, alias, thumbnail_path, thumbnail_args))
 }

-#[instrument(name = "Fetching derived details", skip(repo))]
+#[tracing::instrument(name = "Fetching derived details", skip(repo))]
 async fn process_details<R: FullRepo>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
@@ -582,7 +581,7 @@ async fn process_details(
 }

 /// Process files
-#[instrument(name = "Serving processed image", skip(repo))]
+#[tracing::instrument(name = "Serving processed image", skip(repo, store))]
 async fn process<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     query: web::Query<ProcessQuery>,
@@ -605,19 +604,19 @@ async fn process(
     let details = repo.details(&identifier).await?;

     let details = if let Some(details) = details {
-        debug!("details exist");
+        tracing::debug!("details exist");
         details
     } else {
-        debug!("generating new details from {:?}", identifier);
+        tracing::debug!("generating new details from {:?}", identifier);
         let new_details = Details::from_store(
             (**store).clone(),
             identifier.clone(),
             Some(ValidInputType::from_format(format)),
         )
         .await?;
-        debug!("storing details for {:?}", identifier);
+        tracing::debug!("storing details for {:?}", identifier);
         repo.relate_details(&identifier, &new_details).await?;
-        debug!("stored");
+        tracing::debug!("stored");
         new_details
     };

@@ -671,7 +670,7 @@ async fn process(
     ))
 }

-#[instrument(name = "Serving processed image headers", skip(repo))]
+#[tracing::instrument(name = "Serving processed image headers", skip(repo, store))]
 async fn process_head<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     query: web::Query<ProcessQuery>,
@@ -693,19 +692,19 @@ async fn process_head(
     let details = repo.details(&identifier).await?;

     let details = if let Some(details) = details {
-        debug!("details exist");
+        tracing::debug!("details exist");
         details
     } else {
-        debug!("generating new details from {:?}", identifier);
+        tracing::debug!("generating new details from {:?}", identifier);
         let new_details = Details::from_store(
             (**store).clone(),
             identifier.clone(),
             Some(ValidInputType::from_format(format)),
         )
         .await?;
-        debug!("storing details for {:?}", identifier);
+        tracing::debug!("storing details for {:?}", identifier);
         repo.relate_details(&identifier, &new_details).await?;
-        debug!("stored");
+        tracing::debug!("stored");
         new_details
     };

@@ -716,7 +715,7 @@ async fn process_head(
 }

 /// Process files
-#[instrument(name = "Spawning image process", skip(repo))]
+#[tracing::instrument(name = "Spawning image process", skip(repo))]
 async fn process_backgrounded<R: FullRepo>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
@@ -740,7 +739,7 @@ async fn process_backgrounded(
 }

 /// Fetch file details
-#[instrument(name = "Fetching details", skip(repo))]
+#[tracing::instrument(name = "Fetching details", skip(repo, store))]
 async fn details<R: FullRepo, S: Store + 'static>(
     alias: web::Path<Serde<Alias>>,
     repo: web::Data<R>,
@@ -754,7 +753,7 @@ async fn details(
 }

 /// Serve files
-#[instrument(name = "Serving file", skip(repo))]
+#[tracing::instrument(name = "Serving file", skip(repo, store))]
 async fn serve<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
@@ -772,7 +771,7 @@ async fn serve(
     ranged_file_resp(&store, identifier, range, details).await
 }

-#[instrument(name = "Serving file headers", skip(repo))]
+#[tracing::instrument(name = "Serving file headers", skip(repo, store))]
 async fn serve_head<R: FullRepo, S: Store + 'static>(
     range: Option<web::Header<Range>>,
     alias: web::Path<Serde<Alias>>,
@@ -916,7 +915,7 @@ fn srv_head(
     builder
 }

-#[instrument(name = "Spawning variant cleanup", skip(repo))]
+#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))]
 async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
     queue::cleanup_all_variants(&repo).await?;
     Ok(HttpResponse::NoContent().finish())
 }
@@ -927,7 +926,7 @@ struct AliasQuery {
     alias: Serde<Alias>,
 }

-#[instrument(name = "Purging file", skip(repo))]
+#[tracing::instrument(name = "Purging file", skip(repo))]
 async fn purge<R: FullRepo>(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
@@ -944,7 +943,7 @@ async fn purge(
     })))
 }

-#[instrument(name = "Fetching aliases", skip(repo))]
+#[tracing::instrument(name = "Fetching aliases", skip(repo))]
 async fn aliases<R: FullRepo>(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
@@ -958,7 +957,7 @@ async fn aliases(
     })))
 }

-#[instrument(name = "Fetching identifier", skip(repo))]
+#[tracing::instrument(name = "Fetching identifier", skip(repo))]
 async fn identifier<R: FullRepo, S: Store>(
     query: web::Query<AliasQuery>,
     repo: web::Data<R>,
diff --git a/src/process.rs b/src/process.rs
index 3ba4bc1..4fbf452 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -55,7 +55,7 @@ impl Process {
         })
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     pub(crate) async fn wait(mut self) -> std::io::Result<()> {
         let status = self.child.wait().await?;
         if !status.success() {
@@ -99,7 +99,7 @@ impl Process {
         })
     }

-    #[tracing::instrument(skip(f))]
+    #[tracing::instrument(level = "trace", skip_all)]
     fn spawn_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin
     where
         F: FnOnce(ChildStdin) -> Fut + 'static,
-#[instrument(name = "Purging file", skip(repo))] +#[tracing::instrument(name = "Purging file", skip(repo))] async fn purge( query: web::Query, repo: web::Data, @@ -944,7 +943,7 @@ async fn purge( }))) } -#[instrument(name = "Fetching aliases", skip(repo))] +#[tracing::instrument(name = "Fetching aliases", skip(repo))] async fn aliases( query: web::Query, repo: web::Data, @@ -958,7 +957,7 @@ async fn aliases( }))) } -#[instrument(name = "Fetching identifier", skip(repo))] +#[tracing::instrument(name = "Fetching identifier", skip(repo))] async fn identifier( query: web::Query, repo: web::Data, diff --git a/src/process.rs b/src/process.rs index 3ba4bc1..4fbf452 100644 --- a/src/process.rs +++ b/src/process.rs @@ -55,7 +55,7 @@ impl Process { }) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] pub(crate) async fn wait(mut self) -> std::io::Result<()> { let status = self.child.wait().await?; if !status.success() { @@ -99,7 +99,7 @@ impl Process { }) } - #[tracing::instrument(skip(f))] + #[tracing::instrument(level = "trace", skip_all)] fn spawn_fn(mut self, f: F) -> impl AsyncRead + Unpin where F: FnOnce(ChildStdin) -> Fut + 'static, diff --git a/src/queue.rs b/src/queue.rs index ddbab9f..9890252 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -13,23 +13,48 @@ use tracing::Instrument; mod cleanup; mod process; +#[derive(Debug)] +struct Base64Bytes(Vec); + +impl serde::Serialize for Base64Bytes { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let s = base64::encode(&self.0); + s.serialize(serializer) + } +} + +impl<'de> serde::Deserialize<'de> for Base64Bytes { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s: String = serde::Deserialize::deserialize(deserializer)?; + base64::decode(s) + .map(Base64Bytes) + .map_err(|e| serde::de::Error::custom(e.to_string())) + } +} + const CLEANUP_QUEUE: &str = "cleanup"; const PROCESS_QUEUE: &str = "process"; #[derive(Debug, serde::Deserialize, serde::Serialize)] enum Cleanup { Hash { - hash: Vec, + hash: Base64Bytes, }, Identifier { - identifier: Vec, + identifier: Base64Bytes, }, Alias { alias: Serde, token: Serde, }, Variant { - hash: Vec, + hash: Base64Bytes, }, AllVariants, } @@ -37,7 +62,7 @@ enum Cleanup { #[derive(Debug, serde::Deserialize, serde::Serialize)] enum Process { Ingest { - identifier: Vec, + identifier: Base64Bytes, upload_id: Serde, declared_alias: Option>, should_validate: bool, @@ -66,7 +91,7 @@ pub(crate) async fn cleanup_alias( pub(crate) async fn cleanup_hash(repo: &R, hash: R::Bytes) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::Hash { - hash: hash.as_ref().to_vec(), + hash: Base64Bytes(hash.as_ref().to_vec()), })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) @@ -77,7 +102,7 @@ pub(crate) async fn cleanup_identifier( identifier: I, ) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::Identifier { - identifier: identifier.to_bytes()?, + identifier: Base64Bytes(identifier.to_bytes()?), })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) @@ -85,7 +110,7 @@ pub(crate) async fn cleanup_identifier( async fn cleanup_variants(repo: &R, hash: R::Bytes) -> Result<(), Error> { let job = serde_json::to_vec(&Cleanup::Variant { - hash: hash.as_ref().to_vec(), + hash: Base64Bytes(hash.as_ref().to_vec()), })?; repo.push(CLEANUP_QUEUE, job.into()).await?; Ok(()) @@ -106,7 +131,7 @@ pub(crate) async fn queue_ingest( is_cached: bool, ) -> Result<(), Error> { let job = serde_json::to_vec(&Process::Ingest { - 
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index b0a588d..891b326 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -1,6 +1,6 @@
 use crate::{
     error::{Error, UploadError},
-    queue::{Cleanup, LocalBoxFuture},
+    queue::{Base64Bytes, Cleanup, LocalBoxFuture},
     repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo},
     serde_str::Serde,
     store::{Identifier, Store},
@@ -19,9 +19,11 @@ where
     Box::pin(async move {
         match serde_json::from_slice(job) {
             Ok(job) => match job {
-                Cleanup::Hash { hash: in_hash } => hash::<R>(repo, in_hash).await?,
+                Cleanup::Hash {
+                    hash: Base64Bytes(in_hash),
+                } => hash::<R>(repo, in_hash).await?,
                 Cleanup::Identifier {
-                    identifier: in_identifier,
+                    identifier: Base64Bytes(in_identifier),
                 } => identifier(repo, store, in_identifier).await?,
                 Cleanup::Alias {
                     alias: stored_alias,
@@ -34,7 +36,9 @@ where
                     )
                     .await?
                 }
-                Cleanup::Variant { hash } => variant::<R>(repo, hash).await?,
+                Cleanup::Variant {
+                    hash: Base64Bytes(hash),
+                } => variant::<R>(repo, hash).await?,
                 Cleanup::AllVariants => all_variants::<R>(repo).await?,
             },
             Err(e) => {
@@ -46,7 +50,7 @@ where
     })
 }

-#[tracing::instrument(skip(repo, store))]
+#[tracing::instrument(skip_all)]
 async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
 where
     R: FullRepo,
@@ -76,7 +80,7 @@ where
     Ok(())
 }

-#[tracing::instrument(skip(repo))]
+#[tracing::instrument(skip_all)]
 async fn hash<R>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
 where
     R: FullRepo,
@@ -113,6 +117,7 @@ where
     Ok(())
 }

+#[tracing::instrument(skip_all)]
 async fn alias<R>(repo: &R, alias: Alias, token: DeleteToken) -> Result<(), Error>
 where
     R: FullRepo,
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 7b7ae42..ab854b9 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -2,7 +2,7 @@ use crate::{
     config::ImageFormat,
     error::Error,
     ingest::Session,
-    queue::{LocalBoxFuture, Process},
+    queue::{Base64Bytes, LocalBoxFuture, Process},
     repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
     serde_str::Serde,
     store::{Identifier, Store},
@@ -23,7 +23,7 @@ where
         match serde_json::from_slice(job) {
             Ok(job) => match job {
                 Process::Ingest {
-                    identifier,
+                    identifier: Base64Bytes(identifier),
                     upload_id,
                     declared_alias,
                     should_validate,
@@ -66,7 +66,7 @@ where
     })
 }

-#[tracing::instrument(skip(repo, store))]
+#[tracing::instrument(skip_all)]
 async fn process_ingest<R, S>(
     repo: &R,
     store: &S,
@@ -130,6 +130,7 @@ where
     Ok(())
 }

+#[tracing::instrument(skip_all)]
 async fn generate<R, S>(
     repo: &R,
     store: &S,
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index 458dd9e..62f542f 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -281,7 +281,7 @@ impl CachedRepo for SledRepo {

 #[async_trait::async_trait(?Send)]
 impl UploadRepo for SledRepo {
-    #[tracing::instrument(skip(self))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
         b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
         Ok(())
@@ -320,13 +320,13 @@ impl UploadRepo for SledRepo {
         Err(UploadError::Canceled.into())
     }

-    #[tracing::instrument(skip(self))]
+    #[tracing::instrument(level = "trace", skip(self))]
     async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
         b!(self.uploads, uploads.remove(upload_id.as_bytes()));
         Ok(())
     }

-    #[tracing::instrument(skip(self, result))]
+    #[tracing::instrument(level = "trace", skip(self, result))]
     async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
         let result: InnerUploadResult = result.into();
         let result = serde_json::to_vec(&result)?;
@@ -384,7 +384,7 @@ impl QueueRepo for SledRepo {
         Ok(())
     }

-    #[tracing::instrument(skip(self, job), fields(worker_id = %String::from_utf8_lossy(&job)))]
+    #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
     async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), Error> {
         let id = self.db.generate_id()?;
         let mut key = queue_name.as_bytes().to_vec();
@@ -597,7 +597,7 @@ impl HashRepo for SledRepo {
         Ok(())
     }

-    #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
+    #[tracing::instrument(skip_all)]
     async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error> {
         let v = b!(self.hash_aliases, {
             Ok(hash_aliases
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index 7549782..e0d6257 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -170,7 +170,7 @@ impl Store for ObjectStore {
         self.save_stream(ReaderStream::new(reader)).await
     }

-    #[tracing::instrument(skip(stream))]
+    #[tracing::instrument(skip_all)]
     async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, Error>
     where
         S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
@@ -295,7 +295,7 @@ impl Store for ObjectStore {
         Ok(object_id)
     }

-    #[tracing::instrument(skip(bytes))]
+    #[tracing::instrument(skip_all)]
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
         let (req, object_id) = self.put_object_request().await?;

@@ -308,7 +308,7 @@ impl Store for ObjectStore {
         Ok(object_id)
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn to_stream(
         &self,
         identifier: &Self::Identifier,
@@ -328,7 +328,7 @@ impl Store for ObjectStore {
         Ok(Box::pin(response.map_err(payload_to_io_error)))
     }

-    #[tracing::instrument(skip(writer))]
+    #[tracing::instrument(skip(self, writer))]
     async fn read_into(
         &self,
         identifier: &Self::Identifier,
@@ -359,7 +359,7 @@ impl Store for ObjectStore {
         Ok(())
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
         let response = self
             .head_object_request(identifier)
@@ -383,7 +383,7 @@ impl Store for ObjectStore {
         Ok(length)
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
         let response = self.delete_object_request(identifier).send().await?;
diff --git a/src/validate.rs b/src/validate.rs
index ec1f4c2..0263142 100644
--- a/src/validate.rs
+++ b/src/validate.rs
@@ -63,7 +63,7 @@ pub(crate) async fn validate_bytes(
             }
             Ok((
                 ValidInputType::from_video_codec(video_codec),
-                Either::right(Either::left(
+                Either::right(Either::left(Either::left(
                     crate::ffmpeg::trancsocde_bytes(
                         bytes,
                         video_format,
@@ -72,20 +72,26 @@ pub(crate) async fn validate_bytes(
                         audio_codec,
                     )
                     .await?,
-                )),
+                ))),
             ))
         }
         (FileFormat::Image(image_format), Some(format)) if image_format != format => Ok((
             ValidInputType::from_format(format),
-            Either::right(Either::right(Either::left(
+            Either::right(Either::left(Either::right(
                 crate::magick::convert_bytes_read(bytes, format)?,
             ))),
         )),
+        (FileFormat::Image(ImageFormat::Webp), _) => Ok((
+            ValidInputType::Webp,
+            Either::right(Either::left(Either::right(
+                crate::magick::convert_bytes_read(bytes, ImageFormat::Webp)?,
+            ))),
+        )),
         (FileFormat::Image(image_format), _) => Ok((
             ValidInputType::from_format(image_format),
-            Either::right(Either::right(Either::right(
-                crate::exiftool::clear_metadata_bytes_read(bytes)?,
-            ))),
+            Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read(
+                bytes,
+            )?)),
         )),
     }
 }