Fix webp metadata stripping, more tracing cleanup
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
asonix 2022-10-01 22:47:52 -05:00
parent 718f09c43a
commit 669b3fb86f
13 changed files with 195 additions and 157 deletions

View file

@ -71,38 +71,41 @@ where
R: FullRepo + 'static,
S: Store,
{
#[tracing::instrument(name = "Drop Backgrounded", skip(self), fields(identifier = ?self.identifier, upload_id = ?self.upload_id))]
fn drop(&mut self) {
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
if self.identifier.is_some() || self.upload_id.is_some() {
let cleanup_parent_span =
tracing::info_span!(parent: None, "Dropped backgrounded cleanup");
cleanup_parent_span.follows_from(Span::current());
let cleanup_span = tracing::info_span!(parent: None, "Backgrounded cleanup Identifier", identifier = ?identifier);
cleanup_span.follows_from(Span::current());
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
}
.instrument(cleanup_span),
)
});
}
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Backgrounded cleanup Identifier", identifier = ?identifier);
if let Some(upload_id) = self.upload_id {
let repo = self.repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
}
.instrument(cleanup_span),
)
});
}
let cleanup_span = tracing::info_span!(parent: None, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
cleanup_span.follows_from(Span::current());
if let Some(upload_id) = self.upload_id {
let repo = self.repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = repo.claim(upload_id).await;
}
.instrument(cleanup_span),
)
});
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Backgrounded cleanup Upload ID", upload_id = ?upload_id);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = repo.claim(upload_id).await;
}
.instrument(cleanup_span),
)
});
}
}
}
}

View file

@ -51,7 +51,7 @@ where
vacant.insert(Vec::new());
let span = tracing::info_span!(
"Processing image",
hash = &tracing::field::debug(&hash),
hash = &tracing::field::debug(&hex::encode(&hash)),
path = &tracing::field::debug(&path),
completed = &tracing::field::Empty,
);
@ -64,7 +64,7 @@ where
occupied.get_mut().push(tx);
let span = tracing::info_span!(
"Waiting for processed image",
hash = &tracing::field::debug(&hash),
hash = &tracing::field::debug(&hex::encode(&hash)),
path = &tracing::field::debug(&path),
);
(Some(rx), span)

View file

@ -337,7 +337,7 @@ pub(crate) async fn trancsocde_bytes(
Ok(Box::pin(clean_reader))
}
#[tracing::instrument]
#[tracing::instrument(skip(store))]
pub(crate) async fn thumbnail<S: Store>(
store: S,
from: S::Identifier,

View file

@ -13,7 +13,7 @@ use tokio::io::AsyncReadExt;
use tracing::Instrument;
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(hash))]
#[tracing::instrument(skip(repo, store, hash))]
pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,
@ -44,7 +44,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
}
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(hash))]
#[tracing::instrument(skip(repo, store, hash))]
async fn process<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,

View file

@ -41,7 +41,7 @@ where
Ok(buf.into_bytes())
}
#[tracing::instrument(skip(stream))]
#[tracing::instrument(skip(repo, store, stream))]
pub(crate) async fn ingest<R, S>(
repo: &R,
store: &S,
@ -58,7 +58,7 @@ where
let bytes = aggregate(stream).await?;
tracing::debug!("Validating bytes");
tracing::trace!("Validating bytes");
let (input_type, validated_reader) = crate::validate::validate_bytes(
bytes,
CONFIG.media.format,
@ -114,7 +114,7 @@ where
Ok(session)
}
#[tracing::instrument]
#[tracing::instrument(level = "trace", skip_all)]
async fn save_upload<R, S>(
repo: &R,
store: &S,
@ -151,27 +151,27 @@ where
self.alias.as_ref()
}
#[tracing::instrument]
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
tracing::debug!("Generating delete token");
tracing::trace!("Generating delete token");
let delete_token = DeleteToken::generate();
tracing::debug!("Saving delete token");
tracing::trace!("Saving delete token");
let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
if res.is_err() {
let delete_token = self.repo.delete_token(&alias).await?;
tracing::debug!("Returning existing delete token, {:?}", delete_token);
tracing::trace!("Returning existing delete token, {:?}", delete_token);
return Ok(delete_token);
}
tracing::debug!("Returning new delete token, {:?}", delete_token);
tracing::trace!("Returning new delete token, {:?}", delete_token);
Ok(delete_token)
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash))]
async fn add_existing_alias(
&mut self,
hash: &[u8],
@ -194,15 +194,13 @@ where
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, hash))]
async fn create_alias(
&mut self,
hash: &[u8],
input_type: ValidInputType,
is_cached: bool,
) -> Result<(), Error> {
tracing::debug!("Alias gen loop");
loop {
let alias = Alias::generate(input_type.as_ext().to_string());
@ -219,7 +217,7 @@ where
return Ok(());
}
tracing::debug!("Alias exists, regenerating");
tracing::trace!("Alias exists, regenerating");
}
}
}
@ -229,61 +227,62 @@ where
R: FullRepo + 'static,
S: Store,
{
#[tracing::instrument(name = "Drop Session", skip(self), fields(hash = ?self.hash, alias = ?self.alias, identifier = ?self.identifier))]
fn drop(&mut self) {
let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
cleanup_parent_span.follows_from(Span::current());
if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() {
let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup");
cleanup_parent_span.follows_from(Span::current());
if let Some(hash) = self.hash.take() {
let repo = self.repo.clone();
if let Some(hash) = self.hash.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash);
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup hash", hash = ?hash);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
}
.instrument(cleanup_span),
)
});
}
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_hash(&repo, hash.into()).await;
}
.instrument(cleanup_span),
)
});
}
if let Some(alias) = self.alias.take() {
let repo = self.repo.clone();
if let Some(alias) = self.alias.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup alias", alias = ?alias);
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span.clone(), "Session cleanup alias", alias = ?alias);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Ok(token) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Ok(token) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
}
}
}
}
.instrument(cleanup_span),
)
});
}
.instrument(cleanup_span),
)
});
}
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
if let Some(identifier) = self.identifier.take() {
let repo = self.repo.clone();
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier);
let cleanup_span = tracing::info_span!(parent: cleanup_parent_span, "Session cleanup identifier", identifier = ?identifier);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
}
.instrument(cleanup_span),
)
});
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
let _ = crate::queue::cleanup_identifier(&repo, identifier).await;
}
.instrument(cleanup_span),
)
});
}
}
}
}

View file

@ -18,7 +18,6 @@ use std::{
time::{Duration, SystemTime},
};
use tokio::sync::Semaphore;
use tracing::{debug, info, instrument};
use tracing_actix_web::TracingLogger;
use tracing_awc::Tracing;
use tracing_futures::Instrument;
@ -96,15 +95,15 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
let details = repo.details(&identifier).await?;
if let Some(details) = details {
debug!("details exist");
tracing::debug!("details exist");
Ok(details)
} else {
debug!("generating new details from {:?}", identifier);
tracing::debug!("generating new details from {:?}", identifier);
let hint = details_hint(alias);
let new_details = Details::from_store(store.clone(), identifier.clone(), hint).await?;
debug!("storing details for {:?}", identifier);
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
debug!("stored");
tracing::debug!("stored");
Ok(new_details)
}
}
@ -217,7 +216,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
}
/// Handle responding to succesful uploads
#[instrument(name = "Uploaded files", skip(value))]
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))]
async fn upload<R: FullRepo, S: Store + 'static>(
Multipart(Upload(value)): Multipart<Upload<R, S>>,
repo: web::Data<R>,
@ -227,7 +226,7 @@ async fn upload<R: FullRepo, S: Store + 'static>(
}
/// Handle responding to succesful uploads
#[instrument(name = "Imported files", skip(value))]
#[tracing::instrument(name = "Imported files", skip(value, repo, store))]
async fn import<R: FullRepo, S: Store + 'static>(
Multipart(Import(value)): Multipart<Import<R, S>>,
repo: web::Data<R>,
@ -237,7 +236,7 @@ async fn import<R: FullRepo, S: Store + 'static>(
}
/// Handle responding to succesful uploads
#[instrument(name = "Uploaded files", skip(value))]
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))]
async fn handle_upload<R: FullRepo, S: Store + 'static>(
value: Value<Session<R, S>>,
repo: web::Data<R>,
@ -257,7 +256,7 @@ async fn handle_upload<R: FullRepo, S: Store + 'static>(
for image in &images {
if let Some(alias) = image.result.alias() {
info!("Uploaded {} as {:?}", image.filename, alias);
tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
let delete_token = image.result.delete_token().await?;
let details = ensure_details(&repo, &store, alias).await?;
@ -329,7 +328,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
}
}
#[instrument(name = "Uploaded files", skip(value))]
#[tracing::instrument(name = "Uploaded files", skip(value, repo))]
async fn upload_backgrounded<R: FullRepo, S: Store>(
Multipart(BackgroundedUpload(value)): Multipart<BackgroundedUpload<R, S>>,
repo: web::Data<R>,
@ -377,7 +376,7 @@ struct ClaimQuery {
}
/// Claim a backgrounded upload
#[instrument(name = "Waiting on upload", skip(repo))]
#[tracing::instrument(name = "Waiting on upload", skip_all)]
async fn claim_upload<R: FullRepo, S: Store + 'static>(
repo: web::Data<R>,
store: web::Data<S>,
@ -426,7 +425,7 @@ struct UrlQuery {
}
/// download an image from a URL
#[instrument(name = "Downloading file", skip(client, repo))]
#[tracing::instrument(name = "Downloading file", skip(client, repo, store))]
async fn download<R: FullRepo + 'static, S: Store + 'static>(
client: web::Data<Client>,
repo: web::Data<R>,
@ -450,7 +449,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
}
}
#[instrument(name = "Downloading file inline", skip(stream))]
#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store))]
async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
@ -476,7 +475,7 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
})))
}
#[instrument(name = "Downloading file in background", skip(stream))]
#[tracing::instrument(name = "Downloading file in background", skip(stream, repo, store))]
async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
@ -504,7 +503,7 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
}
/// Delete aliases and files
#[instrument(name = "Deleting file", skip(repo))]
#[tracing::instrument(name = "Deleting file", skip(repo))]
async fn delete<R: FullRepo>(
repo: web::Data<R>,
path_entries: web::Path<(String, String)>,
@ -560,7 +559,7 @@ fn prepare_process(
Ok((format, alias, thumbnail_path, thumbnail_args))
}
#[instrument(name = "Fetching derived details", skip(repo))]
#[tracing::instrument(name = "Fetching derived details", skip(repo))]
async fn process_details<R: FullRepo, S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
@ -582,7 +581,7 @@ async fn process_details<R: FullRepo, S: Store>(
}
/// Process files
#[instrument(name = "Serving processed image", skip(repo))]
#[tracing::instrument(name = "Serving processed image", skip(repo, store))]
async fn process<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>,
query: web::Query<ProcessQuery>,
@ -605,19 +604,19 @@ async fn process<R: FullRepo, S: Store + 'static>(
let details = repo.details(&identifier).await?;
let details = if let Some(details) = details {
debug!("details exist");
tracing::debug!("details exist");
details
} else {
debug!("generating new details from {:?}", identifier);
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(
(**store).clone(),
identifier.clone(),
Some(ValidInputType::from_format(format)),
)
.await?;
debug!("storing details for {:?}", identifier);
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
debug!("stored");
tracing::debug!("stored");
new_details
};
@ -671,7 +670,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
))
}
#[instrument(name = "Serving processed image headers", skip(repo))]
#[tracing::instrument(name = "Serving processed image headers", skip(repo, store))]
async fn process_head<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>,
query: web::Query<ProcessQuery>,
@ -693,19 +692,19 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
let details = repo.details(&identifier).await?;
let details = if let Some(details) = details {
debug!("details exist");
tracing::debug!("details exist");
details
} else {
debug!("generating new details from {:?}", identifier);
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(
(**store).clone(),
identifier.clone(),
Some(ValidInputType::from_format(format)),
)
.await?;
debug!("storing details for {:?}", identifier);
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
debug!("stored");
tracing::debug!("stored");
new_details
};
@ -716,7 +715,7 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
}
/// Process files
#[instrument(name = "Spawning image process", skip(repo))]
#[tracing::instrument(name = "Spawning image process", skip(repo))]
async fn process_backgrounded<R: FullRepo, S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
@ -740,7 +739,7 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
}
/// Fetch file details
#[instrument(name = "Fetching details", skip(repo))]
#[tracing::instrument(name = "Fetching details", skip(repo, store))]
async fn details<R: FullRepo, S: Store + 'static>(
alias: web::Path<Serde<Alias>>,
repo: web::Data<R>,
@ -754,7 +753,7 @@ async fn details<R: FullRepo, S: Store + 'static>(
}
/// Serve files
#[instrument(name = "Serving file", skip(repo))]
#[tracing::instrument(name = "Serving file", skip(repo, store))]
async fn serve<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: web::Path<Serde<Alias>>,
@ -772,7 +771,7 @@ async fn serve<R: FullRepo, S: Store + 'static>(
ranged_file_resp(&store, identifier, range, details).await
}
#[instrument(name = "Serving file headers", skip(repo))]
#[tracing::instrument(name = "Serving file headers", skip(repo, store))]
async fn serve_head<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: web::Path<Serde<Alias>>,
@ -916,7 +915,7 @@ fn srv_head(
builder
}
#[instrument(name = "Spawning variant cleanup", skip(repo))]
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))]
async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> {
queue::cleanup_all_variants(&repo).await?;
Ok(HttpResponse::NoContent().finish())
@ -927,7 +926,7 @@ struct AliasQuery {
alias: Serde<Alias>,
}
#[instrument(name = "Purging file", skip(repo))]
#[tracing::instrument(name = "Purging file", skip(repo))]
async fn purge<R: FullRepo>(
query: web::Query<AliasQuery>,
repo: web::Data<R>,
@ -944,7 +943,7 @@ async fn purge<R: FullRepo>(
})))
}
#[instrument(name = "Fetching aliases", skip(repo))]
#[tracing::instrument(name = "Fetching aliases", skip(repo))]
async fn aliases<R: FullRepo>(
query: web::Query<AliasQuery>,
repo: web::Data<R>,
@ -958,7 +957,7 @@ async fn aliases<R: FullRepo>(
})))
}
#[instrument(name = "Fetching identifier", skip(repo))]
#[tracing::instrument(name = "Fetching identifier", skip(repo))]
async fn identifier<R: FullRepo, S: Store>(
query: web::Query<AliasQuery>,
repo: web::Data<R>,

View file

@ -55,7 +55,7 @@ impl Process {
})
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
pub(crate) async fn wait(mut self) -> std::io::Result<()> {
let status = self.child.wait().await?;
if !status.success() {
@ -99,7 +99,7 @@ impl Process {
})
}
#[tracing::instrument(skip(f))]
#[tracing::instrument(level = "trace", skip_all)]
fn spawn_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin
where
F: FnOnce(ChildStdin) -> Fut + 'static,

View file

@ -13,23 +13,48 @@ use tracing::Instrument;
mod cleanup;
mod process;
#[derive(Debug)]
struct Base64Bytes(Vec<u8>);
impl serde::Serialize for Base64Bytes {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = base64::encode(&self.0);
s.serialize(serializer)
}
}
impl<'de> serde::Deserialize<'de> for Base64Bytes {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = serde::Deserialize::deserialize(deserializer)?;
base64::decode(s)
.map(Base64Bytes)
.map_err(|e| serde::de::Error::custom(e.to_string()))
}
}
const CLEANUP_QUEUE: &str = "cleanup";
const PROCESS_QUEUE: &str = "process";
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Cleanup {
Hash {
hash: Vec<u8>,
hash: Base64Bytes,
},
Identifier {
identifier: Vec<u8>,
identifier: Base64Bytes,
},
Alias {
alias: Serde<Alias>,
token: Serde<DeleteToken>,
},
Variant {
hash: Vec<u8>,
hash: Base64Bytes,
},
AllVariants,
}
@ -37,7 +62,7 @@ enum Cleanup {
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Process {
Ingest {
identifier: Vec<u8>,
identifier: Base64Bytes,
upload_id: Serde<UploadId>,
declared_alias: Option<Serde<Alias>>,
should_validate: bool,
@ -66,7 +91,7 @@ pub(crate) async fn cleanup_alias<R: QueueRepo>(
pub(crate) async fn cleanup_hash<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Hash {
hash: hash.as_ref().to_vec(),
hash: Base64Bytes(hash.as_ref().to_vec()),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
@ -77,7 +102,7 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
identifier: I,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Identifier {
identifier: identifier.to_bytes()?,
identifier: Base64Bytes(identifier.to_bytes()?),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
@ -85,7 +110,7 @@ pub(crate) async fn cleanup_identifier<R: QueueRepo, I: Identifier>(
async fn cleanup_variants<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Cleanup::Variant {
hash: hash.as_ref().to_vec(),
hash: Base64Bytes(hash.as_ref().to_vec()),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
@ -106,7 +131,7 @@ pub(crate) async fn queue_ingest<R: QueueRepo>(
is_cached: bool,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Process::Ingest {
identifier,
identifier: Base64Bytes(identifier),
declared_alias: declared_alias.map(Serde::new),
upload_id: Serde::new(upload_id),
should_validate,
@ -188,7 +213,7 @@ where
loop {
let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
let span = tracing::info_span!("Running Job", worker_id = ?worker_id, job = ?String::from_utf8_lossy(bytes.as_ref()));
let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
span.in_scope(|| (callback)(repo, store, bytes.as_ref()))
.instrument(span)

View file

@ -1,6 +1,6 @@
use crate::{
error::{Error, UploadError},
queue::{Cleanup, LocalBoxFuture},
queue::{Base64Bytes, Cleanup, LocalBoxFuture},
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo, IdentifierRepo},
serde_str::Serde,
store::{Identifier, Store},
@ -19,9 +19,11 @@ where
Box::pin(async move {
match serde_json::from_slice(job) {
Ok(job) => match job {
Cleanup::Hash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
Cleanup::Hash {
hash: Base64Bytes(in_hash),
} => hash::<R, S>(repo, in_hash).await?,
Cleanup::Identifier {
identifier: in_identifier,
identifier: Base64Bytes(in_identifier),
} => identifier(repo, store, in_identifier).await?,
Cleanup::Alias {
alias: stored_alias,
@ -34,7 +36,9 @@ where
)
.await?
}
Cleanup::Variant { hash } => variant::<R, S>(repo, hash).await?,
Cleanup::Variant {
hash: Base64Bytes(hash),
} => variant::<R, S>(repo, hash).await?,
Cleanup::AllVariants => all_variants::<R, S>(repo).await?,
},
Err(e) => {
@ -46,7 +50,7 @@ where
})
}
#[tracing::instrument(skip(repo, store))]
#[tracing::instrument(skip_all)]
async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: FullRepo,
@ -76,7 +80,7 @@ where
Ok(())
}
#[tracing::instrument(skip(repo))]
#[tracing::instrument(skip_all)]
async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
R: FullRepo,
@ -113,6 +117,7 @@ where
Ok(())
}
#[tracing::instrument(skip_all)]
async fn alias<R>(repo: &R, alias: Alias, token: DeleteToken) -> Result<(), Error>
where
R: FullRepo,

View file

@ -2,7 +2,7 @@ use crate::{
config::ImageFormat,
error::Error,
ingest::Session,
queue::{LocalBoxFuture, Process},
queue::{Base64Bytes, LocalBoxFuture, Process},
repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
serde_str::Serde,
store::{Identifier, Store},
@ -23,7 +23,7 @@ where
match serde_json::from_slice(job) {
Ok(job) => match job {
Process::Ingest {
identifier,
identifier: Base64Bytes(identifier),
upload_id,
declared_alias,
should_validate,
@ -66,7 +66,7 @@ where
})
}
#[tracing::instrument(skip(repo, store))]
#[tracing::instrument(skip_all)]
async fn process_ingest<R, S>(
repo: &R,
store: &S,
@ -130,6 +130,7 @@ where
Ok(())
}
#[tracing::instrument(skip_all)]
async fn generate<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,

View file

@ -281,7 +281,7 @@ impl CachedRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
Ok(())
@ -320,13 +320,13 @@ impl UploadRepo for SledRepo {
Err(UploadError::Canceled.into())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "trace", skip(self))]
async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
b!(self.uploads, uploads.remove(upload_id.as_bytes()));
Ok(())
}
#[tracing::instrument(skip(self, result))]
#[tracing::instrument(level = "trace", skip(self, result))]
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
let result: InnerUploadResult = result.into();
let result = serde_json::to_vec(&result)?;
@ -384,7 +384,7 @@ impl QueueRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, job), fields(worker_id = %String::from_utf8_lossy(&job)))]
#[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), Error> {
let id = self.db.generate_id()?;
let mut key = queue_name.as_bytes().to_vec();
@ -597,7 +597,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
#[tracing::instrument(skip_all)]
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error> {
let v = b!(self.hash_aliases, {
Ok(hash_aliases

View file

@ -170,7 +170,7 @@ impl Store for ObjectStore {
self.save_stream(ReaderStream::new(reader)).await
}
#[tracing::instrument(skip(stream))]
#[tracing::instrument(skip_all)]
async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, Error>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
@ -295,7 +295,7 @@ impl Store for ObjectStore {
Ok(object_id)
}
#[tracing::instrument(skip(bytes))]
#[tracing::instrument(skip_all)]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
let (req, object_id) = self.put_object_request().await?;
@ -308,7 +308,7 @@ impl Store for ObjectStore {
Ok(object_id)
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn to_stream(
&self,
identifier: &Self::Identifier,
@ -328,7 +328,7 @@ impl Store for ObjectStore {
Ok(Box::pin(response.map_err(payload_to_io_error)))
}
#[tracing::instrument(skip(writer))]
#[tracing::instrument(skip(self, writer))]
async fn read_into<Writer>(
&self,
identifier: &Self::Identifier,
@ -359,7 +359,7 @@ impl Store for ObjectStore {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
let response = self
.head_object_request(identifier)
@ -383,7 +383,7 @@ impl Store for ObjectStore {
Ok(length)
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let response = self.delete_object_request(identifier).send().await?;

View file

@ -63,7 +63,7 @@ pub(crate) async fn validate_bytes(
}
Ok((
ValidInputType::from_video_codec(video_codec),
Either::right(Either::left(
Either::right(Either::left(Either::left(
crate::ffmpeg::trancsocde_bytes(
bytes,
video_format,
@ -72,20 +72,26 @@ pub(crate) async fn validate_bytes(
audio_codec,
)
.await?,
)),
))),
))
}
(FileFormat::Image(image_format), Some(format)) if image_format != format => Ok((
ValidInputType::from_format(format),
Either::right(Either::right(Either::left(
Either::right(Either::left(Either::right(
crate::magick::convert_bytes_read(bytes, format)?,
))),
)),
(FileFormat::Image(ImageFormat::Webp), _) => Ok((
ValidInputType::Webp,
Either::right(Either::left(Either::right(
crate::magick::convert_bytes_read(bytes, ImageFormat::Webp)?,
))),
)),
(FileFormat::Image(image_format), _) => Ok((
ValidInputType::from_format(image_format),
Either::right(Either::right(Either::right(
crate::exiftool::clear_metadata_bytes_read(bytes)?,
))),
Either::right(Either::right(crate::exiftool::clear_metadata_bytes_read(
bytes,
)?)),
)),
}
}