From 6c921817e188fa2fb2759be8259661ad1d88c6d2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 24 Sep 2023 11:54:16 -0500 Subject: [PATCH] Enable thumbnailing animations, use dynamic dispatch for a number of async readers --- src/details.rs | 4 -- src/ffmpeg.rs | 116 +----------------------------- src/formats.rs | 21 ++++-- src/formats/image.rs | 2 +- src/generate.rs | 149 +++++++++++++++++++++++++++------------ src/generate/ffmpeg.rs | 112 +++++++++++++++++++++++++++++ src/generate/magick.rs | 81 +++++++++++++++++++++ src/ingest.rs | 7 +- src/lib.rs | 11 ++- src/magick.rs | 16 ++--- src/queue/process.rs | 4 +- src/read.rs | 1 + src/repo.rs | 22 ------ src/validate.rs | 25 ++++--- src/validate/exiftool.rs | 7 +- src/validate/ffmpeg.rs | 4 +- src/validate/magick.rs | 8 +-- 17 files changed, 351 insertions(+), 239 deletions(-) create mode 100644 src/generate/ffmpeg.rs create mode 100644 src/generate/magick.rs create mode 100644 src/read.rs diff --git a/src/details.rs b/src/details.rs index 9141cd2..582e075 100644 --- a/src/details.rs +++ b/src/details.rs @@ -79,10 +79,6 @@ impl Details { } } - pub(crate) fn is_video(&self) -> bool { - self.inner.content_type.type_() == "video" - } - pub(crate) fn created_at(&self) -> time::OffsetDateTime { self.inner.created_at.timestamp } diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index bab1582..6d5bdc9 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -1,18 +1,4 @@ -use std::sync::Arc; - -use crate::{ - error_code::ErrorCode, - formats::InternalVideoFormat, - process::{Process, ProcessError}, - store::{Store, StoreError}, -}; -use tokio::io::AsyncRead; - -#[derive(Clone, Copy, Debug)] -pub(crate) enum ThumbnailFormat { - Jpeg, - // Webp, -} +use crate::{error_code::ErrorCode, process::ProcessError, store::StoreError}; #[derive(Debug, thiserror::Error)] pub(crate) enum FfMpegError { @@ -100,103 +86,3 @@ impl FfMpegError { false } } - -impl ThumbnailFormat { - const fn as_ffmpeg_codec(self) -> &'static str { - match self { - Self::Jpeg => "mjpeg", - // Self::Webp => "webp", - } - } - - const fn to_file_extension(self) -> &'static str { - match self { - Self::Jpeg => ".jpeg", - // Self::Webp => ".webp", - } - } - - const fn as_ffmpeg_format(self) -> &'static str { - match self { - Self::Jpeg => "image2", - // Self::Webp => "webp", - } - } - - pub(crate) fn media_type(self) -> mime::Mime { - match self { - Self::Jpeg => mime::IMAGE_JPEG, - // Self::Webp => crate::formats::mimes::image_webp(), - } - } -} - -#[tracing::instrument(skip(store))] -pub(crate) async fn thumbnail( - store: S, - from: Arc, - input_format: InternalVideoFormat, - format: ThumbnailFormat, - timeout: u64, -) -> Result { - let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension())); - let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; - crate::store::file_store::safe_create_parent(&input_file) - .await - .map_err(FfMpegError::CreateDir)?; - - let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension())); - let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?; - crate::store::file_store::safe_create_parent(&output_file) - .await - .map_err(FfMpegError::CreateDir)?; - - let mut tmp_one = crate::file::File::create(&input_file) - .await - .map_err(FfMpegError::CreateFile)?; - let stream = store - .to_stream(&from, None, None) - .await - .map_err(FfMpegError::Store)?; - tmp_one - .write_from_stream(stream) - .await - .map_err(FfMpegError::Write)?; - tmp_one.close().await.map_err(FfMpegError::CloseFile)?; - - let process = Process::run( - "ffmpeg", - &[ - "-hide_banner", - "-v", - "warning", - "-i", - input_file_str, - "-frames:v", - "1", - "-codec", - format.as_ffmpeg_codec(), - "-f", - format.as_ffmpeg_format(), - output_file_str, - ], - timeout, - )?; - - process.wait().await?; - tokio::fs::remove_file(input_file) - .await - .map_err(FfMpegError::RemoveFile)?; - - let tmp_two = crate::file::File::open(&output_file) - .await - .map_err(FfMpegError::OpenFile)?; - let stream = tmp_two - .read_to_stream(None, None) - .await - .map_err(FfMpegError::ReadFile)?; - let reader = tokio_util::io::StreamReader::new(stream); - let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file); - - Ok(Box::pin(clean_reader)) -} diff --git a/src/formats.rs b/src/formats.rs index eccef38..e82fbec 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -166,12 +166,18 @@ impl ProcessableFormat { } } - pub(crate) fn process_to(self, output: InputProcessableFormat) -> Option { + pub(crate) const fn process_to(self, output: InputProcessableFormat) -> Option { match (self, output) { (Self::Image(_), InputProcessableFormat::Avif) => Some(Self::Image(ImageFormat::Avif)), - (Self::Image(_), InputProcessableFormat::Jpeg) => Some(Self::Image(ImageFormat::Jpeg)), - (Self::Image(_), InputProcessableFormat::Jxl) => Some(Self::Image(ImageFormat::Jxl)), - (Self::Image(_), InputProcessableFormat::Png) => Some(Self::Image(ImageFormat::Png)), + (Self::Image(_) | Self::Animation(_), InputProcessableFormat::Jpeg) => { + Some(Self::Image(ImageFormat::Jpeg)) + } + (Self::Image(_) | Self::Animation(_), InputProcessableFormat::Jxl) => { + Some(Self::Image(ImageFormat::Jxl)) + } + (Self::Image(_) | Self::Animation(_), InputProcessableFormat::Png) => { + Some(Self::Image(ImageFormat::Png)) + } (Self::Image(_), InputProcessableFormat::Webp) => Some(Self::Image(ImageFormat::Webp)), (Self::Animation(_), InputProcessableFormat::Apng) => { Some(Self::Animation(AnimationFormat::Apng)) @@ -187,11 +193,12 @@ impl ProcessableFormat { } (Self::Image(_), InputProcessableFormat::Apng) => None, (Self::Image(_), InputProcessableFormat::Gif) => None, - (Self::Animation(_), InputProcessableFormat::Jpeg) => None, - (Self::Animation(_), InputProcessableFormat::Jxl) => None, - (Self::Animation(_), InputProcessableFormat::Png) => None, } } + + pub(crate) const fn should_thumbnail(self, output: Self) -> bool { + matches!((self, output), (Self::Animation(_), Self::Image(_))) + } } impl FromStr for InputProcessableFormat { diff --git a/src/formats/image.rs b/src/formats/image.rs index 00c00bb..6ea1eb3 100644 --- a/src/formats/image.rs +++ b/src/formats/image.rs @@ -90,7 +90,7 @@ impl ImageFormat { } } - pub(super) fn media_type(self) -> mime::Mime { + pub(crate) fn media_type(self) -> mime::Mime { match self { Self::Avif => super::mimes::image_avif(), Self::Jpeg => mime::IMAGE_JPEG, diff --git a/src/generate.rs b/src/generate.rs index 6808590..28a9d16 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -1,14 +1,16 @@ +mod ffmpeg; +mod magick; + use crate::{ concurrent_processor::ProcessMap, details::Details, error::{Error, UploadError}, - ffmpeg::ThumbnailFormat, - formats::{InputProcessableFormat, InternalVideoFormat}, - repo::{Alias, ArcRepo, Hash, VariantAlreadyExists}, + formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat}, + repo::{ArcRepo, Hash, VariantAlreadyExists}, store::Store, }; use actix_web::web::Bytes; -use std::{path::PathBuf, time::Instant}; +use std::{path::PathBuf, sync::Arc, time::Instant}; use tokio::io::AsyncReadExt; use tracing::Instrument; @@ -45,11 +47,9 @@ pub(crate) async fn generate( store: &S, process_map: &ProcessMap, format: InputProcessableFormat, - alias: Alias, thumbnail_path: PathBuf, thumbnail_args: Vec, - input_format: Option, - thumbnail_format: Option, + original_details: &Details, media: &crate::config::Media, hash: Hash, ) -> Result<(Details, Bytes), Error> { @@ -57,11 +57,9 @@ pub(crate) async fn generate( repo, store, format, - alias, thumbnail_path.clone(), thumbnail_args, - input_format, - thumbnail_format, + original_details, media, hash.clone(), ); @@ -79,44 +77,24 @@ async fn process( repo: &ArcRepo, store: &S, output_format: InputProcessableFormat, - alias: Alias, thumbnail_path: PathBuf, thumbnail_args: Vec, - input_format: Option, - thumbnail_format: Option, + original_details: &Details, media: &crate::config::Media, hash: Hash, ) -> Result<(Details, Bytes), Error> { let guard = MetricsGuard::guard(); let permit = crate::PROCESS_SEMAPHORE.acquire().await; - let identifier = if let Some(identifier) = repo.still_identifier_from_alias(&alias).await? { - identifier - } else { - let Some(identifier) = repo.identifier(hash.clone()).await? else { - return Err(UploadError::MissingIdentifier.into()); - }; - - let thumbnail_format = thumbnail_format.unwrap_or(ThumbnailFormat::Jpeg); - - let reader = crate::ffmpeg::thumbnail( - store.clone(), - identifier, - input_format.unwrap_or(InternalVideoFormat::Mp4), - thumbnail_format, - media.process_timeout, - ) - .await?; - - let motion_identifier = store - .save_async_read(reader, thumbnail_format.media_type()) - .await?; - - repo.relate_motion_identifier(hash.clone(), &motion_identifier) - .await?; - - motion_identifier - }; + let identifier = input_identifier( + repo, + store, + output_format, + hash.clone(), + original_details, + media, + ) + .await?; let input_details = if let Some(details) = repo.details(&identifier).await? { details @@ -133,13 +111,13 @@ async fn process( .processable_format() .expect("Already verified format is processable"); - let Some(format) = input_format.process_to(output_format) else { - return Err(UploadError::InvalidProcessExtension.into()); - }; + let format = input_format + .process_to(output_format) + .ok_or(UploadError::InvalidProcessExtension)?; let quality = match format { - crate::formats::ProcessableFormat::Image(format) => media.image.quality_for(format), - crate::formats::ProcessableFormat::Animation(format) => media.animation.quality_for(format), + ProcessableFormat::Image(format) => media.image.quality_for(format), + ProcessableFormat::Animation(format) => media.animation.quality_for(format), }; let mut processed_reader = crate::magick::process_image_store_read( @@ -185,3 +163,84 @@ async fn process( Ok((details, bytes)) as Result<(Details, Bytes), Error> } + +#[tracing::instrument(skip_all)] +async fn input_identifier( + repo: &ArcRepo, + store: &S, + output_format: InputProcessableFormat, + hash: Hash, + original_details: &Details, + media: &crate::config::Media, +) -> Result, Error> +where + S: Store + 'static, +{ + let should_thumbnail = + if let Some(input_format) = original_details.internal_format().processable_format() { + let output_format = input_format + .process_to(output_format) + .ok_or(UploadError::InvalidProcessExtension)?; + + input_format.should_thumbnail(output_format) + } else { + // video case + true + }; + + if should_thumbnail { + if let Some(identifier) = repo.motion_identifier(hash.clone()).await? { + return Ok(identifier); + }; + + let identifier = repo + .identifier(hash.clone()) + .await? + .ok_or(UploadError::MissingIdentifier)?; + + let (reader, media_type) = if let Some(processable_format) = + original_details.internal_format().processable_format() + { + let thumbnail_format = ImageFormat::Jpeg; + + let reader = magick::thumbnail( + store, + &identifier, + processable_format, + ProcessableFormat::Image(thumbnail_format), + media.image.quality_for(thumbnail_format), + media.process_timeout, + ) + .await?; + + (reader, thumbnail_format.media_type()) + } else { + let thumbnail_format = ffmpeg::ThumbnailFormat::Jpeg; + + let reader = ffmpeg::thumbnail( + store.clone(), + identifier, + original_details + .video_format() + .unwrap_or(InternalVideoFormat::Mp4), + thumbnail_format, + media.process_timeout, + ) + .await?; + + (reader, thumbnail_format.media_type()) + }; + + let motion_identifier = store.save_async_read(reader, media_type).await?; + + repo.relate_motion_identifier(hash, &motion_identifier) + .await?; + + return Ok(motion_identifier); + } + + repo.identifier(hash) + .await? + .ok_or(UploadError::MissingIdentifier) + .map_err(From::from) +} diff --git a/src/generate/ffmpeg.rs b/src/generate/ffmpeg.rs new file mode 100644 index 0000000..7a9bf6c --- /dev/null +++ b/src/generate/ffmpeg.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; + +use crate::{ + ffmpeg::FfMpegError, formats::InternalVideoFormat, process::Process, read::BoxRead, + store::Store, +}; + +#[derive(Clone, Copy, Debug)] +pub(super) enum ThumbnailFormat { + Jpeg, + // Webp, +} + +impl ThumbnailFormat { + const fn as_ffmpeg_codec(self) -> &'static str { + match self { + Self::Jpeg => "mjpeg", + // Self::Webp => "webp", + } + } + + const fn to_file_extension(self) -> &'static str { + match self { + Self::Jpeg => ".jpeg", + // Self::Webp => ".webp", + } + } + + const fn as_ffmpeg_format(self) -> &'static str { + match self { + Self::Jpeg => "image2", + // Self::Webp => "webp", + } + } + + pub(crate) fn media_type(self) -> mime::Mime { + match self { + Self::Jpeg => mime::IMAGE_JPEG, + // Self::Webp => crate::formats::mimes::image_webp(), + } + } +} + +#[tracing::instrument(skip(store))] +pub(super) async fn thumbnail( + store: S, + from: Arc, + input_format: InternalVideoFormat, + format: ThumbnailFormat, + timeout: u64, +) -> Result, FfMpegError> { + let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension())); + let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(FfMpegError::CreateDir)?; + + let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension())); + let output_file_str = output_file.to_str().ok_or(FfMpegError::Path)?; + crate::store::file_store::safe_create_parent(&output_file) + .await + .map_err(FfMpegError::CreateDir)?; + + let mut tmp_one = crate::file::File::create(&input_file) + .await + .map_err(FfMpegError::CreateFile)?; + let stream = store + .to_stream(&from, None, None) + .await + .map_err(FfMpegError::Store)?; + tmp_one + .write_from_stream(stream) + .await + .map_err(FfMpegError::Write)?; + tmp_one.close().await.map_err(FfMpegError::CloseFile)?; + + let process = Process::run( + "ffmpeg", + &[ + "-hide_banner", + "-v", + "warning", + "-i", + input_file_str, + "-frames:v", + "1", + "-codec", + format.as_ffmpeg_codec(), + "-f", + format.as_ffmpeg_format(), + output_file_str, + ], + timeout, + )?; + + process.wait().await?; + tokio::fs::remove_file(input_file) + .await + .map_err(FfMpegError::RemoveFile)?; + + let tmp_two = crate::file::File::open(&output_file) + .await + .map_err(FfMpegError::OpenFile)?; + let stream = tmp_two + .read_to_stream(None, None) + .await + .map_err(FfMpegError::ReadFile)?; + let reader = tokio_util::io::StreamReader::new(stream); + let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file); + + Ok(Box::pin(clean_reader)) +} diff --git a/src/generate/magick.rs b/src/generate/magick.rs new file mode 100644 index 0000000..3834355 --- /dev/null +++ b/src/generate/magick.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use crate::{ + formats::ProcessableFormat, magick::MagickError, process::Process, read::BoxRead, store::Store, +}; + +async fn thumbnail_animation( + input_format: ProcessableFormat, + format: ProcessableFormat, + quality: Option, + timeout: u64, + write_file: F, +) -> Result, MagickError> +where + F: FnOnce(crate::file::File) -> Fut, + Fut: std::future::Future>, +{ + let input_file = crate::tmp_file::tmp_file(None); + let input_file_str = input_file.to_str().ok_or(MagickError::Path)?; + crate::store::file_store::safe_create_parent(&input_file) + .await + .map_err(MagickError::CreateDir)?; + + let tmp_one = crate::file::File::create(&input_file) + .await + .map_err(MagickError::CreateFile)?; + let tmp_one = (write_file)(tmp_one).await?; + tmp_one.close().await.map_err(MagickError::CloseFile)?; + + let input_arg = format!("{}:{input_file_str}[0]", input_format.magick_format()); + let output_arg = format!("{}:-", format.magick_format()); + let quality = quality.map(|q| q.to_string()); + + let len = format.coalesce().then(|| 4).unwrap_or(3) + quality.is_some().then(|| 1).unwrap_or(0); + + let mut args: Vec<&str> = Vec::with_capacity(len); + args.push("convert"); + args.push(&input_arg); + if format.coalesce() { + args.push("-coalesce"); + } + if let Some(quality) = &quality { + args.extend(["-quality", quality]); + } + args.push(&output_arg); + + let reader = Process::run("magick", &args, timeout)?.read(); + + let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file); + + Ok(Box::pin(clean_reader)) +} + +pub(super) async fn thumbnail( + store: &S, + identifier: &Arc, + input_format: ProcessableFormat, + format: ProcessableFormat, + quality: Option, + timeout: u64, +) -> Result, MagickError> { + let stream = store + .to_stream(identifier, None, None) + .await + .map_err(MagickError::Store)?; + + thumbnail_animation( + input_format, + format, + quality, + timeout, + |mut tmp_file| async move { + tmp_file + .write_from_stream(stream) + .await + .map_err(MagickError::Write)?; + Ok(tmp_file) + }, + ) + .await +} diff --git a/src/ingest.rs b/src/ingest.rs index 2baeb3b..c2fc9e5 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -2,7 +2,6 @@ use std::{sync::Arc, time::Duration}; use crate::{ bytes_stream::BytesStream, - either::Either, error::{Error, UploadError}, formats::{InternalFormat, Validations}, future::WithMetrics, @@ -93,12 +92,12 @@ where ) .await?; - Either::left(processed_reader) + processed_reader } else { - Either::right(validated_reader) + validated_reader } } else { - Either::right(validated_reader) + validated_reader }; let hasher_reader = Hasher::new(processed_reader); diff --git a/src/lib.rs b/src/lib.rs index 8c91ab3..2d47af1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ mod process; mod processor; mod queue; mod range; +mod read; mod repo; mod repo_04; mod serde_str; @@ -917,11 +918,9 @@ async fn process( &store, &process_map, format, - alias, thumbnail_path, thumbnail_args, - original_details.video_format(), - None, + &original_details, &config.media, hash, ) @@ -1087,7 +1086,7 @@ async fn details_query( let Some(alias) = repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().json(&serde_json::json!({ "msg": "Provided proxy URL has not been cached", - }))) + }))); }; alias } @@ -1193,9 +1192,7 @@ async fn do_serve( }; let Some(identifier) = repo.identifier(hash.clone()).await? else { - tracing::warn!( - "Original File identifier for hash {hash:?} is missing, queue cleanup task", - ); + tracing::warn!("Original File identifier for hash {hash:?} is missing, queue cleanup task",); crate::queue::cleanup_hash(&repo, hash).await?; return Ok(HttpResponse::NotFound().finish()); }; diff --git a/src/magick.rs b/src/magick.rs index 29c9636..d08fef3 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -4,8 +4,10 @@ use crate::{ error_code::ErrorCode, formats::ProcessableFormat, process::{Process, ProcessError}, + read::BoxRead, store::Store, }; + use tokio::io::AsyncRead; #[derive(Debug, thiserror::Error)] @@ -94,7 +96,7 @@ async fn process_image( quality: Option, timeout: u64, write_file: F, -) -> Result +) -> Result, MagickError> where F: FnOnce(crate::file::File) -> Fut, Fut: std::future::Future>, @@ -115,11 +117,9 @@ where let output_arg = format!("{}:-", format.magick_format()); let quality = quality.map(|q| q.to_string()); - let len = if format.coalesce() { - process_args.len() + 4 - } else { - process_args.len() + 3 - }; + let len = format.coalesce().then(|| 4).unwrap_or(3) + + quality.is_some().then(|| 1).unwrap_or(0) + + process_args.len(); let mut args: Vec<&str> = Vec::with_capacity(len); args.push("convert"); @@ -148,7 +148,7 @@ pub(crate) async fn process_image_store_read( format: ProcessableFormat, quality: Option, timeout: u64, -) -> Result { +) -> Result, MagickError> { let stream = store .to_stream(identifier, None, None) .await @@ -178,7 +178,7 @@ pub(crate) async fn process_image_async_read( format: ProcessableFormat, quality: Option, timeout: u64, -) -> Result { +) -> Result, MagickError> { process_image( args, input_format, diff --git a/src/queue/process.rs b/src/queue/process.rs index 480a504..bf8204b 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -199,11 +199,9 @@ async fn generate( store, process_map, target_format, - source, process_path, process_args, - original_details.video_format(), - None, + &original_details, &config.media, hash, ) diff --git a/src/read.rs b/src/read.rs new file mode 100644 index 0000000..aacb4b8 --- /dev/null +++ b/src/read.rs @@ -0,0 +1 @@ +pub(crate) type BoxRead<'a> = std::pin::Pin>; diff --git a/src/repo.rs b/src/repo.rs index 01f5c6e..c226307 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -130,28 +130,6 @@ pub(crate) trait FullRepo: self.aliases_for_hash(hash).await } - - #[tracing::instrument(skip(self))] - async fn still_identifier_from_alias( - &self, - alias: &Alias, - ) -> Result>, RepoError> { - let Some(hash) = self.hash(alias).await? else { - return Ok(None); - }; - - let Some(identifier) = self.identifier(hash.clone()).await? else { - return Ok(None); - }; - - match self.details(&identifier).await? { - Some(details) if details.is_video() => { - self.motion_identifier(hash).await.map_err(From::from) - } - Some(_) => Ok(Some(identifier)), - None => Ok(None), - } - } } #[async_trait::async_trait(?Send)] diff --git a/src/validate.rs b/src/validate.rs index 0d94c55..fdf6d13 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -4,16 +4,15 @@ mod magick; use crate::{ discover::Discovery, - either::Either, error::Error, error_code::ErrorCode, formats::{ AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat, InternalFormat, Validations, }, + read::BoxRead, }; use actix_web::web::Bytes; -use tokio::io::AsyncRead; #[derive(Debug, thiserror::Error)] pub(crate) enum ValidationError { @@ -60,7 +59,7 @@ pub(crate) async fn validate_bytes( bytes: Bytes, validations: Validations<'_>, timeout: u64, -) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { +) -> Result<(InternalFormat, BoxRead<'static>), Error> { if bytes.is_empty() { return Err(ValidationError::Empty.into()); } @@ -77,7 +76,7 @@ pub(crate) async fn validate_bytes( let (format, read) = process_image(bytes, *input, width, height, validations.image, timeout).await?; - Ok((format, Either::left(read))) + Ok((format, read)) } InputFile::Animation(input) => { let (format, read) = process_animation( @@ -91,7 +90,7 @@ pub(crate) async fn validate_bytes( ) .await?; - Ok((format, Either::right(Either::left(read)))) + Ok((format, read)) } InputFile::Video(input) => { let (format, read) = process_video( @@ -105,7 +104,7 @@ pub(crate) async fn validate_bytes( ) .await?; - Ok((format, Either::right(Either::right(read)))) + Ok((format, read)) } } } @@ -118,7 +117,7 @@ async fn process_image( height: u16, validations: &crate::config::Image, timeout: u64, -) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { +) -> Result<(InternalFormat, BoxRead<'static>), Error> { if width > validations.max_width { return Err(ValidationError::Width.into()); } @@ -140,9 +139,9 @@ async fn process_image( let read = if needs_transcode { let quality = validations.quality_for(format); - Either::left(magick::convert_image(input.format, format, quality, timeout, bytes).await?) + magick::convert_image(input.format, format, quality, timeout, bytes).await? } else { - Either::right(exiftool::clear_metadata_bytes_read(bytes, timeout)?) + exiftool::clear_metadata_bytes_read(bytes, timeout)? }; Ok((InternalFormat::Image(format), read)) @@ -183,7 +182,7 @@ async fn process_animation( frames: u32, validations: &crate::config::Animation, timeout: u64, -) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { +) -> Result<(InternalFormat, BoxRead<'static>), Error> { validate_animation(bytes.len(), width, height, frames, validations)?; let AnimationOutput { @@ -194,9 +193,9 @@ async fn process_animation( let read = if needs_transcode { let quality = validations.quality_for(format); - Either::left(magick::convert_animation(input, format, quality, timeout, bytes).await?) + magick::convert_animation(input, format, quality, timeout, bytes).await? } else { - Either::right(exiftool::clear_metadata_bytes_read(bytes, timeout)?) + exiftool::clear_metadata_bytes_read(bytes, timeout)? }; Ok((InternalFormat::Animation(format), read)) @@ -240,7 +239,7 @@ async fn process_video( frames: u32, validations: &crate::config::Video, timeout: u64, -) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> { +) -> Result<(InternalFormat, BoxRead<'static>), Error> { validate_video(bytes.len(), width, height, frames, validations)?; let output = input.build_output( diff --git a/src/validate/exiftool.rs b/src/validate/exiftool.rs index da557bc..ef5be40 100644 --- a/src/validate/exiftool.rs +++ b/src/validate/exiftool.rs @@ -1,14 +1,13 @@ use actix_web::web::Bytes; -use tokio::io::AsyncRead; -use crate::{exiftool::ExifError, process::Process}; +use crate::{exiftool::ExifError, process::Process, read::BoxRead}; #[tracing::instrument(level = "trace", skip(input))] pub(crate) fn clear_metadata_bytes_read( input: Bytes, timeout: u64, -) -> Result { +) -> Result, ExifError> { let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?; - Ok(process.bytes_read(input)) + Ok(Box::pin(process.bytes_read(input))) } diff --git a/src/validate/ffmpeg.rs b/src/validate/ffmpeg.rs index 9184ea1..3b1c8ae 100644 --- a/src/validate/ffmpeg.rs +++ b/src/validate/ffmpeg.rs @@ -1,10 +1,10 @@ use actix_web::web::Bytes; -use tokio::io::AsyncRead; use crate::{ ffmpeg::FfMpegError, formats::{InputVideoFormat, OutputVideo}, process::Process, + read::BoxRead, }; pub(super) async fn transcode_bytes( @@ -13,7 +13,7 @@ pub(super) async fn transcode_bytes( crf: u8, timeout: u64, bytes: Bytes, -) -> Result { +) -> Result, FfMpegError> { let input_file = crate::tmp_file::tmp_file(None); let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?; crate::store::file_store::safe_create_parent(&input_file) diff --git a/src/validate/magick.rs b/src/validate/magick.rs index 0a9f7fd..530fa96 100644 --- a/src/validate/magick.rs +++ b/src/validate/magick.rs @@ -1,10 +1,10 @@ use actix_web::web::Bytes; -use tokio::io::AsyncRead; use crate::{ formats::{AnimationFormat, ImageFormat}, magick::MagickError, process::Process, + read::BoxRead, }; pub(super) async fn convert_image( @@ -13,7 +13,7 @@ pub(super) async fn convert_image( quality: Option, timeout: u64, bytes: Bytes, -) -> Result { +) -> Result, MagickError> { convert( input.magick_format(), output.magick_format(), @@ -31,7 +31,7 @@ pub(super) async fn convert_animation( quality: Option, timeout: u64, bytes: Bytes, -) -> Result { +) -> Result, MagickError> { convert( input.magick_format(), output.magick_format(), @@ -50,7 +50,7 @@ async fn convert( quality: Option, timeout: u64, bytes: Bytes, -) -> Result { +) -> Result, MagickError> { let input_file = crate::tmp_file::tmp_file(None); let input_file_str = input_file.to_str().ok_or(MagickError::Path)?;