Do more cleanup inline

This commit is contained in:
asonix 2023-12-23 11:58:20 -06:00
parent e8380c31c1
commit 6fa79b9188
12 changed files with 145 additions and 79 deletions

View file

@ -43,7 +43,7 @@ pub(super) async fn check_reorient(
async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> { async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
.bytes_read(input) .bytes_read(input)
.to_string() .into_string()
.await?; .await?;
Ok(!buf.is_empty()) Ok(!buf.is_empty())

View file

@ -229,10 +229,10 @@ where
timeout, timeout,
)? )?
.read() .read()
.to_vec() .into_vec()
.await?; .await?;
drop(input_file); input_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; let output: FfMpegDiscovery = serde_json::from_slice(&output).map_err(FfMpegError::Json)?;
@ -273,7 +273,7 @@ async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegErro
timeout, timeout,
)? )?
.read() .read()
.to_vec() .into_vec()
.await?; .await?;
let formats: PixelFormatOutput = serde_json::from_slice(&output).map_err(FfMpegError::Json)?; let formats: PixelFormatOutput = serde_json::from_slice(&output).map_err(FfMpegError::Json)?;

View file

@ -129,11 +129,14 @@ where
timeout, timeout,
)? )?
.read() .read()
.to_string() .into_string()
.await?; .await?;
drop(input_file); input_file.cleanup().await.map_err(MagickError::Cleanup)?;
drop(temporary_path); temporary_path
.cleanup()
.await
.map_err(MagickError::Cleanup)?;
if output.is_empty() { if output.is_empty() {
return Err(MagickError::Empty); return Err(MagickError::Empty);
@ -192,11 +195,14 @@ where
timeout, timeout,
)? )?
.read() .read()
.to_vec() .into_vec()
.await?; .await?;
drop(input_file); input_file.cleanup().await.map_err(MagickError::Cleanup)?;
drop(temporary_path); temporary_path
.cleanup()
.await
.map_err(MagickError::Cleanup)?;
if output.is_empty() { if output.is_empty() {
return Err(MagickError::Empty); return Err(MagickError::Empty);

View file

@ -42,7 +42,7 @@ impl ExifError {
pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> { pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> {
let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)? let buf = Process::run("exiftool", &["-n", "-Orientation", "-"], &[], timeout)?
.bytes_read(input) .bytes_read(input)
.to_string() .into_string()
.await?; .await?;
Ok(!buf.is_empty()) Ok(!buf.is_empty())

View file

@ -26,6 +26,9 @@ pub(crate) enum FfMpegError {
#[error("Error closing file")] #[error("Error closing file")]
CloseFile(#[source] std::io::Error), CloseFile(#[source] std::io::Error),
#[error("Error cleaning up after command")]
Cleanup(#[source] std::io::Error),
#[error("Error in store")] #[error("Error in store")]
Store(#[source] StoreError), Store(#[source] StoreError),
@ -53,6 +56,7 @@ impl FfMpegError {
| Self::CreateDir(_) | Self::CreateDir(_)
| Self::ReadFile(_) | Self::ReadFile(_)
| Self::OpenFile(_) | Self::OpenFile(_)
| Self::Cleanup(_)
| Self::CreateFile(_) | Self::CreateFile(_)
| Self::CloseFile(_) => ErrorCode::COMMAND_ERROR, | Self::CloseFile(_) => ErrorCode::COMMAND_ERROR,
} }

View file

@ -135,10 +135,11 @@ async fn process<S: Store + 'static>(
ProcessableFormat::Animation(format) => config.media.animation.quality_for(format), ProcessableFormat::Animation(format) => config.media.animation.quality_for(format),
}; };
let vec = crate::magick::process_image_store_read( let stream = store.to_stream(&identifier, None, None).await?;
let vec = crate::magick::process_image_stream_read(
tmp_dir, tmp_dir,
store, stream,
&identifier,
thumbnail_args, thumbnail_args,
input_format, input_format,
format, format,
@ -146,7 +147,7 @@ async fn process<S: Store + 'static>(
config.media.process_timeout, config.media.process_timeout,
) )
.await? .await?
.to_vec() .into_vec()
.instrument(tracing::info_span!("Reading processed image to vec")) .instrument(tracing::info_span!("Reading processed image to vec"))
.await?; .await?;
@ -216,10 +217,11 @@ where
{ {
let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp); let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp);
let stream = store.to_stream(&identifier, None, None).await?;
let reader = magick::thumbnail( let reader = magick::thumbnail(
tmp_dir, tmp_dir,
store, stream,
&identifier,
processable_format, processable_format,
ProcessableFormat::Image(thumbnail_format), ProcessableFormat::Image(thumbnail_format),
media.image.quality_for(thumbnail_format), media.image.quality_for(thumbnail_format),

View file

@ -103,7 +103,7 @@ pub(super) async fn thumbnail<S: Store>(
)?; )?;
process.wait().await?; process.wait().await?;
drop(input_file); input_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
let tmp_two = crate::file::File::open(&output_file) let tmp_two = crate::file::File::open(&output_file)
.await .await

View file

@ -1,10 +1,12 @@
use std::{ffi::OsStr, sync::Arc}; use std::ffi::OsStr;
use actix_web::web::Bytes;
use crate::{ use crate::{
formats::ProcessableFormat, formats::ProcessableFormat,
magick::{MagickError, MAGICK_TEMPORARY_PATH}, magick::{MagickError, MAGICK_TEMPORARY_PATH},
process::{Process, ProcessRead}, process::{Process, ProcessRead},
store::Store, stream::LocalBoxStream,
tmp_file::TmpDir, tmp_file::TmpDir,
}; };
@ -67,20 +69,14 @@ where
Ok(reader) Ok(reader)
} }
pub(super) async fn thumbnail<S: Store + 'static>( pub(super) async fn thumbnail(
tmp_dir: &TmpDir, tmp_dir: &TmpDir,
store: &S, stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
identifier: &Arc<str>,
input_format: ProcessableFormat, input_format: ProcessableFormat,
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<ProcessRead, MagickError> { ) -> Result<ProcessRead, MagickError> {
let stream = store
.to_stream(identifier, None, None)
.await
.map_err(MagickError::Store)?;
thumbnail_animation( thumbnail_animation(
tmp_dir, tmp_dir,
input_format, input_format,

View file

@ -1,15 +1,15 @@
use std::{ffi::OsStr, sync::Arc}; use std::ffi::OsStr;
use actix_web::web::Bytes;
use crate::{ use crate::{
error_code::ErrorCode, error_code::ErrorCode,
formats::ProcessableFormat, formats::ProcessableFormat,
process::{Process, ProcessError, ProcessRead}, process::{Process, ProcessError, ProcessRead},
store::Store, stream::LocalBoxStream,
tmp_file::TmpDir, tmp_file::TmpDir,
}; };
pub(crate) const MAGICK_TEMPORARY_PATH: &str = "MAGICK_TEMPORARY_PATH"; pub(crate) const MAGICK_TEMPORARY_PATH: &str = "MAGICK_TEMPORARY_PATH";
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -17,9 +17,6 @@ pub(crate) enum MagickError {
#[error("Error in imagemagick process")] #[error("Error in imagemagick process")]
Process(#[source] ProcessError), Process(#[source] ProcessError),
#[error("Error in store")]
Store(#[source] crate::store::StoreError),
#[error("Invalid output format")] #[error("Invalid output format")]
Json(#[source] serde_json::Error), Json(#[source] serde_json::Error),
@ -44,6 +41,9 @@ pub(crate) enum MagickError {
#[error("Invalid media file provided")] #[error("Invalid media file provided")]
CommandFailed(ProcessError), CommandFailed(ProcessError),
#[error("Error cleaning up after command")]
Cleanup(#[source] std::io::Error),
#[error("Command output is empty")] #[error("Command output is empty")]
Empty, Empty,
} }
@ -61,7 +61,6 @@ impl MagickError {
pub(crate) const fn error_code(&self) -> ErrorCode { pub(crate) const fn error_code(&self) -> ErrorCode {
match self { match self {
Self::CommandFailed(_) => ErrorCode::COMMAND_FAILURE, Self::CommandFailed(_) => ErrorCode::COMMAND_FAILURE,
Self::Store(e) => e.error_code(),
Self::Process(e) => e.error_code(), Self::Process(e) => e.error_code(),
Self::Json(_) Self::Json(_)
| Self::Write(_) | Self::Write(_)
@ -70,6 +69,7 @@ impl MagickError {
| Self::CreateTemporaryDirectory(_) | Self::CreateTemporaryDirectory(_)
| Self::CloseFile(_) | Self::CloseFile(_)
| Self::Discover(_) | Self::Discover(_)
| Self::Cleanup(_)
| Self::Empty => ErrorCode::COMMAND_ERROR, | Self::Empty => ErrorCode::COMMAND_ERROR,
} }
} }
@ -148,21 +148,15 @@ where
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(crate) async fn process_image_store_read<S: Store + 'static>( pub(crate) async fn process_image_stream_read(
tmp_dir: &TmpDir, tmp_dir: &TmpDir,
store: &S, stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
identifier: &Arc<str>,
args: Vec<String>, args: Vec<String>,
input_format: ProcessableFormat, input_format: ProcessableFormat,
format: ProcessableFormat, format: ProcessableFormat,
quality: Option<u8>, quality: Option<u8>,
timeout: u64, timeout: u64,
) -> Result<ProcessRead, MagickError> { ) -> Result<ProcessRead, MagickError> {
let stream = store
.to_stream(identifier, None, None)
.await
.map_err(MagickError::Store)?;
process_image( process_image(
tmp_dir, tmp_dir,
args, args,

View file

@ -1,6 +1,5 @@
use actix_web::web::Bytes; use actix_web::web::Bytes;
use std::{ use std::{
any::Any,
ffi::OsStr, ffi::OsStr,
future::Future, future::Future,
process::{ExitStatus, Stdio}, process::{ExitStatus, Stdio},
@ -72,12 +71,36 @@ impl std::fmt::Debug for Process {
} }
} }
#[async_trait::async_trait(?Send)]
pub(crate) trait Extras {
async fn consume(&mut self) -> std::io::Result<()>;
}
#[async_trait::async_trait(?Send)]
impl Extras for () {
async fn consume(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl<T> Extras for (Box<dyn Extras>, T)
where
T: Extras,
{
async fn consume(&mut self) -> std::io::Result<()> {
let (res1, res2) = tokio::join!(self.0.consume(), self.1.consume());
res1?;
res2
}
}
pub(crate) struct ProcessRead { pub(crate) struct ProcessRead {
reader: BoxRead<'static>, reader: BoxRead<'static>,
handle: LocalBoxFuture<'static, Result<(), ProcessError>>, handle: LocalBoxFuture<'static, Result<(), ProcessError>>,
command: Arc<str>, command: Arc<str>,
id: Uuid, id: Uuid,
extras: Box<dyn Any>, extras: Box<dyn Extras>,
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -100,6 +123,9 @@ pub(crate) enum ProcessError {
#[error("Failed to read stdout for {0}")] #[error("Failed to read stdout for {0}")]
Read(Arc<str>, #[source] std::io::Error), Read(Arc<str>, #[source] std::io::Error),
#[error("Failed to cleanup for command {0}")]
Cleanup(Arc<str>, #[source] std::io::Error),
#[error("Unknown process error")] #[error("Unknown process error")]
Other(#[source] std::io::Error), Other(#[source] std::io::Error),
} }
@ -109,7 +135,9 @@ impl ProcessError {
match self { match self {
Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND, Self::NotFound(_) => ErrorCode::COMMAND_NOT_FOUND,
Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED, Self::PermissionDenied(_) => ErrorCode::COMMAND_PERMISSION_DENIED,
Self::LimitReached | Self::Read(_, _) | Self::Other(_) => ErrorCode::COMMAND_ERROR, Self::LimitReached | Self::Read(_, _) | Self::Cleanup(_, _) | Self::Other(_) => {
ErrorCode::COMMAND_ERROR
}
Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT, Self::Timeout(_) => ErrorCode::COMMAND_TIMEOUT,
Self::Status(_, _) => ErrorCode::COMMAND_FAILURE, Self::Status(_, _) => ErrorCode::COMMAND_FAILURE,
} }
@ -276,7 +304,7 @@ impl ProcessRead {
} }
} }
pub(crate) async fn to_vec(self) -> Result<Vec<u8>, ProcessError> { pub(crate) async fn into_vec(self) -> Result<Vec<u8>, ProcessError> {
let cmd = self.command.clone(); let cmd = self.command.clone();
self.with_stdout(move |mut stdout| async move { self.with_stdout(move |mut stdout| async move {
@ -291,7 +319,7 @@ impl ProcessRead {
.await? .await?
} }
pub(crate) async fn to_string(self) -> Result<String, ProcessError> { pub(crate) async fn into_string(self) -> Result<String, ProcessError> {
let cmd = self.command.clone(); let cmd = self.command.clone();
self.with_stdout(move |mut stdout| async move { self.with_stdout(move |mut stdout| async move {
@ -318,21 +346,25 @@ impl ProcessRead {
handle, handle,
command, command,
id, id,
extras, mut extras,
} = self; } = self;
let (out, res) = tokio::join!( let (out, res) = tokio::join!(
(f)(reader).instrument(tracing::info_span!("cmd-reader", %command, %id)), (f)(reader).instrument(tracing::info_span!("cmd-reader", %command, %id)),
handle.instrument(tracing::info_span!("cmd-handle", %command, %id)) handle.instrument(tracing::info_span!("cmd-handle", %command, %id))
); );
res?;
drop(extras); extras
.consume()
.await
.map_err(|e| ProcessError::Cleanup(command, e))?;
res?;
Ok(out) Ok(out)
} }
pub(crate) fn add_extras<Extras: 'static>(self, more_extras: Extras) -> ProcessRead { pub(crate) fn add_extras<E: Extras + 'static>(self, more_extras: E) -> ProcessRead {
let Self { let Self {
reader, reader,
handle, handle,

View file

@ -6,6 +6,8 @@ use std::{
use uuid::Uuid; use uuid::Uuid;
use crate::process::Extras;
pub(crate) type ArcTmpDir = Arc<TmpDir>; pub(crate) type ArcTmpDir = Arc<TmpDir>;
#[derive(Debug)] #[derive(Debug)]
@ -15,36 +17,33 @@ pub(crate) struct TmpDir {
impl TmpDir { impl TmpDir {
pub(crate) async fn init<P: AsRef<Path>>(path: P) -> std::io::Result<Arc<Self>> { pub(crate) async fn init<P: AsRef<Path>>(path: P) -> std::io::Result<Arc<Self>> {
let path = path.as_ref().join(Uuid::new_v4().to_string()); let path = path.as_ref().join(Uuid::now_v7().to_string());
tokio::fs::create_dir(&path).await?; tokio::fs::create_dir(&path).await?;
Ok(Arc::new(TmpDir { path: Some(path) })) Ok(Arc::new(TmpDir { path: Some(path) }))
} }
fn build_tmp_file(&self, ext: Option<&str>) -> Arc<Path> { fn build_tmp_file(&self, ext: Option<&str>) -> PathBuf {
if let Some(ext) = ext { if let Some(ext) = ext {
Arc::from(self.path.as_ref().expect("tmp path exists").join(format!( self.path
"{}{}", .as_ref()
Uuid::new_v4(), .expect("tmp path exists")
ext .join(format!("{}{}", Uuid::now_v7(), ext))
)))
} else { } else {
Arc::from( self.path
self.path .as_ref()
.as_ref() .expect("tmp path exists")
.expect("tmp path exists") .join(Uuid::now_v7().to_string())
.join(Uuid::new_v4().to_string()),
)
} }
} }
pub(crate) fn tmp_file(&self, ext: Option<&str>) -> TmpFile { pub(crate) fn tmp_file(&self, ext: Option<&str>) -> TmpFile {
TmpFile(self.build_tmp_file(ext)) TmpFile(Some(self.build_tmp_file(ext)))
} }
pub(crate) async fn tmp_folder(&self) -> std::io::Result<TmpFolder> { pub(crate) async fn tmp_folder(&self) -> std::io::Result<TmpFolder> {
let path = self.build_tmp_file(None); let path = self.build_tmp_file(None);
tokio::fs::create_dir(&path).await?; tokio::fs::create_dir(&path).await?;
Ok(TmpFolder(path)) Ok(TmpFolder(Some(path)))
} }
pub(crate) async fn cleanup(self: Arc<Self>) -> std::io::Result<()> { pub(crate) async fn cleanup(self: Arc<Self>) -> std::io::Result<()> {
@ -65,11 +64,26 @@ impl Drop for TmpDir {
} }
#[must_use] #[must_use]
pub(crate) struct TmpFolder(Arc<Path>); pub(crate) struct TmpFolder(Option<PathBuf>);
impl TmpFolder {
pub(crate) async fn cleanup(mut self) -> std::io::Result<()> {
self.consume().await
}
}
#[async_trait::async_trait(?Send)]
impl Extras for TmpFolder {
async fn consume(&mut self) -> std::io::Result<()> {
tokio::fs::remove_dir_all(&self).await?;
self.0.take();
Ok(())
}
}
impl AsRef<Path> for TmpFolder { impl AsRef<Path> for TmpFolder {
fn as_ref(&self) -> &Path { fn as_ref(&self) -> &Path {
&self.0 self.0.as_deref().unwrap()
} }
} }
@ -77,25 +91,39 @@ impl Deref for TmpFolder {
type Target = Path; type Target = Path;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 self.0.as_deref().unwrap()
} }
} }
impl Drop for TmpFolder { impl Drop for TmpFolder {
fn drop(&mut self) { fn drop(&mut self) {
crate::sync::spawn( if let Some(path) = self.0.take() {
"remove-tmpfolder", let _ = std::fs::remove_dir_all(path);
tokio::fs::remove_dir_all(self.0.clone()), }
);
} }
} }
#[must_use] #[must_use]
pub(crate) struct TmpFile(Arc<Path>); pub(crate) struct TmpFile(Option<PathBuf>);
impl TmpFile {
pub(crate) async fn cleanup(mut self) -> std::io::Result<()> {
self.consume().await
}
}
#[async_trait::async_trait(?Send)]
impl Extras for TmpFile {
async fn consume(&mut self) -> std::io::Result<()> {
tokio::fs::remove_file(&self).await?;
self.0.take();
Ok(())
}
}
impl AsRef<Path> for TmpFile { impl AsRef<Path> for TmpFile {
fn as_ref(&self) -> &Path { fn as_ref(&self) -> &Path {
&self.0 self.0.as_deref().unwrap()
} }
} }
@ -103,12 +131,14 @@ impl Deref for TmpFile {
type Target = Path; type Target = Path;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.0 self.0.as_deref().unwrap()
} }
} }
impl Drop for TmpFile { impl Drop for TmpFile {
fn drop(&mut self) { fn drop(&mut self) {
crate::sync::spawn("remove-tmpfile", tokio::fs::remove_file(self.0.clone())); if let Some(path) = self.0.take() {
let _ = std::fs::remove_file(path);
}
} }
} }

View file

@ -44,6 +44,8 @@ pub(super) async fn transcode_bytes(
) )
.await?; .await?;
input_file.cleanup().await.map_err(FfMpegError::Cleanup)?;
let tmp_two = crate::file::File::open(&output_file) let tmp_two = crate::file::File::open(&output_file)
.await .await
.map_err(FfMpegError::OpenFile)?; .map_err(FfMpegError::OpenFile)?;