From 210af5d7d9f56636285b73774c5ce624ab91b210 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 11 Nov 2023 14:22:12 -0600
Subject: [PATCH] Add danger_dummy_mode

---
 defaults.toml             |  1 +
 pict-rs.toml              |  8 ++++
 src/config/commandline.rs |  9 +++++
 src/config/defaults.rs    |  2 +
 src/config/file.rs        |  2 +
 src/details.rs            | 12 ++++++
 src/generate.rs           | 73 +++++++++++++++++-----------------
 src/ingest.rs             | 68 +++++++++++++++++++++++++++-----
 src/lib.rs                | 82 ++++++++++++---------------------------
 src/queue/process.rs      | 12 +++---
 10 files changed, 159 insertions(+), 110 deletions(-)

diff --git a/defaults.toml b/defaults.toml
index 61f8f3be..76e5c411 100644
--- a/defaults.toml
+++ b/defaults.toml
@@ -1,6 +1,7 @@
 [server]
 address = "0.0.0.0:8080"
 read_only = false
+danger_dummy_mode = false
 max_file_count = 1
 
 [client]
diff --git a/pict-rs.toml b/pict-rs.toml
index f7d7a9e2..997a2786 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -12,6 +12,14 @@ address = '0.0.0.0:8080'
 # This can be useful if you need to run a copy of pict-rs while performing maintenance.
 read_only = false
 
+## Optional: whether to run pict-rs without its media-processing dependencies.
+# environment variable: PICTRS__SERVER__DANGER_DUMMY_MODE
+# default: false
+#
+# This means pict-rs will not be able to inspect metadata of uploaded media or perform processing
+# on it. This mode is provided for use in test environments and should not be used in production.
+danger_dummy_mode = false
+
 ## Optional: shared secret for internal endpoints
 # environment variable: PICTRS__SERVER__API_KEY
 # default: empty
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index 13d25d1e..fa9258e2 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -100,6 +100,7 @@ impl Args {
                 media_video_quality_2160,
                 media_filters,
                 read_only,
+                danger_dummy_mode,
                 max_file_count,
                 store,
             }) => {
@@ -107,6 +108,7 @@ impl Args {
                     address,
                     api_key,
                     read_only,
+                    danger_dummy_mode,
                     max_file_count,
                 };
 
@@ -509,6 +511,8 @@ struct Server {
     api_key: Option<String>,
     #[serde(skip_serializing_if = "std::ops::Not::not")]
     read_only: bool,
+    #[serde(skip_serializing_if = "std::ops::Not::not")]
+    danger_dummy_mode: bool,
     #[serde(skip_serializing_if = "Option::is_none")]
     max_file_count: Option<u32>,
 }
@@ -1097,6 +1101,11 @@ struct Run {
     #[arg(long)]
     read_only: bool,
 
+    /// Allow running without ffmpeg, imagemagick, or exiftool. This will allow hosting arbitrary
+    /// files and provide inaccurate metadata for uploaded media
+    #[arg(long)]
+    danger_dummy_mode: bool,
+
     #[command(subcommand)]
     store: Option<RunStore>,
 }
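A note on the configuration plumbing above: the TOML key, the PICTRS__SERVER__DANGER_DUMMY_MODE environment variable, and the --danger-dummy-mode CLI flag all land on the same boolean in the server section, defaulting to false when unset. A minimal sketch of how such a key deserializes with serde (ServerSection is a simplified stand-in, not pict-rs's actual config type; assumes the serde and toml crates):

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct ServerSection {
        // An absent key falls back to false, matching defaults.toml
        #[serde(default)]
        danger_dummy_mode: bool,
    }

    fn main() {
        let off: ServerSection = toml::from_str("").unwrap();
        assert!(!off.danger_dummy_mode);

        let on: ServerSection = toml::from_str("danger_dummy_mode = true").unwrap();
        assert!(on.danger_dummy_mode);
    }
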
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 12efde38..160dc1d0 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -21,6 +21,7 @@ pub(crate) struct Defaults {
 struct ServerDefaults {
     address: SocketAddr,
     read_only: bool,
+    danger_dummy_mode: bool,
     max_file_count: u32,
 }
 
@@ -181,6 +182,7 @@ impl Default for ServerDefaults {
         ServerDefaults {
             address: "0.0.0.0:8080".parse().expect("Valid address string"),
             read_only: false,
+            danger_dummy_mode: false,
             max_file_count: 1,
         }
     }
diff --git a/src/config/file.rs b/src/config/file.rs
index 88941001..a04e1a01 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -113,6 +113,8 @@ pub(crate) struct Server {
 
     pub(crate) read_only: bool,
 
+    pub(crate) danger_dummy_mode: bool,
+
     pub(crate) max_file_count: u32,
 }
diff --git a/src/details.rs b/src/details.rs
index cf8d4a22..1c3d7b6a 100644
--- a/src/details.rs
+++ b/src/details.rs
@@ -119,6 +119,18 @@ impl Details {
         }
     }
 
+    pub(crate) fn danger_dummy(format: InternalFormat) -> Self {
+        Self::from_parts_full(
+            format,
+            0,
+            0,
+            None,
+            HumanDate {
+                timestamp: time::OffsetDateTime::now_utc(),
+            },
+        )
+    }
+
     pub(crate) fn from_parts_full(
         format: InternalFormat,
         width: u16,
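Details::danger_dummy above fabricates a Details record without ever inspecting the file: zeroed dimensions, no frame count, and a fresh timestamp. This is what the doc comment means by "inaccurate metadata". A rough stand-alone illustration of the same idea (DummyDetails is a made-up stand-in for the crate's Details, which carries more fields; assumes the time crate):

    use time::OffsetDateTime;

    #[derive(Debug)]
    struct DummyDetails {
        media_type: &'static str,
        width: u16,
        height: u16,
        frames: Option<u32>,
        created_at: OffsetDateTime,
    }

    impl DummyDetails {
        // Mirrors the shape of Details::danger_dummy: nothing examined the
        // bytes, so every field is a placeholder rather than a measurement.
        fn danger_dummy(media_type: &'static str) -> Self {
            Self {
                media_type,
                width: 0,
                height: 0,
                frames: None,
                created_at: OffsetDateTime::now_utc(),
            }
        }
    }

    fn main() {
        println!("{:?}", DummyDetails::danger_dummy("image/png"));
    }
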
diff --git a/src/generate.rs b/src/generate.rs
index f5ce4290..fe262ea4 100644
--- a/src/generate.rs
+++ b/src/generate.rs
@@ -42,7 +42,7 @@ impl Drop for MetricsGuard {
 }
 
 #[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(repo, store, hash, process_map, media))]
+#[tracing::instrument(skip(repo, store, hash, process_map, config))]
 pub(crate) async fn generate<S: Store + 'static>(
     tmp_dir: &TmpDir,
     repo: &ArcRepo,
@@ -52,30 +52,41 @@ pub(crate) async fn generate<S: Store + 'static>(
     thumbnail_path: PathBuf,
     thumbnail_args: Vec<String>,
     original_details: &Details,
-    media: &crate::config::Media,
+    config: &crate::config::Configuration,
     hash: Hash,
 ) -> Result<(Details, Bytes), Error> {
-    let process_fut = process(
-        tmp_dir,
-        repo,
-        store,
-        format,
-        thumbnail_path.clone(),
-        thumbnail_args,
-        original_details,
-        media,
-        hash.clone(),
-    );
+    if config.server.danger_dummy_mode {
+        let identifier = repo
+            .identifier(hash)
+            .await?
+            .ok_or(UploadError::MissingIdentifier)?;
 
-    let (details, bytes) = process_map
-        .process(hash, thumbnail_path, process_fut)
-        .await?;
+        let bytes = store.to_bytes(&identifier, None, None).await?.into_bytes();
 
-    Ok((details, bytes))
+        Ok((original_details.clone(), bytes))
+    } else {
+        let process_fut = process(
+            tmp_dir,
+            repo,
+            store,
+            format,
+            thumbnail_path.clone(),
+            thumbnail_args,
+            original_details,
+            config,
+            hash.clone(),
+        );
+
+        let (details, bytes) = process_map
+            .process(hash, thumbnail_path, process_fut)
+            .await?;
+
+        Ok((details, bytes))
+    }
 }
 
 #[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(repo, store, hash, media))]
+#[tracing::instrument(skip(repo, store, hash, config))]
 async fn process<S: Store + 'static>(
     tmp_dir: &TmpDir,
     repo: &ArcRepo,
@@ -84,7 +95,7 @@ async fn process<S: Store + 'static>(
     thumbnail_path: PathBuf,
     thumbnail_args: Vec<String>,
     original_details: &Details,
-    media: &crate::config::Media,
+    config: &crate::config::Configuration,
     hash: Hash,
 ) -> Result<(Details, Bytes), Error> {
     let guard = MetricsGuard::guard();
@@ -97,22 +108,12 @@ async fn process<S: Store + 'static>(
         output_format,
         hash.clone(),
         original_details,
-        media,
+        &config.media,
     )
     .await?;
 
-    let input_details = if let Some(details) = repo.details(&identifier).await? {
-        details
-    } else {
-        let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-
-        let details =
-            Details::from_bytes(tmp_dir, media.process_timeout, bytes_stream.into_bytes()).await?;
-
-        repo.relate_details(&identifier, &details).await?;
-
-        details
-    };
+    let input_details =
+        crate::ensure_details_identifier(tmp_dir, repo, store, config, &identifier).await?;
 
     let input_format = input_details
         .internal_format()
@@ -122,8 +123,8 @@ async fn process<S: Store + 'static>(
     let format = input_format.process_to(output_format);
 
     let quality = match format {
-        ProcessableFormat::Image(format) => media.image.quality_for(format),
-        ProcessableFormat::Animation(format) => media.animation.quality_for(format),
+        ProcessableFormat::Image(format) => config.media.image.quality_for(format),
+        ProcessableFormat::Animation(format) => config.media.animation.quality_for(format),
     };
 
     let mut processed_reader = crate::magick::process_image_store_read(
@@ -134,7 +135,7 @@ async fn process<S: Store + 'static>(
         input_format,
         format,
         quality,
-        media.process_timeout,
+        config.media.process_timeout,
     )
     .await?;
 
@@ -147,7 +148,7 @@ async fn process<S: Store + 'static>(
 
     drop(permit);
 
-    let details = Details::from_bytes(tmp_dir, media.process_timeout, bytes.clone()).await?;
+    let details = Details::from_bytes(tmp_dir, config.media.process_timeout, bytes.clone()).await?;
 
     let identifier = store
         .save_bytes(bytes.clone(), details.media_type())
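In dummy mode, generate short-circuits: rather than producing a real variant it loads the original file's bytes and pairs them with the original Details, so every thumbnail or processed-variant request simply serves the unmodified upload. The shape of that branch, reduced to plain synchronous functions (everything here is a simplified stand-in for the store/repo machinery):

    // In dummy mode every requested variant is just the original bytes.
    fn generate(dummy_mode: bool, original: &[u8]) -> Vec<u8> {
        if dummy_mode {
            // No ffmpeg/imagemagick available: hand back the upload unmodified
            original.to_vec()
        } else {
            thumbnail(original)
        }
    }

    // Stand-in for the real magick/ffmpeg pipeline, which this sketch elides.
    fn thumbnail(original: &[u8]) -> Vec<u8> {
        original.iter().rev().copied().collect()
    }

    fn main() {
        let original = b"raw media bytes".to_vec();
        assert_eq!(generate(true, &original), original);
    }
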
diff --git a/src/ingest.rs b/src/ingest.rs
index b553e546..3b85952b 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -1,4 +1,4 @@
-use std::{sync::Arc, time::Duration};
+use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration};
 
 use crate::{
     bytes_stream::BytesStream,
@@ -18,7 +18,7 @@ use streem::IntoStreamer;
 use tracing::{Instrument, Span};
 
 mod hasher;
-use hasher::Hasher;
+use hasher::{Hasher, State};
 
 #[derive(Debug)]
 pub(crate) struct Session {
@@ -46,16 +46,12 @@ where
     Ok(buf.into_bytes())
 }
 
-#[tracing::instrument(skip(repo, store, client, stream, media))]
-pub(crate) async fn ingest<S>(
+async fn process_ingest<S>(
     tmp_dir: &TmpDir,
-    repo: &ArcRepo,
     store: &S,
-    client: &ClientWithMiddleware,
     stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
-    declared_alias: Option<Alias>,
     media: &crate::config::Media,
-) -> Result<Session, Error>
+) -> Result<(InternalFormat, Arc<str>, Details, Rc<RefCell<State>>), Error>
 where
     S: Store,
 {
@@ -115,6 +111,56 @@ where
 
     drop(permit);
 
+    Ok((input_type, identifier, details, state))
+}
+
+async fn dummy_ingest<S>(
+    store: &S,
+    stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
+) -> Result<(InternalFormat, Arc<str>, Details, Rc<RefCell<State>>), Error>
+where
+    S: Store,
+{
+    let stream = crate::stream::map(stream, |res| match res {
+        Ok(bytes) => Ok(bytes),
+        Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
+    });
+
+    let reader = Box::pin(tokio_util::io::StreamReader::new(stream));
+
+    let hasher_reader = Hasher::new(reader);
+    let state = hasher_reader.state();
+
+    let input_type = InternalFormat::Image(crate::formats::ImageFormat::Png);
+
+    let identifier = store
+        .save_async_read(hasher_reader, input_type.media_type())
+        .await?;
+
+    let details = Details::danger_dummy(input_type);
+
+    Ok((input_type, identifier, details, state))
+}
+
+#[tracing::instrument(skip(repo, store, client, stream, config))]
+pub(crate) async fn ingest<S>(
+    tmp_dir: &TmpDir,
+    repo: &ArcRepo,
+    store: &S,
+    client: &ClientWithMiddleware,
+    stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
+    declared_alias: Option<Alias>,
+    config: &crate::config::Configuration,
+) -> Result<Session, Error>
+where
+    S: Store,
+{
+    let (input_type, identifier, details, state) = if config.server.danger_dummy_mode {
+        dummy_ingest(store, stream).await?
+    } else {
+        process_ingest(tmp_dir, store, stream, &config.media).await?
+    };
+
     let mut session = Session {
         repo: repo.clone(),
         delete_token: DeleteToken::generate(),
@@ -123,12 +169,14 @@ where
         identifier: Some(identifier.clone()),
     };
 
-    if let Some(endpoint) = &media.external_validation {
+    if let Some(endpoint) = &config.media.external_validation {
         let stream = store.to_stream(&identifier, None, None).await?;
 
         let response = client
             .post(endpoint.as_str())
-            .timeout(Duration::from_secs(media.external_validation_timeout))
+            .timeout(Duration::from_secs(
+                config.media.external_validation_timeout,
+            ))
             .header("Content-Type", input_type.media_type().as_ref())
             .body(Body::wrap_stream(crate::stream::make_send(stream)))
             .send()
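dummy_ingest above keeps one real piece of work: the upload still flows through Hasher on its way into the store, so deduplication by content hash keeps functioning even though the media type is hard-coded to PNG and the details are fabricated. A compact sketch of that hash-while-storing pattern (assumes the sha2 crate; pict-rs's Hasher wraps an AsyncRead instead of a byte slice):

    use sha2::{Digest, Sha256};

    // Copy `input` into `output` while feeding the same bytes to a digest,
    // so storing and hashing happen in a single pass over the data.
    fn store_and_hash(input: &[u8], output: &mut Vec<u8>) -> [u8; 32] {
        let mut hasher = Sha256::new();
        hasher.update(input);
        output.extend_from_slice(input);
        hasher.finalize().into()
    }

    fn main() {
        let mut stored = Vec::new();
        let digest = store_and_hash(b"uploaded media", &mut stored);
        println!("stored {} bytes, hash {:02x?}", stored.len(), digest);
    }
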
diff --git a/src/lib.rs b/src/lib.rs
index bc131fa2..f8460f12 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -113,6 +113,16 @@ async fn ensure_details<S: Store + 'static>(
         return Err(UploadError::MissingAlias.into());
     };
 
+    ensure_details_identifier(tmp_dir, repo, store, config, &identifier).await
+}
+
+async fn ensure_details_identifier<S: Store + 'static>(
+    tmp_dir: &TmpDir,
+    repo: &ArcRepo,
+    store: &S,
+    config: &Configuration,
+    identifier: &Arc<str>,
+) -> Result<Details, Error> {
     let details = repo.details(&identifier).await?;
 
     if let Some(details) = details {
@@ -121,10 +131,14 @@ async fn ensure_details<S: Store + 'static>(
     } else {
         if config.server.read_only {
             return Err(UploadError::ReadOnly.into());
+        } else if config.server.danger_dummy_mode {
+            return Ok(Details::danger_dummy(formats::InternalFormat::Image(
+                formats::ImageFormat::Png,
+            )));
         }
 
         tracing::debug!("generating new details from {:?}", identifier);
-        let bytes_stream = store.to_bytes(&identifier, None, None).await?;
+        let bytes_stream = store.to_bytes(identifier, None, None).await?;
         let new_details = Details::from_bytes(
             tmp_dir,
             config.media.process_timeout,
@@ -132,7 +146,7 @@ async fn ensure_details<S: Store + 'static>(
             bytes_stream.into_bytes(),
         )
         .await?;
         tracing::debug!("storing details for {:?}", identifier);
-        repo.relate_details(&identifier, &new_details).await?;
+        repo.relate_details(identifier, &new_details).await?;
         tracing::debug!("stored");
         Ok(new_details)
     }
@@ -195,13 +209,7 @@ impl FormData for Upload {
                     let stream = crate::stream::from_err(stream);
 
                     ingest::ingest(
-                        &tmp_dir,
-                        &repo,
-                        &**store,
-                        &client,
-                        stream,
-                        None,
-                        &config.media,
+                        &tmp_dir, &repo, &**store, &client, stream, None, &config,
                     )
                     .await
                 }
@@ -279,7 +287,7 @@ impl FormData for Import {
                         &client,
                         stream,
                         Some(Alias::from_existing(&filename)),
-                        &config.media,
+                        &config,
                     )
                     .await
                 }
@@ -534,7 +542,7 @@ async fn ingest_inline(
     client: &ClientWithMiddleware,
     config: &Configuration,
 ) -> Result<(Alias, DeleteToken, Details), Error> {
-    let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, &config.media).await?;
+    let session = ingest::ingest(tmp_dir, repo, store, client, stream, None, &config).await?;
 
     let alias = session.alias().expect("alias should exist").to_owned();
 
@@ -922,29 +930,8 @@ async fn process(
     let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
 
     if let Some(identifier) = identifier_opt {
-        let details = repo.details(&identifier).await?;
-
-        let details = if let Some(details) = details {
-            tracing::debug!("details exist");
-            details
-        } else {
-            if config.server.read_only {
-                return Err(UploadError::ReadOnly.into());
-            }
-
-            tracing::debug!("generating new details from {:?}", identifier);
-            let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-            let new_details = Details::from_bytes(
-                &tmp_dir,
-                config.media.process_timeout,
-                bytes_stream.into_bytes(),
-            )
-            .await?;
-            tracing::debug!("storing details for {:?}", identifier);
-            repo.relate_details(&identifier, &new_details).await?;
-            tracing::debug!("stored");
-            new_details
-        };
+        let details =
+            ensure_details_identifier(&tmp_dir, &repo, &store, &config, &identifier).await?;
 
         if let Some(public_url) = store.public_url(&identifier) {
             return Ok(HttpResponse::SeeOther()
@@ -970,7 +957,7 @@ async fn process(
         thumbnail_path,
         thumbnail_args,
         &original_details,
-        &config.media,
+        &config,
         hash,
     )
     .await?;
@@ -1047,29 +1034,8 @@ async fn process_head(
     let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
 
     if let Some(identifier) = identifier_opt {
-        let details = repo.details(&identifier).await?;
-
-        let details = if let Some(details) = details {
-            tracing::debug!("details exist");
-            details
-        } else {
-            if config.server.read_only {
-                return Err(UploadError::ReadOnly.into());
-            }
-
-            tracing::debug!("generating new details from {:?}", identifier);
-            let bytes_stream = store.to_bytes(&identifier, None, None).await?;
-            let new_details = Details::from_bytes(
-                &tmp_dir,
-                config.media.process_timeout,
-                bytes_stream.into_bytes(),
-            )
-            .await?;
-            tracing::debug!("storing details for {:?}", identifier);
-            repo.relate_details(&identifier, &new_details).await?;
-            tracing::debug!("stored");
-            new_details
-        };
+        let details =
+            ensure_details_identifier(&tmp_dir, &repo, &store, &config, &identifier).await?;
 
         if let Some(public_url) = store.public_url(&identifier) {
            return Ok(HttpResponse::SeeOther()
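The lib.rs change is largely a deduplication: the details lookup that process and process_head each inlined now lives in ensure_details_identifier, which is also the one place the dummy branch slots in. The decision order, reduced to a plain function (the unit Details struct and error strings are illustrative, not the crate's types):

    #[derive(Clone, Debug)]
    struct Details;

    // Mirrors the branch order in ensure_details_identifier:
    // cached value -> read-only error -> dummy placeholder -> real generation.
    fn ensure_details(
        cached: Option<Details>,
        read_only: bool,
        dummy_mode: bool,
    ) -> Result<Details, &'static str> {
        if let Some(details) = cached {
            return Ok(details);
        }
        if read_only {
            return Err("read only");
        }
        if dummy_mode {
            // Fabricate a placeholder instead of inspecting the stored file
            return Ok(Details);
        }
        // Stand-in for Details::from_bytes(...) over the stored media
        Ok(Details)
    }

    fn main() {
        assert!(ensure_details(None, true, false).is_err());
        assert!(ensure_details(None, false, true).is_ok());
    }

Note that read_only wins over danger_dummy_mode, so a read-only instance never fabricates and stores new details even in dummy mode.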
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 6587528a..399099eb 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -44,7 +44,7 @@ where
                 Arc::from(identifier),
                 Serde::into_inner(upload_id),
                 declared_alias.map(Serde::into_inner),
-                &config.media,
+                config,
             )
             .await?
         }
@@ -112,7 +112,7 @@ impl Drop for UploadGuard {
 }
 
 #[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(tmp_dir, repo, store, client, media))]
+#[tracing::instrument(skip(tmp_dir, repo, store, client, config))]
 async fn process_ingest<S>(
     tmp_dir: &ArcTmpDir,
     repo: &ArcRepo,
@@ -121,7 +121,7 @@ async fn process_ingest<S>(
     unprocessed_identifier: Arc<str>,
     upload_id: UploadId,
     declared_alias: Option<Alias>,
-    media: &crate::config::Media,
+    config: &Configuration,
 ) -> Result<(), Error>
 where
     S: Store + 'static,
@@ -135,7 +135,7 @@ where
     let repo = repo.clone();
     let client = client.clone();
-    let media = media.clone();
+    let config = config.clone();
 
     let error_boundary = crate::sync::spawn("ingest-media", async move {
         let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
 
@@ -146,7 +146,7 @@ where
             &client,
             stream,
             declared_alias,
-            &media,
+            &config,
         )
         .await?;
 
@@ -218,7 +218,7 @@ async fn generate(
         process_path,
         process_args,
         &original_details,
-        &config.media,
+        config,
         hash,
     )
    .await?;
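One detail worth noting in queue/process.rs: the job runner now clones the whole Configuration (rather than just the Media section) into the spawned ingest task, because the task takes ownership and may outlive the caller's borrow. The standard shape of that move, with tokio assumed as the runtime and a trimmed-down Configuration (pict-rs wraps its own crate::sync::spawn):

    #[derive(Clone)]
    struct Configuration {
        danger_dummy_mode: bool,
    }

    #[tokio::main]
    async fn main() {
        let config = Configuration { danger_dummy_mode: true };

        // An owned clone moves into the task; the original stays usable here.
        let task_config = config.clone();
        let handle = tokio::spawn(async move { task_config.danger_dummy_mode });

        assert!(handle.await.unwrap());
    }
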