Add process timeout
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
asonix 2023-08-05 12:41:06 -05:00
parent 5f12be0c6a
commit fee4ed1e3e
22 changed files with 358 additions and 177 deletions

View file

@ -26,6 +26,7 @@ path = "/mnt"
[media]
max_file_size = 40
process_timeout = 30
filters = [
"blur",
"crop",

View file

@ -151,6 +151,11 @@ path = '/mnt'
# default: 40
max_file_size = 40
## Optional: Timeout (in seconds) for all spawned media processing operations
# environment variable: PICTRS__MEDIA__PROCESS_TIMEOUT
# default: 30
process_timeout = 30
## Optional: preprocessing steps for uploaded images
# environment variable: PICTRS__MEDIA__PREPROCESS_STEPS
# default: empty

View file

@ -53,6 +53,7 @@ impl Args {
metrics_prometheus_address,
media_preprocess_steps,
media_max_file_size,
media_process_timeout,
media_retention_variants,
media_retention_proxy,
media_image_max_width,
@ -177,6 +178,7 @@ impl Args {
let media = Media {
max_file_size: media_max_file_size,
process_timeout: media_process_timeout,
preprocess_steps: media_preprocess_steps,
filters: media_filters,
retention: retention.set(),
@ -459,6 +461,8 @@ struct Media {
#[serde(skip_serializing_if = "Option::is_none")]
max_file_size: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
process_timeout: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
preprocess_steps: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
filters: Option<Vec<String>>,
@ -801,6 +805,10 @@ struct Run {
#[arg(long)]
media_max_file_size: Option<usize>,
/// Timeout for any media processing operation
#[arg(long)]
media_process_timeout: Option<u64>,
/// How long to keep image "variants" around
///
/// A variant is any processed version of an original image

View file

@ -73,6 +73,7 @@ struct OldDbDefaults {
#[serde(rename_all = "snake_case")]
struct MediaDefaults {
max_file_size: usize,
process_timeout: u64,
filters: Vec<String>,
retention: RetentionDefaults,
image: ImageDefaults,
@ -238,6 +239,7 @@ impl Default for MediaDefaults {
fn default() -> Self {
MediaDefaults {
max_file_size: 40,
process_timeout: 30,
filters: vec![
"blur".into(),
"crop".into(),

View file

@ -168,6 +168,8 @@ pub(crate) struct OldDb {
pub(crate) struct Media {
pub(crate) max_file_size: usize,
pub(crate) process_timeout: u64,
#[serde(skip_serializing_if = "Option::is_none")]
preprocess_steps: Option<PreprocessSteps>,

View file

@ -31,13 +31,13 @@ impl Details {
self.content_type.type_() == "video"
}
pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, Error> {
pub(crate) async fn from_bytes(timeout: u64, input: web::Bytes) -> Result<Self, Error> {
let DiscoveryLite {
format,
width,
height,
frames,
} = crate::discover::discover_bytes_lite(input).await?;
} = crate::discover::discover_bytes_lite(timeout, input).await?;
Ok(Details::from_parts(format, width, height, frames))
}
@ -45,13 +45,14 @@ impl Details {
pub(crate) async fn from_store<S: Store>(
store: &S,
identifier: &S::Identifier,
timeout: u64,
) -> Result<Self, Error> {
let DiscoveryLite {
format,
width,
height,
frames,
} = crate::discover::discover_store_lite(store, identifier).await?;
} = crate::discover::discover_store_lite(store, identifier, timeout).await?;
Ok(Details::from_parts(format, width, height, frames))
}

View file

@ -38,13 +38,14 @@ pub(crate) enum DiscoverError {
}
pub(crate) async fn discover_bytes_lite(
timeout: u64,
bytes: Bytes,
) -> Result<DiscoveryLite, crate::error::Error> {
if let Some(discovery) = ffmpeg::discover_bytes_lite(bytes.clone()).await? {
if let Some(discovery) = ffmpeg::discover_bytes_lite(timeout, bytes.clone()).await? {
return Ok(discovery);
}
let discovery = magick::discover_bytes_lite(bytes).await?;
let discovery = magick::discover_bytes_lite(timeout, bytes).await?;
Ok(discovery)
}
@ -52,28 +53,34 @@ pub(crate) async fn discover_bytes_lite(
pub(crate) async fn discover_store_lite<S>(
store: &S,
identifier: &S::Identifier,
timeout: u64,
) -> Result<DiscoveryLite, crate::error::Error>
where
S: Store,
{
if let Some(discovery) =
ffmpeg::discover_stream_lite(store.to_stream(identifier, None, None).await?).await?
ffmpeg::discover_stream_lite(timeout, store.to_stream(identifier, None, None).await?)
.await?
{
return Ok(discovery);
}
let discovery =
magick::discover_stream_lite(store.to_stream(identifier, None, None).await?).await?;
magick::discover_stream_lite(timeout, store.to_stream(identifier, None, None).await?)
.await?;
Ok(discovery)
}
pub(crate) async fn discover_bytes(bytes: Bytes) -> Result<Discovery, crate::error::Error> {
let discovery = ffmpeg::discover_bytes(bytes.clone()).await?;
pub(crate) async fn discover_bytes(
timeout: u64,
bytes: Bytes,
) -> Result<Discovery, crate::error::Error> {
let discovery = ffmpeg::discover_bytes(timeout, bytes.clone()).await?;
let discovery = magick::confirm_bytes(discovery, bytes.clone()).await?;
let discovery = magick::confirm_bytes(discovery, timeout, bytes.clone()).await?;
let discovery = exiftool::check_reorient(discovery, bytes).await?;
let discovery = exiftool::check_reorient(discovery, timeout, bytes).await?;
Ok(discovery)
}

View file

@ -16,11 +16,12 @@ pub(super) async fn check_reorient(
height,
frames,
}: Discovery,
timeout: u64,
bytes: Bytes,
) -> Result<Discovery, ExifError> {
let input = match input {
InputFile::Image(ImageInput { format, .. }) => {
let needs_reorient = needs_reorienting(bytes).await?;
let needs_reorient = needs_reorienting(bytes, timeout).await?;
InputFile::Image(ImageInput {
format,
@ -39,8 +40,8 @@ pub(super) async fn check_reorient(
}
#[tracing::instrument(level = "trace", skip(input))]
async fn needs_reorienting(input: Bytes) -> Result<bool, ExifError> {
let process = Process::run("exiftool", &["-n", "-Orientation", "-"])?;
async fn needs_reorienting(input: Bytes, timeout: u64) -> Result<bool, ExifError> {
let process = Process::run("exiftool", &["-n", "-Orientation", "-"], timeout)?;
let mut reader = process.bytes_read(input);
let mut buf = String::new();

View file

@ -63,46 +63,65 @@ struct Flags {
alpha: usize,
}
pub(super) async fn discover_bytes(bytes: Bytes) -> Result<Option<Discovery>, FfMpegError> {
discover_file_full(move |mut file| {
let bytes = bytes.clone();
pub(super) async fn discover_bytes(
timeout: u64,
bytes: Bytes,
) -> Result<Option<Discovery>, FfMpegError> {
discover_file_full(
move |mut file| {
let bytes = bytes.clone();
async move {
file.write_from_bytes(bytes)
.await
.map_err(FfMpegError::Write)?;
Ok(file)
}
})
async move {
file.write_from_bytes(bytes)
.await
.map_err(FfMpegError::Write)?;
Ok(file)
}
},
timeout,
)
.await
}
pub(super) async fn discover_bytes_lite(
timeout: u64,
bytes: Bytes,
) -> Result<Option<DiscoveryLite>, FfMpegError> {
discover_file_lite(move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(FfMpegError::Write)?;
Ok(file)
})
discover_file_lite(
move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(FfMpegError::Write)?;
Ok(file)
},
timeout,
)
.await
}
pub(super) async fn discover_stream_lite<S>(stream: S) -> Result<Option<DiscoveryLite>, FfMpegError>
pub(super) async fn discover_stream_lite<S>(
timeout: u64,
stream: S,
) -> Result<Option<DiscoveryLite>, FfMpegError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
{
discover_file_lite(move |mut file| async move {
file.write_from_stream(stream)
.await
.map_err(FfMpegError::Write)?;
Ok(file)
})
discover_file_lite(
move |mut file| async move {
file.write_from_stream(stream)
.await
.map_err(FfMpegError::Write)?;
Ok(file)
},
timeout,
)
.await
}
async fn discover_file_lite<F, Fut>(f: F) -> Result<Option<DiscoveryLite>, FfMpegError>
async fn discover_file_lite<F, Fut>(
f: F,
timeout: u64,
) -> Result<Option<DiscoveryLite>, FfMpegError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
@ -112,7 +131,7 @@ where
width,
height,
frames,
}) = discover_file(f)
}) = discover_file(f, timeout)
.await? else {
return Ok(None);
};
@ -130,12 +149,12 @@ where
}))
}
async fn discover_file_full<F, Fut>(f: F) -> Result<Option<Discovery>, FfMpegError>
async fn discover_file_full<F, Fut>(f: F, timeout: u64) -> Result<Option<Discovery>, FfMpegError>
where
F: Fn(crate::file::File) -> Fut + Clone,
Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
{
let Some(DiscoveryLite { format, width, height, frames }) = discover_file(f.clone()).await? else {
let Some(DiscoveryLite { format, width, height, frames }) = discover_file(f.clone(), timeout).await? else {
return Ok(None);
};
@ -143,12 +162,12 @@ where
InternalFormat::Video(InternalVideoFormat::Webm) => {
static ALPHA_PIXEL_FORMATS: OnceLock<HashSet<String>> = OnceLock::new();
let format = pixel_format(f).await?;
let format = pixel_format(f, timeout).await?;
let alpha = match ALPHA_PIXEL_FORMATS.get() {
Some(alpha_pixel_formats) => alpha_pixel_formats.contains(&format),
None => {
let pixel_formats = alpha_pixel_formats().await?;
let pixel_formats = alpha_pixel_formats(timeout).await?;
let alpha = pixel_formats.contains(&format);
let _ = ALPHA_PIXEL_FORMATS.set(pixel_formats);
alpha
@ -187,7 +206,7 @@ where
}
#[tracing::instrument(skip(f))]
async fn discover_file<F, Fut>(f: F) -> Result<Option<DiscoveryLite>, FfMpegError>
async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Option<DiscoveryLite>, FfMpegError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
@ -220,6 +239,7 @@ where
"json",
input_file_str,
],
timeout,
)?;
let mut output = Vec::new();
@ -237,7 +257,7 @@ where
parse_discovery(output)
}
async fn pixel_format<F, Fut>(f: F) -> Result<String, FfMpegError>
async fn pixel_format<F, Fut>(f: F, timeout: u64) -> Result<String, FfMpegError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, FfMpegError>>,
@ -267,6 +287,7 @@ where
"compact=p=0:nk=1",
input_file_str,
],
timeout,
)?;
let mut output = Vec::new();
@ -283,7 +304,7 @@ where
Ok(String::from_utf8_lossy(&output).trim().to_string())
}
async fn alpha_pixel_formats() -> Result<HashSet<String>, FfMpegError> {
async fn alpha_pixel_formats(timeout: u64) -> Result<HashSet<String>, FfMpegError> {
let process = Process::run(
"ffprobe",
&[
@ -296,6 +317,7 @@ async fn alpha_pixel_formats() -> Result<HashSet<String>, FfMpegError> {
"-print_format",
"json",
],
timeout,
)?;
let mut output = Vec::new();

View file

@ -49,31 +49,44 @@ impl Discovery {
}
}
pub(super) async fn discover_bytes_lite(bytes: Bytes) -> Result<DiscoveryLite, MagickError> {
discover_file_lite(move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(MagickError::Write)?;
Ok(file)
})
pub(super) async fn discover_bytes_lite(
timeout: u64,
bytes: Bytes,
) -> Result<DiscoveryLite, MagickError> {
discover_file_lite(
move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(MagickError::Write)?;
Ok(file)
},
timeout,
)
.await
}
pub(super) async fn discover_stream_lite<S>(stream: S) -> Result<DiscoveryLite, MagickError>
pub(super) async fn discover_stream_lite<S>(
timeout: u64,
stream: S,
) -> Result<DiscoveryLite, MagickError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
discover_file_lite(move |mut file| async move {
file.write_from_stream(stream)
.await
.map_err(MagickError::Write)?;
Ok(file)
})
discover_file_lite(
move |mut file| async move {
file.write_from_stream(stream)
.await
.map_err(MagickError::Write)?;
Ok(file)
},
timeout,
)
.await
}
pub(super) async fn confirm_bytes(
discovery: Option<Discovery>,
timeout: u64,
bytes: Bytes,
) -> Result<Discovery, MagickError> {
match discovery {
@ -83,12 +96,15 @@ pub(super) async fn confirm_bytes(
height,
..
}) => {
let frames = count_avif_frames(move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(MagickError::Write)?;
Ok(file)
})
let frames = count_avif_frames(
move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(MagickError::Write)?;
Ok(file)
},
timeout,
)
.await?;
return Ok(Discovery {
@ -110,17 +126,20 @@ pub(super) async fn confirm_bytes(
}
}
discover_file(move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(MagickError::Write)?;
discover_file(
move |mut file| async move {
file.write_from_bytes(bytes)
.await
.map_err(MagickError::Write)?;
Ok(file)
})
Ok(file)
},
timeout,
)
.await
}
async fn count_avif_frames<F, Fut>(f: F) -> Result<u32, MagickError>
async fn count_avif_frames<F, Fut>(f: F, timeout: u64) -> Result<u32, MagickError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
@ -137,7 +156,11 @@ where
let tmp_one = (f)(tmp_one).await?;
tmp_one.close().await.map_err(MagickError::CloseFile)?;
let process = Process::run("magick", &["convert", "-ping", input_file_str, "INFO:"])?;
let process = Process::run(
"magick",
&["convert", "-ping", input_file_str, "INFO:"],
timeout,
)?;
let mut output = String::new();
process
@ -166,15 +189,15 @@ where
Ok(lines)
}
async fn discover_file_lite<F, Fut>(f: F) -> Result<DiscoveryLite, MagickError>
async fn discover_file_lite<F, Fut>(f: F, timeout: u64) -> Result<DiscoveryLite, MagickError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
{
discover_file(f).await.map(Discovery::lite)
discover_file(f, timeout).await.map(Discovery::lite)
}
async fn discover_file<F, Fut>(f: F) -> Result<Discovery, MagickError>
async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Discovery, MagickError>
where
F: FnOnce(crate::file::File) -> Fut,
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
@ -191,7 +214,11 @@ where
let tmp_one = (f)(tmp_one).await?;
tmp_one.close().await.map_err(MagickError::CloseFile)?;
let process = Process::run("magick", &["convert", "-ping", input_file_str, "JSON:"])?;
let process = Process::run(
"magick",
&["convert", "-ping", input_file_str, "JSON:"],
timeout,
)?;
let mut output = Vec::new();
process

View file

@ -31,8 +31,8 @@ impl ExifError {
}
#[tracing::instrument(level = "trace", skip(input))]
pub(crate) async fn needs_reorienting(input: Bytes) -> Result<bool, ExifError> {
let process = Process::run("exiftool", &["-n", "-Orientation", "-"])?;
pub(crate) async fn needs_reorienting(timeout: u64, input: Bytes) -> Result<bool, ExifError> {
let process = Process::run("exiftool", &["-n", "-Orientation", "-"], timeout)?;
let mut reader = process.bytes_read(input);
let mut buf = String::new();
@ -45,8 +45,11 @@ pub(crate) async fn needs_reorienting(input: Bytes) -> Result<bool, ExifError> {
}
#[tracing::instrument(level = "trace", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> Result<impl AsyncRead + Unpin, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?;
pub(crate) fn clear_metadata_bytes_read(
timeout: u64,
input: Bytes,
) -> Result<impl AsyncRead + Unpin, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?;
Ok(process.bytes_read(input))
}

View file

@ -113,6 +113,7 @@ pub(crate) async fn thumbnail<S: Store>(
from: S::Identifier,
input_format: InternalVideoFormat,
format: ThumbnailFormat,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.file_extension()));
let input_file_str = input_file.to_str().ok_or(FfMpegError::Path)?;
@ -155,6 +156,7 @@ pub(crate) async fn thumbnail<S: Store>(
format.as_ffmpeg_format(),
output_file_str,
],
timeout,
)?;
process.wait().await?;

View file

@ -107,6 +107,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
identifier,
input_format.unwrap_or(InternalVideoFormat::Mp4),
thumbnail_format,
media.process_timeout,
)
.await?;
@ -123,7 +124,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
let input_details = if let Some(details) = repo.details(&identifier).await? {
details
} else {
let details = Details::from_store(store, &identifier).await?;
let details = Details::from_store(store, &identifier, media.process_timeout).await?;
repo.relate_details(&identifier, &details).await?;
@ -151,6 +152,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
input_format,
format,
quality,
media.process_timeout,
)
.await?;
@ -163,7 +165,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
drop(permit);
let details = Details::from_bytes(bytes.clone()).await?;
let details = Details::from_bytes(media.process_timeout, bytes.clone()).await?;
let identifier = store
.save_bytes(bytes.clone(), details.media_type())

View file

@ -64,7 +64,8 @@ where
};
tracing::trace!("Validating bytes");
let (input_type, validated_reader) = crate::validate::validate_bytes(bytes, prescribed).await?;
let (input_type, validated_reader) =
crate::validate::validate_bytes(bytes, prescribed, media.process_timeout).await?;
let processed_reader = if let Some(operations) = media.preprocess_steps() {
if let Some(format) = input_type.processable_format() {
@ -84,6 +85,7 @@ where
format,
format,
quality,
media.process_timeout,
)
.await?;

View file

@ -122,7 +122,8 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
}
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(store, &identifier).await?;
let new_details =
Details::from_store(store, &identifier, config.media.process_timeout).await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
@ -798,7 +799,8 @@ async fn process<R: FullRepo, S: Store + 'static>(
}
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(&store, &identifier).await?;
let new_details =
Details::from_store(&store, &identifier, config.media.process_timeout).await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
@ -927,7 +929,8 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
}
tracing::debug!("generating new details from {:?}", identifier);
let new_details = Details::from_store(&store, &identifier).await?;
let new_details =
Details::from_store(&store, &identifier, config.media.process_timeout).await?;
tracing::debug!("storing details for {:?}", identifier);
repo.relate_details(&identifier, &new_details).await?;
tracing::debug!("stored");
@ -1744,6 +1747,7 @@ async fn migrate_inner<S1>(
from: S1,
to: config::primitives::Store,
skip_missing_files: bool,
timeout: u64,
) -> color_eyre::Result<()>
where
S1: Store + 'static,
@ -1753,7 +1757,9 @@ where
let to = FileStore::build(path.clone(), repo.clone()).await?;
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
Repo::Sled(repo) => {
migrate_store(repo, from, to, skip_missing_files, timeout).await?
}
}
}
config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
@ -1789,7 +1795,9 @@ where
.build(client);
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
Repo::Sled(repo) => {
migrate_store(repo, from, to, skip_missing_files, timeout).await?
}
}
}
}
@ -1897,7 +1905,15 @@ impl PictRsConfiguration {
match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(repo, client, from, to, skip_missing_files).await?;
migrate_inner(
repo,
client,
from,
to,
skip_missing_files,
config.media.process_timeout,
)
.await?;
}
config::primitives::Store::ObjectStorage(
config::primitives::ObjectStorage {
@ -1933,7 +1949,15 @@ impl PictRsConfiguration {
.await?
.build(client.clone());
migrate_inner(repo, client, from, to, skip_missing_files).await?;
migrate_inner(
repo,
client,
from,
to,
skip_missing_files,
config.media.process_timeout,
)
.await?;
}
}

View file

@ -68,6 +68,7 @@ async fn process_image<F, Fut>(
input_format: ProcessableFormat,
format: ProcessableFormat,
quality: Option<u8>,
timeout: u64,
write_file: F,
) -> Result<impl AsyncRead + Unpin, MagickError>
where
@ -108,7 +109,7 @@ where
}
args.push(&output_arg);
let reader = Process::run("magick", &args)?.read();
let reader = Process::run("magick", &args, timeout)?.read();
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file);
@ -122,6 +123,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
input_format: ProcessableFormat,
format: ProcessableFormat,
quality: Option<u8>,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> {
let stream = store
.to_stream(identifier, None, None)
@ -133,6 +135,7 @@ pub(crate) async fn process_image_store_read<S: Store + 'static>(
input_format,
format,
quality,
timeout,
|mut tmp_file| async move {
tmp_file
.write_from_stream(stream)
@ -150,12 +153,14 @@ pub(crate) async fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
input_format: ProcessableFormat,
format: ProcessableFormat,
quality: Option<u8>,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, MagickError> {
process_image(
args,
input_format,
format,
quality,
timeout,
|mut tmp_file| async move {
tmp_file
.write_from_async_read(async_read)

View file

@ -17,6 +17,7 @@ pub(super) async fn migrate_store<R, S1, S2>(
from: S1,
to: S2,
skip_missing_files: bool,
timeout: u64,
) -> Result<(), Error>
where
S1: Store + Clone + 'static,
@ -38,8 +39,14 @@ where
let mut failure_count = 0;
while let Err(e) =
do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await
while let Err(e) = do_migrate_store(
repo.clone(),
from.clone(),
to.clone(),
skip_missing_files,
timeout,
)
.await
{
tracing::error!("Migration failed with {}", format!("{e:?}"));
@ -69,6 +76,7 @@ struct MigrateState<R, S1, S2> {
pct: AtomicU64,
index: AtomicU64,
started_at: Instant,
timeout: u64,
}
async fn do_migrate_store<R, S1, S2>(
@ -76,6 +84,7 @@ async fn do_migrate_store<R, S1, S2>(
from: S1,
to: S2,
skip_missing_files: bool,
timeout: u64,
) -> Result<(), Error>
where
S1: Store + 'static,
@ -110,6 +119,7 @@ where
pct: AtomicU64::new(initial_repo_size / 100),
index: AtomicU64::new(0),
started_at: Instant::now(),
timeout,
});
let mut joinset = tokio::task::JoinSet::new();
@ -160,6 +170,7 @@ where
pct,
index,
started_at,
timeout,
} = state;
let current_index = index.fetch_add(1, Ordering::Relaxed);
@ -212,7 +223,7 @@ where
if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? {
if !repo.is_migrated(&identifier).await? {
match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
Ok(new_identifier) => {
migrate_details(repo, &identifier, &new_identifier).await?;
repo.relate_motion_identifier(hash.clone().into(), &new_identifier)
@ -244,7 +255,7 @@ where
for (variant, identifier) in repo.variants(hash.clone().into()).await? {
if !repo.is_migrated(&identifier).await? {
match migrate_file(repo, from, to, &identifier, *skip_missing_files).await {
match migrate_file(repo, from, to, &identifier, *skip_missing_files, *timeout).await {
Ok(new_identifier) => {
migrate_details(repo, &identifier, &new_identifier).await?;
repo.remove_variant(hash.clone().into(), variant.clone())
@ -280,7 +291,16 @@ where
}
}
match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
match migrate_file(
repo,
from,
to,
&original_identifier,
*skip_missing_files,
*timeout,
)
.await
{
Ok(new_identifier) => {
migrate_details(repo, &original_identifier, &new_identifier).await?;
repo.update_identifier(hash.clone().into(), &new_identifier)
@ -338,6 +358,7 @@ async fn migrate_file<R, S1, S2>(
to: &S2,
identifier: &S1::Identifier,
skip_missing_files: bool,
timeout: u64,
) -> Result<S2::Identifier, MigrateError>
where
R: IdentifierRepo,
@ -347,7 +368,7 @@ where
let mut failure_count = 0;
loop {
match do_migrate_file(repo, from, to, identifier).await {
match do_migrate_file(repo, from, to, identifier, timeout).await {
Ok(identifier) => return Ok(identifier),
Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => {
return Err(MigrateError::From(e));
@ -380,6 +401,7 @@ async fn do_migrate_file<R, S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
timeout: u64,
) -> Result<S2::Identifier, MigrateError>
where
R: IdentifierRepo,
@ -407,7 +429,7 @@ where
let details = if let Some(details) = details_opt {
details
} else {
let new_details = Details::from_store(from, identifier)
let new_details = Details::from_store(from, identifier, timeout)
.await
.map_err(MigrateError::Details)?;
repo.relate_details(identifier, &new_details)

View file

@ -5,11 +5,11 @@ use std::{
pin::Pin,
process::{ExitStatus, Stdio},
task::{Context, Poll},
time::Instant,
time::{Duration, Instant},
};
use tokio::{
io::{AsyncRead, AsyncWriteExt, ReadBuf},
process::{Child, ChildStdin, Command},
process::{Child, ChildStdin, ChildStdout, Command},
sync::oneshot::{channel, Receiver},
};
use tracing::{Instrument, Span};
@ -56,6 +56,7 @@ pub(crate) struct Process {
command: String,
child: Child,
guard: MetricsGuard,
timeout: Duration,
}
impl std::fmt::Debug for Process {
@ -68,15 +69,14 @@ struct DropHandle {
inner: JoinHandle<()>,
}
pin_project_lite::pin_project! {
struct ProcessRead<I> {
#[pin]
inner: I,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
handle: DropHandle,
eof: bool,
}
pub(crate) struct ProcessRead<I> {
inner: I,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
#[allow(dead_code)]
handle: DropHandle,
eof: bool,
sleep: Pin<Box<actix_rt::time::Sleep>>,
}
#[derive(Debug, thiserror::Error)]
@ -90,6 +90,9 @@ pub(crate) enum ProcessError {
#[error("Reached process spawn limit")]
LimitReached,
#[error("{0} timed out")]
Timeout(String),
#[error("{0} Failed with {1}")]
Status(String, ExitStatus),
@ -98,9 +101,9 @@ pub(crate) enum ProcessError {
}
impl Process {
pub(crate) fn run(command: &str, args: &[&str]) -> Result<Self, ProcessError> {
pub(crate) fn run(command: &str, args: &[&str], timeout: u64) -> Result<Self, ProcessError> {
let res = tracing::trace_span!(parent: None, "Create command", %command)
.in_scope(|| Self::spawn(command, Command::new(command).args(args)));
.in_scope(|| Self::spawn(command, Command::new(command).args(args), timeout));
match res {
Ok(this) => Ok(this),
@ -115,7 +118,7 @@ impl Process {
}
}
fn spawn(command: &str, cmd: &mut Command) -> std::io::Result<Self> {
fn spawn(command: &str, cmd: &mut Command, timeout: u64) -> std::io::Result<Self> {
tracing::trace_span!(parent: None, "Spawn command", %command).in_scope(|| {
let guard = MetricsGuard::guard(command.into());
@ -128,133 +131,152 @@ impl Process {
child,
command: String::from(command),
guard,
timeout: Duration::from_secs(timeout),
})
})
}
#[tracing::instrument(skip(self))]
pub(crate) async fn wait(mut self) -> Result<(), ProcessError> {
let res = self.child.wait().await;
pub(crate) async fn wait(self) -> Result<(), ProcessError> {
let Process {
command,
mut child,
guard,
timeout,
} = self;
let res = actix_rt::time::timeout(timeout, child.wait()).await;
match res {
Ok(status) if status.success() => {
self.guard.disarm();
Ok(Ok(status)) if status.success() => {
guard.disarm();
Ok(())
}
Ok(status) => Err(ProcessError::Status(self.command, status)),
Err(e) => Err(ProcessError::Other(e)),
Ok(Ok(status)) => Err(ProcessError::Status(command, status)),
Ok(Err(e)) => Err(ProcessError::Other(e)),
Err(_) => {
child.kill().await.map_err(ProcessError::Other)?;
Err(ProcessError::Timeout(command))
}
}
}
pub(crate) fn bytes_read(self, input: Bytes) -> impl AsyncRead + Unpin {
pub(crate) fn bytes_read(self, input: Bytes) -> ProcessRead<ChildStdout> {
self.spawn_fn(move |mut stdin| {
let mut input = input;
async move { stdin.write_all_buf(&mut input).await }
})
}
pub(crate) fn read(self) -> impl AsyncRead + Unpin {
pub(crate) fn read(self) -> ProcessRead<ChildStdout> {
self.spawn_fn(|_| async { Ok(()) })
}
#[allow(unknown_lints)]
#[allow(clippy::let_with_type_underscore)]
#[tracing::instrument(level = "trace", skip_all)]
fn spawn_fn<F, Fut>(mut self, f: F) -> impl AsyncRead + Unpin
fn spawn_fn<F, Fut>(self, f: F) -> ProcessRead<ChildStdout>
where
F: FnOnce(ChildStdin) -> Fut + 'static,
Fut: Future<Output = std::io::Result<()>>,
{
let stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists");
let Process {
command,
mut child,
guard,
timeout,
} = self;
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %self.command)
let stdin = child.stdin.take().expect("stdin exists");
let stdout = child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel", %command)
.in_scope(channel::<std::io::Error>);
let span = tracing::info_span!(parent: None, "Background process task", %self.command);
let span = tracing::info_span!(parent: None, "Background process task", %command);
span.follows_from(Span::current());
let mut child = self.child;
let command = self.command;
let guard = self.guard;
let handle = tracing::trace_span!(parent: None, "Spawn task", %command).in_scope(|| {
actix_rt::spawn(
async move {
if let Err(e) = (f)(stdin).await {
let _ = tx.send(e);
return;
}
let child_fut = async {
(f)(stdin).await?;
match child.wait().await {
Ok(status) => {
if status.success() {
guard.disarm();
} else {
let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other,
StatusError(status),
));
}
child.wait().await
};
let error = match actix_rt::time::timeout(timeout, child_fut).await {
Ok(Ok(status)) if status.success() => {
guard.disarm();
return;
}
Err(e) => {
let _ = tx.send(e);
Ok(Ok(status)) => {
std::io::Error::new(std::io::ErrorKind::Other, StatusError(status))
}
}
Ok(Err(e)) => e,
Err(_) => std::io::ErrorKind::TimedOut.into(),
};
let _ = tx.send(error);
let _ = child.kill().await;
}
.instrument(span),
)
});
let sleep = actix_rt::time::sleep(timeout);
ProcessRead {
inner: stdout,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
eof: false,
sleep: Box::pin(sleep),
}
}
}
impl<I> AsyncRead for ProcessRead<I>
where
I: AsyncRead,
I: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let this = self.as_mut().project();
let err_recv = this.err_recv;
let err_closed = this.err_closed;
let eof = this.eof;
let inner = this.inner;
if !*err_closed {
if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) {
*err_closed = true;
if !self.err_closed {
if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) {
self.err_closed = true;
if let Ok(err) = res {
return Poll::Ready(Err(err));
}
if *eof {
if self.eof {
return Poll::Ready(Ok(()));
}
}
if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) {
self.err_closed = true;
return Poll::Ready(Err(std::io::ErrorKind::TimedOut.into()));
}
}
if !*eof {
if !self.eof {
let before_size = buf.filled().len();
return match inner.poll_read(cx, buf) {
return match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
if buf.filled().len() == before_size {
*eof = true;
self.eof = true;
if !*err_closed {
if !self.err_closed {
// reached end of stream & haven't received process signal
return Poll::Pending;
}
@ -263,7 +285,7 @@ where
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
*eof = true;
self.eof = true;
Poll::Ready(Err(e))
}
@ -271,7 +293,7 @@ where
};
}
if *err_closed && *eof {
if self.err_closed && self.eof {
return Poll::Ready(Ok(()));
}

View file

@ -44,6 +44,7 @@ const MEGABYTES: usize = 1024 * 1024;
pub(crate) async fn validate_bytes(
bytes: Bytes,
validations: Validations<'_>,
timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> {
if bytes.is_empty() {
return Err(ValidationError::Empty.into());
@ -54,12 +55,12 @@ pub(crate) async fn validate_bytes(
width,
height,
frames,
} = crate::discover::discover_bytes(bytes.clone()).await?;
} = crate::discover::discover_bytes(timeout, bytes.clone()).await?;
match &input {
InputFile::Image(input) => {
let (format, read) =
process_image(bytes, *input, width, height, validations.image).await?;
process_image(bytes, *input, width, height, validations.image, timeout).await?;
Ok((format, Either::left(read)))
}
@ -71,6 +72,7 @@ pub(crate) async fn validate_bytes(
height,
frames.unwrap_or(1),
&validations,
timeout,
)
.await?;
@ -84,6 +86,7 @@ pub(crate) async fn validate_bytes(
height,
frames.unwrap_or(1),
validations.video,
timeout,
)
.await?;
@ -99,6 +102,7 @@ async fn process_image(
width: u16,
height: u16,
validations: &crate::config::Image,
timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> {
if width > validations.max_width {
return Err(ValidationError::Width.into());
@ -121,9 +125,9 @@ async fn process_image(
let read = if needs_transcode {
let quality = validations.quality_for(format);
Either::left(magick::convert_image(input.format, format, quality, bytes).await?)
Either::left(magick::convert_image(input.format, format, quality, timeout, bytes).await?)
} else {
Either::right(exiftool::clear_metadata_bytes_read(bytes)?)
Either::right(exiftool::clear_metadata_bytes_read(bytes, timeout)?)
};
Ok((InternalFormat::Image(format), read))
@ -163,6 +167,7 @@ async fn process_animation(
height: u16,
frames: u32,
validations: &Validations<'_>,
timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> {
match validate_animation(bytes.len(), width, height, frames, validations.animation) {
Ok(()) => {
@ -174,9 +179,13 @@ async fn process_animation(
let read = if needs_transcode {
let quality = validations.animation.quality_for(format);
Either::left(magick::convert_animation(input, format, quality, bytes).await?)
Either::left(
magick::convert_animation(input, format, quality, timeout, bytes).await?,
)
} else {
Either::right(Either::left(exiftool::clear_metadata_bytes_read(bytes)?))
Either::right(Either::left(exiftool::clear_metadata_bytes_read(
bytes, timeout,
)?))
};
Ok((InternalFormat::Animation(format), read))
@ -190,7 +199,7 @@ async fn process_animation(
);
let read = Either::right(Either::right(
magick::convert_video(input, output, bytes).await?,
magick::convert_video(input, output, timeout, bytes).await?,
));
Ok((InternalFormat::Video(output.internal_format()), read))
@ -237,6 +246,7 @@ async fn process_video(
height: u16,
frames: u32,
validations: &crate::config::Video,
timeout: u64,
) -> Result<(InternalFormat, impl AsyncRead + Unpin), Error> {
validate_video(bytes.len(), width, height, frames, validations)?;
@ -248,7 +258,7 @@ async fn process_video(
let crf = validations.crf_for(width, height);
let read = ffmpeg::transcode_bytes(input, output, crf, bytes).await?;
let read = ffmpeg::transcode_bytes(input, output, crf, timeout, bytes).await?;
Ok((InternalFormat::Video(output.internal_format()), read))
}

View file

@ -4,8 +4,11 @@ use tokio::io::AsyncRead;
use crate::{exiftool::ExifError, process::Process};
#[tracing::instrument(level = "trace", skip(input))]
pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> Result<impl AsyncRead + Unpin, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"])?;
pub(crate) fn clear_metadata_bytes_read(
input: Bytes,
timeout: u64,
) -> Result<impl AsyncRead + Unpin, ExifError> {
let process = Process::run("exiftool", &["-all=", "-", "-out", "-"], timeout)?;
Ok(process.bytes_read(input))
}

View file

@ -11,6 +11,7 @@ pub(super) async fn transcode_bytes(
input_format: VideoFormat,
output_format: OutputVideoFormat,
crf: u8,
timeout: u64,
bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, FfMpegError> {
let input_file = crate::tmp_file::tmp_file(None);
@ -37,6 +38,7 @@ pub(super) async fn transcode_bytes(
output_file_str,
output_format,
crf,
timeout,
)
.await?;
@ -59,6 +61,7 @@ async fn transcode_files(
output_path: &str,
output_format: OutputVideoFormat,
crf: u8,
timeout: u64,
) -> Result<(), FfMpegError> {
let mut args = vec![
"-hide_banner",
@ -96,7 +99,7 @@ async fn transcode_files(
output_path,
]);
Process::run("ffmpeg", &args)?.wait().await?;
Process::run("ffmpeg", &args, timeout)?.wait().await?;
Ok(())
}

View file

@ -11,6 +11,7 @@ pub(super) async fn convert_image(
input: ImageFormat,
output: ImageFormat,
quality: Option<u8>,
timeout: u64,
bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> {
convert(
@ -18,6 +19,7 @@ pub(super) async fn convert_image(
output.magick_format(),
false,
quality,
timeout,
bytes,
)
.await
@ -27,6 +29,7 @@ pub(super) async fn convert_animation(
input: AnimationFormat,
output: AnimationFormat,
quality: Option<u8>,
timeout: u64,
bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> {
convert(
@ -34,6 +37,7 @@ pub(super) async fn convert_animation(
output.magick_format(),
true,
quality,
timeout,
bytes,
)
.await
@ -42,6 +46,7 @@ pub(super) async fn convert_animation(
pub(super) async fn convert_video(
input: AnimationFormat,
output: OutputVideoFormat,
timeout: u64,
bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> {
convert(
@ -49,6 +54,7 @@ pub(super) async fn convert_video(
output.magick_format(),
true,
None,
timeout,
bytes,
)
.await
@ -59,6 +65,7 @@ async fn convert(
output: &'static str,
coalesce: bool,
quality: Option<u8>,
timeout: u64,
bytes: Bytes,
) -> Result<impl AsyncRead + Unpin, MagickError> {
let input_file = crate::tmp_file::tmp_file(None);
@ -95,7 +102,7 @@ async fn convert(
args.push(&output_arg);
let reader = Process::run("magick", &args)?.read();
let reader = Process::run("magick", &args, timeout)?.read();
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, input_file);