From 85f635602587bfcb9f706b258aff00cd7a2221d1 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 3 Feb 2024 19:50:00 -0600 Subject: [PATCH] Fix images with trailing bytes failing to upload --- src/process.rs | 28 ++++++++++++++++++++-------- src/validate.rs | 6 +++--- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/process.rs b/src/process.rs index 270903a..bfcc8e0 100644 --- a/src/process.rs +++ b/src/process.rs @@ -126,8 +126,8 @@ pub(crate) enum ProcessError { #[error("Failed to cleanup for command {0}")] Cleanup(Arc, #[source] std::io::Error), - #[error("Unknown process error")] - Other(#[source] std::io::Error), + #[error("Unknown process error for command {0}")] + Other(Arc, #[source] std::io::Error), } impl ProcessError { @@ -135,7 +135,7 @@ impl ProcessError { match self { Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND, Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED, - Self::LimitReached | Self::Read(_, _) | Self::Cleanup(_, _) | Self::Other(_) => { + Self::LimitReached | Self::Read(_, _) | Self::Cleanup(_, _) | Self::Other(_, _) => { ErrorCode::COMMAND_ERROR } Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT, @@ -180,7 +180,7 @@ impl Process { Err(ProcessError::PermissionDenied(command)) } std::io::ErrorKind::WouldBlock => Err(ProcessError::LimitReached), - _ => Err(ProcessError::Other(e)), + _ => Err(ProcessError::Other(command, e)), }, } } @@ -223,7 +223,7 @@ impl Process { Ok(()) } Ok(Ok(status)) => Err(ProcessError::Status(command, status)), - Ok(Err(e)) => Err(ProcessError::Other(e)), + Ok(Err(e)) => Err(ProcessError::Other(command, e)), Err(_) => { let _ = child.kill().await; Err(ProcessError::Timeout(command)) @@ -234,7 +234,16 @@ impl Process { pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead { self.spawn_fn(move |mut stdin| { let mut input = input; - async move { stdin.write_all_buf(&mut input).await } + async move { + match stdin.write_all_buf(&mut input).await { + Ok(()) => Ok(()), + // BrokenPipe means we finished reading from Stdout, so we don't need to write + // to stdin. We'll still error out if the command failed so treat this as a + // success + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()), + Err(e) => Err(e), + } + } }) } @@ -275,9 +284,12 @@ impl Process { Ok(()) } Ok(Ok(status)) => Err(ProcessError::Status(command2, status)), - Ok(Err(e)) => Err(ProcessError::Other(e)), + Ok(Err(e)) => Err(ProcessError::Other(command2, e)), Err(_) => { - child.kill().await.map_err(ProcessError::Other)?; + child + .kill() + .await + .map_err(|e| ProcessError::Other(command2.clone(), e))?; Err(ProcessError::Timeout(command2)) } } diff --git a/src/validate.rs b/src/validate.rs index b69b5df..ee857bd 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -92,7 +92,7 @@ pub(crate) async fn validate_bytes( } } -#[tracing::instrument(skip(state, bytes))] +#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_image( state: &State, bytes: Bytes, @@ -157,7 +157,7 @@ fn validate_animation( Ok(()) } -#[tracing::instrument(skip(state, bytes))] +#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_animation( state: &State, bytes: Bytes, @@ -215,7 +215,7 @@ fn validate_video( Ok(()) } -#[tracing::instrument(skip(state, bytes))] +#[tracing::instrument(skip(state, bytes), fields(len = bytes.len()))] async fn process_video( state: &State, bytes: Bytes,