Implement image pre-processing on upload
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
asonix 2022-09-25 15:17:33 -05:00
parent f98fec7d2a
commit 50d118a2a7
10 changed files with 190 additions and 29 deletions

1
Cargo.lock generated
View file

@ -1580,6 +1580,7 @@ dependencies = [
"serde", "serde",
"serde_cbor", "serde_cbor",
"serde_json", "serde_json",
"serde_urlencoded",
"sha2", "sha2",
"sled", "sled",
"storage-path-generator", "storage-path-generator",

View file

@ -40,11 +40,12 @@ once_cell = "1.4.0"
opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry = { version = "0.18", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11" opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.7" pin-project-lite = "0.2.7"
quick-xml = { version = "0.24.1", features = ["serialize"] }
rusty-s3 = "0.3.2" rusty-s3 = "0.3.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2" serde_cbor = "0.11.2"
serde_json = "1.0" serde_json = "1.0"
quick-xml = { version = "0.24.1", features = ["serialize"] } serde_urlencoded = "0.7.1"
sha2 = "0.10.0" sha2 = "0.10.0"
sled = { version = "0.34.7" } sled = { version = "0.34.7" }
storage-path-generator = "0.1.0" storage-path-generator = "0.1.0"

View file

@ -9,7 +9,7 @@ _a simple image hosting service_
## Usage ## Usage
### Running ### Running
``` ```
pict-rs 0.4.0-alpha.1 pict-rs 0.4.0-alpha.7
asonix <asonix@asonix.dog> asonix <asonix@asonix.dog>
A simple image hosting service A simple image hosting service
@ -77,6 +77,9 @@ OPTIONS:
-h, --help -h, --help
Print help information Print help information
--media-cache-duration <MEDIA_CACHE_DURATION>
How long, in hours, to keep media ingested through the "cached" endpoint
--media-enable-silent-video <MEDIA_ENABLE_SILENT_VIDEO> --media-enable-silent-video <MEDIA_ENABLE_SILENT_VIDEO>
Whether to enable GIF and silent MP4 uploads. Full videos are unsupported Whether to enable GIF and silent MP4 uploads. Full videos are unsupported
@ -98,11 +101,14 @@ OPTIONS:
--media-max-width <MEDIA_MAX_WIDTH> --media-max-width <MEDIA_MAX_WIDTH>
The maximum width, in pixels, for uploaded media The maximum width, in pixels, for uploaded media
--media-preprocess-steps <MEDIA_PREPROCESS_STEPS>
Optional pre-processing steps for uploaded media
--media-skip-validate-imports <MEDIA_SKIP_VALIDATE_IMPORTS> --media-skip-validate-imports <MEDIA_SKIP_VALIDATE_IMPORTS>
Whether to validate media on the "import" endpoint Whether to validate media on the "import" endpoint
--worker-id <WORKER_ID> --worker-id <WORKER_ID>
ID of this pict-rs node. Doesn't do much yet
SUBCOMMANDS: SUBCOMMANDS:
filesystem Run pict-rs with filesystem storage filesystem Run pict-rs with filesystem storage
@ -327,7 +333,7 @@ set.
A secure API key can be generated by any password generator. A secure API key can be generated by any password generator.
- `POST /internal/import` for uploading an image while preserving the filename as the first alias. - `POST /internal/import` for uploading an image while preserving the filename as the first alias.
The upload format and response format are the same as the `POST /image` endpoint. The upload format and response format are the same as the `POST /image` endpoint.
- `POST /internal/purge?alias={alias} Purge a file by it's alias. This removes all aliases and - `POST /internal/purge?alias={alias}` Purge a file by it's alias. This removes all aliases and
files associated with the query. files associated with the query.
This endpoint returns the following JSON This endpoint returns the following JSON
@ -337,10 +343,13 @@ A secure API key can be generated by any password generator.
"aliases": ["asdf.png"] "aliases": ["asdf.png"]
} }
``` ```
- `GET /internal/aliases?alias={alias} Get the aliases for a file by it's alias - `GET /internal/aliases?alias={alias}` Get the aliases for a file by it's alias
- `?alias={alias}` get aliases by alias - `?alias={alias}` get aliases by alias
This endpiont returns the same JSON as the purge endpoint This endpiont returns the same JSON as the purge endpoint
- `DELETE /internal/variants` Queue a cleanup for generated variants of uploaded images.
If any of the cleaned variants are fetched again, they will be re-generated.
Additionally, all endpoints support setting deadlines, after which the request will cease Additionally, all endpoints support setting deadlines, after which the request will cease
processing. To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an processing. To enable deadlines for your requests, you can set the `X-Request-Deadline` header to an

View file

@ -115,6 +115,13 @@ path = '/mnt'
## Media Processing Configuration ## Media Processing Configuration
[media] [media]
## Optional: preprocessing steps for uploaded images
# environment variable: PICTRS__MEDIA__PREPROCESS_STEPS
# default: empty
#
# This configuration is the same format as the process endpoint's query arguments
preprocess_steps = 'crop=16x9&resize=1200&blur=0.2'
## Optional: max media width (in pixels) ## Optional: max media width (in pixels)
# environment variable: PICTRS__MEDIA__MAX_WIDTH # environment variable: PICTRS__MEDIA__MAX_WIDTH
# default: 10,000 # default: 10,000
@ -189,15 +196,36 @@ cache_capacity = 67108864
# available options: filesystem, object_storage # available options: filesystem, object_storage
type = 'object_storage' type = 'object_storage'
## Required: endpoint at which the object storage exists
# environment variable: PICTRS__STORE__ENDPOINT
# default: empty
#
# examples:
# - `http://localhost:9000` # minio
# - `https://s3.dualstack.eu-west-1.amazonaws.com` # s3
endpoint = 'http://minio:9000'
## Optional: How to format object storage requests
# environment variable: PICTRS__STORE__USE_PATH_STYLE
# default: false
#
# When this is true, objects will be fetched from http{s}://{endpoint}:{port}/{bucket_name}/{object}
# When false, objects will be fetched from http{s}://{bucket_name}.{endpoint}:{port}/{object}
#
# Set to true when using minio
use_path_style = false
## Required: object storage bucket name ## Required: object storage bucket name
# environment variable: PICTRS__STORE__BUCKET_NAME # environment variable: PICTRS__STORE__BUCKET_NAME
# default: empty # default: empty
bucket_name = 'BUCKET_NAME' bucket_name = 'pict-rs'
## Required: object storage region ## Required: object storage region
# environment variable: PICTRS__STORE__REGION # environment variable: PICTRS__STORE__REGION
# default: empty # default: empty
region = 'REGION' #
# When using minio, this can be set to `minio`
region = 'minio'
## Required: object storage access key ## Required: object storage access key
# environment variable: PICTRS__STORE__ACCESS_KEY # environment variable: PICTRS__STORE__ACCESS_KEY
@ -209,11 +237,6 @@ access_key = 'ACCESS_KEY'
# default: empty # default: empty
secret_key = 'SECRET_KEY' secret_key = 'SECRET_KEY'
## Optional: object storage security token
# environment variable: PICTRS__STORE__SECURITY_TOKEN
# default: empty
security_token = 'SECURITY_TOKEN'
## Optional: object storage session token ## Optional: object storage session token
# environment variable: PICTRS__STORE__SESSION_TOKEN # environment variable: PICTRS__STORE__SESSION_TOKEN
# default: empty # default: empty

View file

@ -45,6 +45,7 @@ impl Args {
address, address,
api_key, api_key,
worker_id, worker_id,
media_preprocess_steps,
media_skip_validate_imports, media_skip_validate_imports,
media_max_width, media_max_width,
media_max_height, media_max_height,
@ -62,6 +63,7 @@ impl Args {
worker_id, worker_id,
}; };
let media = Media { let media = Media {
preprocess_steps: media_preprocess_steps,
skip_validate_imports: media_skip_validate_imports, skip_validate_imports: media_skip_validate_imports,
max_width: media_max_width, max_width: media_max_width,
max_height: media_max_height, max_height: media_max_height,
@ -299,6 +301,8 @@ struct OldDb {
#[derive(Debug, Default, serde::Serialize)] #[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
struct Media { struct Media {
#[serde(skip_serializing_if = "Option::is_none")]
preprocess_steps: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
max_width: Option<usize>, max_width: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
@ -383,9 +387,16 @@ struct Run {
#[clap(long)] #[clap(long)]
api_key: Option<String>, api_key: Option<String>,
/// ID of this pict-rs node. Doesn't do much yet
#[clap(long)] #[clap(long)]
worker_id: Option<String>, worker_id: Option<String>,
/// Optional pre-processing steps for uploaded media.
///
/// All still images will be put through these steps before saving
#[clap(long)]
media_preprocess_steps: Option<String>,
/// Whether to validate media on the "import" endpoint /// Whether to validate media on the "import" endpoint
#[clap(long)] #[clap(long)]
media_skip_validate_imports: Option<bool>, media_skip_validate_imports: Option<bool>,

View file

@ -2,6 +2,7 @@ use crate::{
config::primitives::{ImageFormat, LogFormat, Store, Targets}, config::primitives::{ImageFormat, LogFormat, Store, Targets},
serde_str::Serde, serde_str::Serde,
}; };
use once_cell::sync::OnceCell;
use std::{collections::BTreeSet, net::SocketAddr, path::PathBuf}; use std::{collections::BTreeSet, net::SocketAddr, path::PathBuf};
use url::Url; use url::Url;
@ -86,6 +87,9 @@ pub(crate) struct OldDb {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub(crate) struct Media { pub(crate) struct Media {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) preprocess_steps: Option<String>,
pub(crate) max_width: usize, pub(crate) max_width: usize,
pub(crate) max_height: usize, pub(crate) max_height: usize,
@ -106,6 +110,25 @@ pub(crate) struct Media {
pub(crate) cache_duration: i64, pub(crate) cache_duration: i64,
} }
impl Media {
pub(crate) fn preprocess_steps(&self) -> Option<&[(String, String)]> {
static PREPROCESS_STEPS: OnceCell<Vec<(String, String)>> = OnceCell::new();
if let Some(steps) = &self.preprocess_steps {
let steps = PREPROCESS_STEPS
.get_or_try_init(|| {
serde_urlencoded::from_str(steps) as Result<Vec<(String, String)>, _>
})
.expect("Invalid preprocess_steps configuration")
.as_slice();
Some(steps)
} else {
None
}
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub(crate) struct Sled { pub(crate) struct Sled {

View file

@ -121,6 +121,14 @@ impl ImageFormat {
Self::Webp => "WEBP", Self::Webp => "WEBP",
} }
} }
pub(crate) fn as_ext(self) -> &'static str {
match self {
Self::Jpeg => ".jpeg",
Self::Png => ".png",
Self::Webp => ".webp",
}
}
} }
impl From<Filesystem> for Store { impl From<Filesystem> for Store {

View file

@ -1,4 +1,5 @@
use crate::{ use crate::{
either::Either,
error::{Error, UploadError}, error::{Error, UploadError},
magick::ValidInputType, magick::ValidInputType,
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo}, repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
@ -32,15 +33,15 @@ where
{ {
futures_util::pin_mut!(stream); futures_util::pin_mut!(stream);
let mut total_len = 0;
let mut buf = Vec::new(); let mut buf = Vec::new();
tracing::debug!("Reading stream to memory"); tracing::debug!("Reading stream to memory");
while let Some(res) = stream.next().await { while let Some(res) = stream.next().await {
let bytes = res?; let bytes = res?;
total_len += bytes.len();
buf.push(bytes); buf.push(bytes);
} }
let total_len = buf.iter().fold(0, |acc, item| acc + item.len());
let bytes_mut = buf let bytes_mut = buf
.iter() .iter()
.fold(BytesMut::with_capacity(total_len), |mut acc, item| { .fold(BytesMut::with_capacity(total_len), |mut acc, item| {
@ -77,7 +78,22 @@ where
) )
.await?; .await?;
let hasher_reader = Hasher::new(validated_reader, Sha256::new()); let processed_reader = if let Some(operations) = CONFIG.media.preprocess_steps() {
if let Some(format) = input_type.to_format() {
let (_, magick_args) = crate::processor::build_chain(operations, format.as_ext())?;
let processed_reader =
crate::magick::process_image_async_read(validated_reader, magick_args, format)?;
Either::left(processed_reader)
} else {
Either::right(validated_reader)
}
} else {
Either::right(validated_reader)
};
let hasher_reader = Hasher::new(processed_reader, Sha256::new());
let hasher = hasher_reader.hasher(); let hasher = hasher_reader.hasher();
let identifier = store.save_async_read(hasher_reader).await?; let identifier = store.save_async_read(hasher_reader).await?;

View file

@ -39,7 +39,7 @@ pub(crate) enum ValidInputType {
} }
impl ValidInputType { impl ValidInputType {
fn as_str(&self) -> &'static str { fn as_str(self) -> &'static str {
match self { match self {
Self::Mp4 => "MP4", Self::Mp4 => "MP4",
Self::Gif => "GIF", Self::Gif => "GIF",
@ -49,7 +49,7 @@ impl ValidInputType {
} }
} }
pub(crate) fn as_ext(&self) -> &'static str { pub(crate) fn as_ext(self) -> &'static str {
match self { match self {
Self::Mp4 => ".mp4", Self::Mp4 => ".mp4",
Self::Gif => ".gif", Self::Gif => ".gif",
@ -59,7 +59,7 @@ impl ValidInputType {
} }
} }
fn is_mp4(&self) -> bool { fn is_mp4(self) -> bool {
matches!(self, Self::Mp4) matches!(self, Self::Mp4)
} }
@ -70,6 +70,15 @@ impl ValidInputType {
ImageFormat::Webp => ValidInputType::Webp, ImageFormat::Webp => ValidInputType::Webp,
} }
} }
pub(crate) fn to_format(self) -> Option<ImageFormat> {
match self {
Self::Jpeg => Some(ImageFormat::Jpeg),
Self::Png => Some(ImageFormat::Png),
Self::Webp => Some(ImageFormat::Webp),
_ => None,
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -256,6 +265,19 @@ pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Err
details_bytes(input, None).await?.validate_input() details_bytes(input, None).await?.validate_input()
} }
fn process_image(args: Vec<String>, format: ImageFormat) -> std::io::Result<Process> {
let command = "magick";
let convert_args = ["convert", "-"];
let last_arg = format!("{}:-", format.as_magick_format());
Process::spawn(
Command::new(command)
.args(convert_args)
.args(args)
.arg(last_arg),
)
}
#[instrument(name = "Spawning process command")] #[instrument(name = "Spawning process command")]
pub(crate) fn process_image_store_read<S: Store + 'static>( pub(crate) fn process_image_store_read<S: Store + 'static>(
store: S, store: S,
@ -263,18 +285,16 @@ pub(crate) fn process_image_store_read<S: Store + 'static>(
args: Vec<String>, args: Vec<String>,
format: ImageFormat, format: ImageFormat,
) -> std::io::Result<impl AsyncRead + Unpin> { ) -> std::io::Result<impl AsyncRead + Unpin> {
let command = "magick"; Ok(process_image(args, format)?.store_read(store, identifier))
let convert_args = ["convert", "-"]; }
let last_arg = format!("{}:-", format.as_magick_format());
let process = Process::spawn( #[instrument(name = "Spawning process command", skip(async_read))]
Command::new(command) pub(crate) fn process_image_async_read<A: AsyncRead + Unpin + 'static>(
.args(convert_args) async_read: A,
.args(args) args: Vec<String>,
.arg(last_arg), format: ImageFormat,
)?; ) -> std::io::Result<impl AsyncRead + Unpin> {
Ok(process_image(args, format)?.pipe_async_read(async_read))
Ok(process.store_read(store, identifier))
} }
impl Details { impl Details {

View file

@ -152,6 +152,55 @@ impl Process {
} }
} }
pub(crate) fn pipe_async_read<A: AsyncRead + Unpin + 'static>(
mut self,
mut async_read: A,
) -> impl AsyncRead + Unpin {
let mut stdin = self.child.stdin.take().expect("stdin exists");
let stdout = self.child.stdout.take().expect("stdout exists");
let (tx, rx) = tracing::trace_span!(parent: None, "Create channel")
.in_scope(channel::<std::io::Error>);
let span = tracing::info_span!(parent: None, "Background process task from bytes");
span.follows_from(Span::current());
let mut child = self.child;
let handle = tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Err(e) = tokio::io::copy(&mut async_read, &mut stdin).await {
let _ = tx.send(e);
return;
}
drop(stdin);
match child.wait().await {
Ok(status) => {
if !status.success() {
let _ = tx.send(std::io::Error::new(
std::io::ErrorKind::Other,
&StatusError,
));
}
}
Err(e) => {
let _ = tx.send(e);
}
}
}
.instrument(span),
)
});
ProcessRead {
inner: stdout,
err_recv: rx,
err_closed: false,
handle: DropHandle { inner: handle },
}
}
#[tracing::instrument] #[tracing::instrument]
pub(crate) fn store_read<S: Store + 'static>( pub(crate) fn store_read<S: Store + 'static>(
mut self, mut self,