From 50d118a2a724d02daf471b4e3d29731fddaa1ead Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 25 Sep 2022 15:17:33 -0500
Subject: [PATCH] Implement image pre-processing on upload

---
 Cargo.lock                |  1 +
 Cargo.toml                |  3 ++-
 README.md                 | 17 ++++++++++----
 pict-rs.toml              | 37 +++++++++++++++++++++++------
 src/config/commandline.rs | 11 +++++++++
 src/config/file.rs        | 23 ++++++++++++++++++
 src/config/primitives.rs  |  8 +++++++
 src/ingest.rs             | 22 +++++++++++++++---
 src/magick.rs             | 48 +++++++++++++++++++++++++++-----------
 src/process.rs            | 49 +++++++++++++++++++++++++++++++++++++++
 10 files changed, 190 insertions(+), 29 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8dad32f..3044ff8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1580,6 +1580,7 @@ dependencies = [
  "serde",
  "serde_cbor",
  "serde_json",
+ "serde_urlencoded",
  "sha2",
  "sled",
  "storage-path-generator",
diff --git a/Cargo.toml b/Cargo.toml
index 7505b69..753c775 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,11 +40,12 @@ once_cell = "1.4.0"
 opentelemetry = { version = "0.18", features = ["rt-tokio"] }
 opentelemetry-otlp = "0.11"
 pin-project-lite = "0.2.7"
+quick-xml = { version = "0.24.1", features = ["serialize"] }
 rusty-s3 = "0.3.2"
 serde = { version = "1.0", features = ["derive"] }
 serde_cbor = "0.11.2"
 serde_json = "1.0"
-quick-xml = { version = "0.24.1", features = ["serialize"] }
+serde_urlencoded = "0.7.1"
 sha2 = "0.10.0"
 sled = { version = "0.34.7" }
 storage-path-generator = "0.1.0"
diff --git a/README.md b/README.md
index 9f30617..2b0f5f1 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ _a simple image hosting service_
 ## Usage
 ### Running
 ```
-pict-rs 0.4.0-alpha.1
+pict-rs 0.4.0-alpha.7
 asonix
 A simple image hosting service
@@ -77,6 +77,9 @@ OPTIONS:
     -h, --help
             Print help information
 
+        --media-cache-duration <MEDIA_CACHE_DURATION>
+            How long, in hours, to keep media ingested through the "cached" endpoint
+
         --media-enable-silent-video
             Whether to enable GIF and silent MP4 uploads. Full videos are unsupported
 
@@ -98,11 +101,14 @@ OPTIONS:
         --media-max-width <MEDIA_MAX_WIDTH>
             The maximum width, in pixels, for uploaded media
 
+        --media-preprocess-steps <MEDIA_PREPROCESS_STEPS>
+            Optional pre-processing steps for uploaded media
+
         --media-skip-validate-imports
             Whether to validate media on the "import" endpoint
 
         --worker-id <WORKER_ID>
-
+            ID of this pict-rs node. Doesn't do much yet
 
 SUBCOMMANDS:
     filesystem        Run pict-rs with filesystem storage
@@ -327,7 +333,7 @@ set. A secure API key can be generated by any password generator.
 - `POST /internal/import` for uploading an image while preserving the filename as the first alias.
   The upload format and response format are the same as the `POST /image` endpoint.
-- `POST /internal/purge?alias={alias} Purge a file by it's alias. This removes all aliases and
+- `POST /internal/purge?alias={alias}` Purge a file by its alias. This removes all aliases and
   files associated with the query.
 
   This endpoint returns the following JSON
   ```
@@ -337,10 +343,13 @@
     "msg": "ok",
     "aliases": ["asdf.png"]
   }
   ```
-- `GET /internal/aliases?alias={alias} Get the aliases for a file by it's alias
+- `GET /internal/aliases?alias={alias}` Get the aliases for a file by its alias
   - `?alias={alias}` get aliases by alias
 
   This endpoint returns the same JSON as the purge endpoint
 
+- `DELETE /internal/variants` Queue a cleanup for generated variants of uploaded images.
+
+  If any of the cleaned variants are fetched again, they will be re-generated.
 
 Additionally, all endpoints support setting deadlines, after which the request will cease processing.
 To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an
 i64 representing the number of milliseconds since the UNIX Epoch.
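The `--media-preprocess-steps` option above (and `preprocess_steps` in `pict-rs.toml` below) takes a URL-encoded chain of operations, the same format as the process endpoint's query arguments. As a minimal standalone sketch outside the patch: the `serde_urlencoded` crate added in `Cargo.toml` decodes such a string into ordered `(name, value)` pairs, which is what `src/config/file.rs` caches. The step names here are illustrative examples, not an exhaustive list.

```rust
// Sketch: decoding a preprocess-steps string into ordered operations.
// Requires `serde_urlencoded = "0.7"` (the dependency this patch adds).
fn main() -> Result<(), serde_urlencoded::de::Error> {
    let steps = "crop=16x9&resize=1200&blur=0.2";

    // A Vec of pairs (rather than a map) preserves order, so the
    // operations apply left to right exactly as written.
    let parsed: Vec<(String, String)> = serde_urlencoded::from_str(steps)?;

    assert_eq!(
        parsed,
        vec![
            ("crop".to_string(), "16x9".to_string()),
            ("resize".to_string(), "1200".to_string()),
            ("blur".to_string(), "0.2".to_string()),
        ]
    );

    Ok(())
}
```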
diff --git a/pict-rs.toml b/pict-rs.toml
index 727f78d..45981ed 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -115,6 +115,13 @@ path = '/mnt'
 ## Media Processing Configuration
 [media]
+## Optional: preprocessing steps for uploaded images
+# environment variable: PICTRS__MEDIA__PREPROCESS_STEPS
+# default: empty
+#
+# This configuration is the same format as the process endpoint's query arguments
+preprocess_steps = 'crop=16x9&resize=1200&blur=0.2'
+
 ## Optional: max media width (in pixels)
 # environment variable: PICTRS__MEDIA__MAX_WIDTH
 # default: 10,000
@@ -189,15 +196,36 @@ cache_capacity = 67108864
 # available options: filesystem, object_storage
 type = 'object_storage'
 
+## Required: endpoint at which the object storage exists
+# environment variable: PICTRS__STORE__ENDPOINT
+# default: empty
+#
+# examples:
+# - `http://localhost:9000` # minio
+# - `https://s3.dualstack.eu-west-1.amazonaws.com` # s3
+endpoint = 'http://minio:9000'
+
+## Optional: How to format object storage requests
+# environment variable: PICTRS__STORE__USE_PATH_STYLE
+# default: false
+#
+# When this is true, objects will be fetched from http{s}://{endpoint}:{port}/{bucket_name}/{object}
+# When false, objects will be fetched from http{s}://{bucket_name}.{endpoint}:{port}/{object}
+#
+# Set to true when using minio
+use_path_style = false
+
 ## Required: object storage bucket name
 # environment variable: PICTRS__STORE__BUCKET_NAME
 # default: empty
-bucket_name = 'BUCKET_NAME'
+bucket_name = 'pict-rs'
 
 ## Required: object storage region
 # environment variable: PICTRS__STORE__REGION
 # default: empty
-region = 'REGION'
+#
+# When using minio, this can be set to `minio`
+region = 'minio'
 
 ## Required: object storage access key
 # environment variable: PICTRS__STORE__ACCESS_KEY
@@ -209,11 +237,6 @@ access_key = 'ACCESS_KEY'
 # default: empty
 secret_key = 'SECRET_KEY'
 
-## Optional: object storage security token
-# environment variable: PICTRS__STORE__SECURITY_TOKEN
-# default: empty
-security_token = 'SECURITY_TOKEN'
-
 ## Optional: object storage session token
 # environment variable: PICTRS__STORE__SESSION_TOKEN
 # default: empty
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index 69d98c4..1b017be 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -45,6 +45,7 @@ impl Args {
             address,
             api_key,
             worker_id,
+            media_preprocess_steps,
             media_skip_validate_imports,
             media_max_width,
             media_max_height,
@@ -62,6 +63,7 @@ impl Args {
                 worker_id,
             };
             let media = Media {
+                preprocess_steps: media_preprocess_steps,
                 skip_validate_imports: media_skip_validate_imports,
                 max_width: media_max_width,
                 max_height: media_max_height,
@@ -299,6 +301,8 @@ struct OldDb {
 #[derive(Debug, Default, serde::Serialize)]
 #[serde(rename_all = "snake_case")]
 struct Media {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    preprocess_steps: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     max_width: Option<usize>,
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -383,9 +387,16 @@ struct Run {
     #[clap(long)]
     api_key: Option<String>,
 
+    /// ID of this pict-rs node. Doesn't do much yet
     #[clap(long)]
     worker_id: Option<String>,
 
+    /// Optional pre-processing steps for uploaded media.
+    ///
+    /// All still images will be put through these steps before saving
+    #[clap(long)]
+    media_preprocess_steps: Option<String>,
+
     /// Whether to validate media on the "import" endpoint
     #[clap(long)]
     media_skip_validate_imports: Option<bool>,
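In the `commandline.rs` change above, every new `Media` field carries `#[serde(skip_serializing_if = "Option::is_none")]`. That is what lets command-line flags act as a sparse overlay: flags the user never passed disappear from the serialized output instead of overwriting values loaded from `pict-rs.toml`. A sketch of the mechanism with hypothetical names (`MediaOverlay` is not a type from the patch):

```rust
// Sketch: Option fields that serialize only when set, so a CLI overlay
// merged over file config cannot clobber existing values with nulls.
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
struct MediaOverlay {
    #[serde(skip_serializing_if = "Option::is_none")]
    preprocess_steps: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_width: Option<usize>,
}

fn main() {
    let overlay = MediaOverlay {
        preprocess_steps: Some("crop=16x9&resize=1200".into()),
        max_width: None, // flag not passed on the command line
    };

    // Only the flag that was actually provided shows up, so a
    // config-file `max_width` survives the merge untouched.
    let json = serde_json::to_string(&overlay).unwrap();
    assert_eq!(json, r#"{"preprocess_steps":"crop=16x9&resize=1200"}"#);
}
```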
diff --git a/src/config/file.rs b/src/config/file.rs
index 1de0ef4..0c3c8fb 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -2,6 +2,7 @@ use crate::{
     config::primitives::{ImageFormat, LogFormat, Store, Targets},
     serde_str::Serde,
 };
+use once_cell::sync::OnceCell;
 use std::{collections::BTreeSet, net::SocketAddr, path::PathBuf};
 use url::Url;
 
@@ -86,6 +87,9 @@ pub(crate) struct OldDb {
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct Media {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub(crate) preprocess_steps: Option<String>,
+
     pub(crate) max_width: usize,
     pub(crate) max_height: usize,
@@ -106,6 +110,25 @@ pub(crate) struct Media {
     pub(crate) cache_duration: i64,
 }
 
+impl Media {
+    pub(crate) fn preprocess_steps(&self) -> Option<&[(String, String)]> {
+        static PREPROCESS_STEPS: OnceCell<Vec<(String, String)>> = OnceCell::new();
+
+        if let Some(steps) = &self.preprocess_steps {
+            let steps = PREPROCESS_STEPS
+                .get_or_try_init(|| {
+                    serde_urlencoded::from_str::<Vec<(String, String)>>(steps)
+                })
+                .expect("Invalid preprocess_steps configuration")
+                .as_slice();
+
+            Some(steps)
+        } else {
+            None
+        }
+    }
+}
+
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct Sled {
diff --git a/src/config/primitives.rs b/src/config/primitives.rs
index 1382499..9a0cb6e 100644
--- a/src/config/primitives.rs
+++ b/src/config/primitives.rs
@@ -121,6 +121,14 @@ impl ImageFormat {
             Self::Webp => "WEBP",
         }
     }
+
+    pub(crate) fn as_ext(self) -> &'static str {
+        match self {
+            Self::Jpeg => ".jpeg",
+            Self::Png => ".png",
+            Self::Webp => ".webp",
+        }
+    }
 }
 
 impl From for Store {
diff --git a/src/ingest.rs b/src/ingest.rs
index e3a3fc6..7905c90 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -1,4 +1,5 @@
 use crate::{
+    either::Either,
     error::{Error, UploadError},
     magick::ValidInputType,
     repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
@@ -32,15 +33,15 @@ where
 {
     futures_util::pin_mut!(stream);
 
+    let mut total_len = 0;
     let mut buf = Vec::new();
 
     tracing::debug!("Reading stream to memory");
     while let Some(res) = stream.next().await {
         let bytes = res?;
+        total_len += bytes.len();
         buf.push(bytes);
     }
 
-    let total_len = buf.iter().fold(0, |acc, item| acc + item.len());
-
     let bytes_mut = buf
         .iter()
         .fold(BytesMut::with_capacity(total_len), |mut acc, item| {
@@ -77,7 +78,22 @@ where
     )
     .await?;
 
-    let hasher_reader = Hasher::new(validated_reader, Sha256::new());
+    let processed_reader = if let Some(operations) = CONFIG.media.preprocess_steps() {
+        if let Some(format) = input_type.to_format() {
+            let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?;
+
+            let processed_reader =
+                crate::magick::process_image_async_read(validated_reader, magick_args, format)?;
+
+            Either::left(processed_reader)
+        } else {
+            Either::right(validated_reader)
+        }
+    } else {
+        Either::right(validated_reader)
+    };
+
+    let hasher_reader = Hasher::new(processed_reader, Sha256::new());
     let hasher = hasher_reader.hasher();
 
     let identifier = store.save_async_read(hasher_reader).await?;
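The `ingest.rs` hunk above returns two different reader types from one `if`/`else`, which only type-checks because both arms are wrapped in `crate::either::Either`. That module is not part of this excerpt; what follows is an assumed sketch of the shape it needs (an enum that forwards `AsyncRead` to whichever variant it holds), not the crate's actual definition:

```rust
// Sketch: an Either of two readers that is itself a reader, letting one
// expression return "processed" or "pass-through" without boxing.
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

pub(crate) enum Either<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Either<L, R> {
    pub(crate) fn left(l: L) -> Self {
        Either::Left(l)
    }

    pub(crate) fn right(r: R) -> Self {
        Either::Right(r)
    }
}

impl<L, R> AsyncRead for Either<L, R>
where
    L: AsyncRead + Unpin,
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Both variants are Unpin, so it is safe to reach through the Pin
        // and delegate to whichever reader is actually held.
        match self.get_mut() {
            Either::Left(l) => Pin::new(l).poll_read(cx, buf),
            Either::Right(r) => Pin::new(r).poll_read(cx, buf),
        }
    }
}
```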
diff --git a/src/magick.rs b/src/magick.rs
index fdeba2a..29a6a57 100644
--- a/src/magick.rs
+++ b/src/magick.rs
@@ -39,7 +39,7 @@ pub(crate) enum ValidInputType {
 }
 
 impl ValidInputType {
-    fn as_str(&self) -> &'static str {
+    fn as_str(self) -> &'static str {
         match self {
             Self::Mp4 => "MP4",
             Self::Gif => "GIF",
@@ -49,7 +49,7 @@ impl ValidInputType {
         }
     }
 
-    pub(crate) fn as_ext(&self) -> &'static str {
+    pub(crate) fn as_ext(self) -> &'static str {
         match self {
             Self::Mp4 => ".mp4",
             Self::Gif => ".gif",
@@ -59,7 +59,7 @@ impl ValidInputType {
         }
     }
 
-    fn is_mp4(&self) -> bool {
+    fn is_mp4(self) -> bool {
         matches!(self, Self::Mp4)
     }
 
@@ -70,6 +70,15 @@ impl ValidInputType {
             ImageFormat::Webp => ValidInputType::Webp,
         }
     }
+
+    pub(crate) fn to_format(self) -> Option<ImageFormat> {
+        match self {
+            Self::Jpeg => Some(ImageFormat::Jpeg),
+            Self::Png => Some(ImageFormat::Png),
+            Self::Webp => Some(ImageFormat::Webp),
+            _ => None,
+        }
+    }
 }
 
 #[derive(Debug)]
@@ -256,6 +265,19 @@ pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Error> {
+fn process_image(args: Vec<String>, format: ImageFormat) -> std::io::Result<Process> {
+    let command = "magick";
+    let convert_args = ["convert", "-"];
+    let last_arg = format!("{}:-", format.as_magick_format());
+
+    Process::spawn(
+        Command::new(command)
+            .args(convert_args)
+            .args(args)
+            .arg(last_arg),
+    )
+}
+
 #[instrument(name = "Spawning process command")]
 pub(crate) fn process_image_store_read<S: Store + 'static>(
     store: S,
@@ -263,18 +285,16 @@
     identifier: S::Identifier,
     args: Vec<String>,
     format: ImageFormat,
 ) -> std::io::Result<impl AsyncRead + Unpin> {
-    let command = "magick";
-    let convert_args = ["convert", "-"];
-    let last_arg = format!("{}:-", format.as_magick_format());
+    Ok(process_image(args, format)?.store_read(store, identifier))
+}
 
-    let process = Process::spawn(
-        Command::new(command)
-            .args(convert_args)
-            .args(args)
-            .arg(last_arg),
-    )?;
-
-    Ok(process.store_read(store, identifier))
+#[instrument(name = "Spawning process command", skip(async_read))]
+pub(crate) fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
+    async_read: A,
+    args: Vec<String>,
+    format: ImageFormat,
+) -> std::io::Result<impl AsyncRead + Unpin> {
+    Ok(process_image(args, format)?.pipe_async_read(async_read))
 }
 
 impl Details {
diff --git a/src/process.rs b/src/process.rs
index d5cafa5..80aaf76 100644
--- a/src/process.rs
+++ b/src/process.rs
@@ -152,6 +152,55 @@ impl Process {
         }
     }
 
+    pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
+        mut self,
+        mut async_read: A,
+    ) -> impl AsyncRead + Unpin {
+        let mut stdin = self.child.stdin.take().expect("stdin exists");
+        let stdout = self.child.stdout.take().expect("stdout exists");
+
+        let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
+            .in_scope(channel::<std::io::Error>);
+
+        let span = tracing::info_span!(parent: None, "Background process task from bytes");
+        span.follows_from(Span::current());
+
+        let mut child = self.child;
+        let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
+            actix_rt::spawn(
+                async move {
+                    if let Err(e) = tokio::io::copy(&mut async_read, &mut stdin).await {
+                        let _ = tx.send(e);
+                        return;
+                    }
+                    drop(stdin);
+
+                    match child.wait().await {
+                        Ok(status) => {
+                            if !status.success() {
+                                let _ = tx.send(std::io::Error::new(
+                                    std::io::ErrorKind::Other,
+                                    &StatusError,
+                                ));
+                            }
+                        }
+                        Err(e) => {
+                            let _ = tx.send(e);
+                        }
+                    }
+                }
+                .instrument(span),
+            )
+        });
+
+        ProcessRead {
+            inner: stdout,
+            err_recv: rx,
+            err_closed: false,
+            handle: DropHandle { inner: handle },
+        }
+    }
+
     #[tracing::instrument]
     pub(crate) fn store_read(
         mut self,
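Taken together, `process_image` and `pipe_async_read` stream an upload through `magick convert - {steps...} {format}:-` and hand the converted bytes back as a reader. As a self-contained sketch of the same pipe written directly against `tokio::process` (the `preprocess` helper name and its error message are illustrative; assumes tokio's `process` and `io-util` features):

```rust
// Sketch: pipe raw upload bytes through `magick convert` and collect the
// converted output, mirroring what the patch's pipe_async_read sets up.
use std::process::Stdio;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;

async fn preprocess(input: &[u8], steps: &[String]) -> std::io::Result<Vec<u8>> {
    let mut child = Command::new("magick")
        .arg("convert")
        .arg("-") // read the upload from stdin
        .args(steps) // operations built from the preprocess_steps config
        .arg("png:-") // write the converted image to stdout
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let mut stdin = child.stdin.take().expect("stdin piped");
    let mut stdout = child.stdout.take().expect("stdout piped");

    // Write and read concurrently: doing them sequentially can deadlock
    // once the OS pipe buffers fill up on a large image.
    let write = async move {
        stdin.write_all(input).await?;
        // Dropping stdin closes the pipe so `magick` sees EOF and flushes.
        drop(stdin);
        Ok::<_, std::io::Error>(())
    };
    let read = async {
        let mut out = Vec::new();
        stdout.read_to_end(&mut out).await?;
        Ok::<_, std::io::Error>(out)
    };

    let (write_res, read_res) = tokio::join!(write, read);
    write_res?;
    let out = read_res?;

    if !child.wait().await?.success() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            "magick exited with an error",
        ));
    }

    Ok(out)
}
```

The patch's version keeps the output side lazy — the caller reads the child's stdout directly while a background task feeds stdin — so the converted image never has to be buffered whole in memory.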