From ad51e6cd9fe9fdd7ace3a6074e3dffd889e58eb9 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Feb 2024 13:16:25 -0600 Subject: [PATCH] Avoid writing blurhashed images to tmp --- src/blurhash.rs | 43 ++++++++++--------------------------------- src/process.rs | 26 ++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/src/blurhash.rs b/src/blurhash.rs index 0127bc5..897e04f 100644 --- a/src/blurhash.rs +++ b/src/blurhash.rs @@ -1,6 +1,8 @@ -use std::ffi::OsStr; +use std::ffi::{OsStr, OsString}; +use futures_core::Stream; use tokio::io::AsyncReadExt; +use tokio_util::bytes::Bytes; use crate::{ details::Details, @@ -47,13 +49,7 @@ where .internal_format() .processable_format() .expect("not a video"), - |mut tmp_file| async move { - tmp_file - .write_from_stream(stream) - .await - .map_err(MagickError::Write)?; - Ok(tmp_file) - }, + stream, ) .await?; @@ -83,37 +79,19 @@ where Ok(blurhash) } -async fn read_rgba( +async fn read_rgba( state: &State, input_format: ProcessableFormat, - write_file: F, -) -> Result -where - F: FnOnce(crate::file::File) -> Fut, - Fut: std::future::Future>, -{ + stream: impl Stream> + 'static, +) -> Result { let temporary_path = state .tmp_dir .tmp_folder() .await .map_err(MagickError::CreateTemporaryDirectory)?; - let input_file = state.tmp_dir.tmp_file(None); - crate::store::file_store::safe_create_parent(&input_file) - .await - .map_err(MagickError::CreateDir)?; - - let tmp_one = crate::file::File::create(&input_file) - .await - .map_err(MagickError::CreateFile)?; - let tmp_one = (write_file)(tmp_one).await?; - tmp_one.close().await.map_err(MagickError::CloseFile)?; - - let mut input_arg = [ - input_format.magick_format().as_ref(), - input_file.as_os_str(), - ] - .join(":".as_ref()); + let mut input_arg = OsString::from(input_format.magick_format()); + input_arg.push(":-"); if input_format.coalesce() { input_arg.push("[0]"); } @@ -126,8 +104,7 @@ where ]; let process = Process::run("magick", &args, &envs, state.config.media.process_timeout)? - .read() - .add_extras(input_file) + .stream_read(stream) .add_extras(temporary_path); Ok(process) diff --git a/src/process.rs b/src/process.rs index c512b4f..4d04ba5 100644 --- a/src/process.rs +++ b/src/process.rs @@ -6,11 +6,13 @@ use std::{ time::{Duration, Instant}, }; +use futures_core::Stream; +use streem::IntoStreamer; use tokio::{ - io::AsyncReadExt, + io::{AsyncReadExt, AsyncWriteExt}, process::{Child, ChildStdin, Command}, }; -use tokio_util::io::ReaderStream; +use tokio_util::{bytes::Bytes, io::ReaderStream}; use tracing::Instrument; use uuid::Uuid; @@ -249,6 +251,26 @@ impl Process { }) } + pub(crate) fn stream_read(self, input: S) -> ProcessRead + where + S: Stream> + 'static, + { + self.spawn_fn(move |mut stdin| async move { + let stream = std::pin::pin!(input); + let mut stream = stream.into_streamer(); + + while let Some(mut bytes) = stream.try_next().await? { + match stdin.write_all_buf(&mut bytes).await { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, + Err(e) => return Err(e), + } + } + + Ok(()) + }) + } + pub(crate) fn read(self) -> ProcessRead { self.spawn_fn(|_| async { Ok(()) }) }