From e15a82c0c7f30455cb8cd9920fd9c64042c10dfa Mon Sep 17 00:00:00 2001
From: asonix <asonix@asonix.dog>
Date: Tue, 5 Sep 2023 20:45:07 -0500
Subject: [PATCH] Add external validation check

---
 dev.toml                  |  1 +
 pict-rs.toml              | 64 ++++++++++++++++++++++++++++-----------
 src/config/commandline.rs |  8 +++++
 src/config/file.rs        |  2 ++
 src/error.rs              |  7 ++++-
 src/error_code.rs         |  3 ++
 src/ingest.rs             | 23 ++++++++++++--
 src/lib.rs                | 59 +++++++++++++++++++++++++++---------
 src/queue.rs              | 22 ++++++++++++--
 src/queue/process.rs      | 11 +++++--
 src/stream.rs             | 62 +++++++++++++++++++++++++++++++++++++
 11 files changed, 222 insertions(+), 40 deletions(-)

diff --git a/dev.toml b/dev.toml
index 691b1fb..6ddbe30 100644
--- a/dev.toml
+++ b/dev.toml
@@ -23,6 +23,7 @@ path = 'data/sled-repo-local'
 cache_capacity = 67108864
 
 [media]
+# external_validation = 'http://localhost:8076'
 max_file_size = 40
 filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
 
diff --git a/pict-rs.toml b/pict-rs.toml
index ca2f8ab..90ec4ce 100644
--- a/pict-rs.toml
+++ b/pict-rs.toml
@@ -143,6 +143,15 @@ cache_capacity = 67108864
 
 ## Media Processing Configuration
 [media]
+## Optional: URL for external validation of media
+# environment variable: PICTRS__MEDIA__EXTERNAL_VALIDATION
+# default: empty
+#
+# The expected API for external validators is to accept a POST with the media as the request body,
+# and a valid `Content-Type` header. The validator should return a 2XX response when the media
+# passes validation. Any other status code is considered a validation failure.
+external_validation = 'http://localhost:8076'
+
 ## Optional: max file size (in Megabytes)
 # environment variable: PICTRS__MEDIA__MAX_FILE_SIZE
 # default: 40
@@ -499,13 +508,15 @@ crf_1440 = 24
 crf_2160 = 15
 
 
-## Database configuration
+### Database configuration
+
+## Sled repo configuration example
 [repo]
 ## Optional: database backend to use
 # environment variable: PICTRS__REPO__TYPE
 # default: sled
 #
-# available options: sled
+# available options: sled, postgres
 type = 'sled'
 
 ## Optional: path to sled repository
@@ -527,7 +538,39 @@ cache_capacity = 67108864
 export_path = "/mnt/exports"
 
 
-## Media storage configuration
+## Postgres repo configuration example
+[repo]
+## Optional: database backend to use
+# environment variable: PICTRS__REPO__TYPE
+# default: sled
+#
+# available options: sled, postgres
+type = 'postgres'
+
+## Required: URL to postgres database
+# environment variable: PICTRS__REPO__URL
+# default: empty
+url = 'postgres://user:password@host:5432/db'
+
+
+### Media storage configuration
+
+## Filesystem media storage example
+[store]
+## Optional: type of media storage to use
+# environment variable: PICTRS__STORE__TYPE
+# default: filesystem
+#
+# available options: filesystem, object_storage
+type = 'filesystem'
+
+## Optional: path to uploaded media
+# environment variable: PICTRS__STORE__PATH
+# default: /mnt/files
+path = '/mnt/files'
+
+
+## Object media storage example
 [store]
 ## Optional: type of media storage to use
 # environment variable: PICTRS__STORE__TYPE
@@ -597,18 +640,3 @@ signature_expiration = 15
 # This value is the total wait time, and not additional wait time on top of the
 # signature_expiration.
 client_timeout = 30
-
-## Filesystem media storage example
-# ## Media storage configuration
-# [store]
-# ## Optional: type of media storage to use
-# # environment variable: PICTRS__STORE__TYPE
-# # default: filesystem
-# #
-# # available options: filesystem, object_storage
-# type = 'filesystem'
-#
-# ## Optional: path to uploaded media
-# # environment variable: PICTRS__STORE__PATH
-# # default: /mnt/files
-# path = '/mnt/files'
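The contract documented in the [media] section above is small enough to demonstrate end to end. Below is a minimal validator sketch, assuming actix-web 4 (the framework pict-rs itself uses); the port matches the example URL, and the size-plus-Content-Type check is a stand-in for real scanning logic such as a ClamAV call:

    use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};

    // Stand-in policy: require a Content-Type header and cap media at 40 MB.
    fn passes_validation(content_type: Option<&str>, body: &web::Bytes) -> bool {
        content_type.is_some() && body.len() <= 40 * 1024 * 1024
    }

    async fn validate(req: HttpRequest, body: web::Bytes) -> HttpResponse {
        let content_type = req
            .headers()
            .get("Content-Type")
            .and_then(|value| value.to_str().ok());

        if passes_validation(content_type, &body) {
            // Any 2XX status tells pict-rs the media passed validation.
            HttpResponse::Ok().finish()
        } else {
            // Any other status is treated as a validation failure.
            HttpResponse::UnprocessableEntity().finish()
        }
    }

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        HttpServer::new(|| {
            App::new()
                // Raise actix-web's default body limit so whole media files fit.
                .app_data(web::PayloadConfig::new(40 * 1024 * 1024))
                .route("/", web::post().to(validate))
        })
        .bind(("127.0.0.1", 8076))?
        .run()
        .await
    }

Because pict-rs only inspects the response status, the validator is free to buffer, stream, or proxy the body however it likes.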
diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index c7ffad5..dd1754e 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -56,6 +56,7 @@ impl Args {
                     client_timeout,
                     metrics_prometheus_address,
                     media_preprocess_steps,
+                    media_external_validation,
                     media_max_file_size,
                     media_process_timeout,
                     media_retention_variants,
@@ -183,6 +184,7 @@ impl Args {
                     max_file_size: media_max_file_size,
                     process_timeout: media_process_timeout,
                     preprocess_steps: media_preprocess_steps,
+                    external_validation: media_external_validation,
                     filters: media_filters,
                     retention: retention.set(),
                     image: image.set(),
@@ -549,6 +551,8 @@ struct Media {
     #[serde(skip_serializing_if = "Option::is_none")]
     preprocess_steps: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
+    external_validation: Option<Url>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     filters: Option<Vec<String>>,
     #[serde(skip_serializing_if = "Option::is_none")]
     retention: Option<Retention>,
@@ -884,6 +888,10 @@ struct Run {
     #[arg(long)]
     media_preprocess_steps: Option<String>,
 
+    /// Optional endpoint to submit uploaded media to for validation
+    #[arg(long)]
+    media_external_validation: Option<Url>,
+
     /// Which media filters should be enabled on the `process` endpoint
     #[arg(long)]
     media_filters: Option<Vec<String>>,
diff --git a/src/config/file.rs b/src/config/file.rs
index 213c715..26da6f3 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -165,6 +165,8 @@ pub(crate) struct OldDb {
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct Media {
+    pub(crate) external_validation: Option<Url>,
+
     pub(crate) max_file_size: usize,
 
     pub(crate) process_timeout: u64,
diff --git a/src/error.rs b/src/error.rs
index f4ff1ed..9d31dde 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -151,6 +151,9 @@ pub(crate) enum UploadError {
 
     #[error("Response timeout")]
     Timeout(#[from] crate::stream::TimeoutError),
+
+    #[error("Failed external validation")]
+    FailedExternalValidation,
 }
 
 impl UploadError {
@@ -184,6 +187,7 @@ impl UploadError {
             Self::Range => ErrorCode::RANGE_NOT_SATISFIABLE,
             Self::Limit(_) => ErrorCode::VALIDATE_FILE_SIZE,
             Self::Timeout(_) => ErrorCode::STREAM_TOO_SLOW,
+            Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION,
         }
     }
 
@@ -232,7 +236,8 @@ impl ResponseError for Error {
                 | UploadError::Validation(_)
                 | UploadError::UnsupportedProcessExtension
                 | UploadError::InvalidProcessExtension
-                | UploadError::ReadOnly,
+                | UploadError::ReadOnly
+                | UploadError::FailedExternalValidation,
             ) => StatusCode::BAD_REQUEST,
             Some(UploadError::Magick(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
             Some(UploadError::Ffmpeg(e)) if e.is_client_error() => StatusCode::BAD_REQUEST,
diff --git a/src/error_code.rs b/src/error_code.rs
index 0a940ad..3f9a192 100644
--- a/src/error_code.rs
+++ b/src/error_code.rs
@@ -141,4 +141,7 @@ impl ErrorCode {
    pub(crate) const UNKNOWN_ERROR: ErrorCode = ErrorCode {
        code: "unknown-error",
    };
+    pub(crate) const FAILED_EXTERNAL_VALIDATION: ErrorCode = ErrorCode {
+        code: "failed-external-validation",
+    };
 }
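Taken together, the flag, file, and error plumbing above give the feature this user-facing surface (endpoint value illustrative; the flag name follows clap's kebab-case derivation of media_external_validation):

    # pict-rs.toml
    [media]
    external_validation = 'http://localhost:8076'

    # environment variable
    PICTRS__MEDIA__EXTERNAL_VALIDATION=http://localhost:8076 pict-rs run

    # commandline
    pict-rs run --media-external-validation 'http://localhost:8076'

When the validator answers with a non-2XX status, the upload fails with HTTP 400 Bad Request carrying the error code "failed-external-validation".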
diff --git a/src/ingest.rs b/src/ingest.rs
index 75f27b3..2e394c7 100644
--- a/src/ingest.rs
+++ b/src/ingest.rs
@@ -7,10 +7,12 @@ use crate::{
     formats::{InternalFormat, Validations},
     repo::{Alias, ArcRepo, DeleteToken, Hash},
     store::Store,
-    stream::IntoStreamer,
+    stream::{IntoStreamer, MakeSend},
 };
 use actix_web::web::Bytes;
 use futures_core::Stream;
+use reqwest::Body;
+use reqwest_middleware::ClientWithMiddleware;
 use tracing::{Instrument, Span};
 
 mod hasher;
@@ -41,10 +43,11 @@ where
     Ok(buf.into_bytes())
 }
 
-#[tracing::instrument(skip(repo, store, stream, media))]
+#[tracing::instrument(skip(repo, store, client, stream, media))]
 pub(crate) async fn ingest<S>(
     repo: &ArcRepo,
     store: &S,
+    client: &ClientWithMiddleware,
     stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     declared_alias: Option<Alias>,
     media: &crate::config::Media,
@@ -113,6 +116,22 @@ where
         identifier: Some(identifier.clone()),
     };
 
+    if let Some(endpoint) = &media.external_validation {
+        let stream = store.to_stream(&identifier, None, None).await?.make_send();
+
+        let response = client
+            .post(endpoint.as_str())
+            .header("Content-Type", input_type.media_type().as_ref())
+            .body(Body::wrap_stream(stream))
+            .send()
+            .instrument(tracing::info_span!("external-validation"))
+            .await?;
+
+        if !response.status().is_success() {
+            return Err(UploadError::FailedExternalValidation.into());
+        }
+    }
+
     let (hash, size) = state.borrow_mut().finalize_reset();
 
     let hash = Hash::new(hash, size, input_type);
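Isolated from pict-rs internals, the round-trip that ingest now performs looks like the following sketch. The helper name is hypothetical; it assumes reqwest 0.11 with the "stream" feature, reqwest-middleware, and the bytes crate, which is where Body::wrap_stream and ClientWithMiddleware come from:

    use futures_core::Stream;
    use reqwest::Body;
    use reqwest_middleware::ClientWithMiddleware;

    async fn validate_externally<S>(
        client: &ClientWithMiddleware,
        endpoint: &str,
        media_type: &str,
        stream: S,
    ) -> Result<(), Box<dyn std::error::Error>>
    where
        // Body::wrap_stream needs an owned, Send byte stream. This is exactly
        // why the store's read stream is wrapped with make_send() in the diff.
        S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + Sync + 'static,
    {
        let response = client
            .post(endpoint)
            .header("Content-Type", media_type)
            .body(Body::wrap_stream(stream))
            .send()
            .await?;

        // Mirrors UploadError::FailedExternalValidation: non-2XX fails the upload.
        if !response.status().is_success() {
            return Err("failed external validation".into());
        }

        Ok(())
    }

Streaming the already-stored copy back out of the store, rather than buffering the upload, keeps memory use flat regardless of media size.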
diff --git a/src/lib.rs b/src/lib.rs
index b0e5680..9684677 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -140,6 +140,10 @@ impl FormData for Upload {
             .app_data::<web::Data<S>>()
             .expect("No store in request")
             .clone();
+        let client = req
+            .app_data::<web::Data<ClientWithMiddleware>>()
+            .expect("No client in request")
+            .clone();
         let config = req
             .app_data::<web::Data<Configuration>>()
             .expect("No configuration in request")
@@ -154,6 +158,7 @@ impl FormData for Upload {
             Field::array(Field::file(move |filename, _, stream| {
                 let repo = repo.clone();
                 let store = store.clone();
+                let client = client.clone();
                 let config = config.clone();
 
                 metrics::increment_counter!("pict-rs.files", "upload" => "inline");
@@ -168,7 +173,8 @@ impl FormData for Upload {
                             return Err(UploadError::ReadOnly.into());
                         }
 
-                        ingest::ingest(&repo, &**store, stream, None, &config.media).await
+                        ingest::ingest(&repo, &**store, &client, stream, None, &config.media)
+                            .await
                     }
                     .instrument(span),
                 )
@@ -196,6 +202,10 @@ impl FormData for Import {
             .app_data::<web::Data<S>>()
             .expect("No store in request")
             .clone();
+        let client = req
+            .app_data::<web::Data<ClientWithMiddleware>>()
+            .expect("No client in request")
+            .clone();
         let config = req
             .app_data::<web::Data<Configuration>>()
             .expect("No configuration in request")
@@ -213,6 +223,7 @@ impl FormData for Import {
             Field::array(Field::file(move |filename, _, stream| {
                 let repo = repo.clone();
                 let store = store.clone();
+                let client = client.clone();
                 let config = config.clone();
 
                 metrics::increment_counter!("pict-rs.files", "import" => "inline");
@@ -230,6 +241,7 @@ impl FormData for Import {
                     ingest::ingest(
                         &repo,
                         &**store,
+                        &client,
                         stream,
                         Some(Alias::from_existing(&filename)),
                         &config.media,
@@ -479,9 +491,10 @@ async fn ingest_inline(
     stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
     repo: &ArcRepo,
     store: &S,
+    client: &ClientWithMiddleware,
     config: &Configuration,
 ) -> Result<(Alias, DeleteToken, Details), Error> {
-    let session = ingest::ingest(repo, store, stream, None, &config.media).await?;
+    let session = ingest::ingest(repo, store, client, stream, None, &config.media).await?;
 
     let alias = session.alias().expect("alias should exist").to_owned();
 
@@ -501,17 +514,17 @@ async fn download(
     config: web::Data<Configuration>,
     query: web::Query<UrlQuery>,
 ) -> Result<HttpResponse, Error> {
-    let stream = download_stream(client, &query.url, &config).await?;
+    let stream = download_stream(&client, &query.url, &config).await?;
 
     if query.backgrounded {
         do_download_backgrounded(stream, repo, store).await
     } else {
-        do_download_inline(stream, repo, store, config).await
+        do_download_inline(stream, repo, store, &client, config).await
     }
 }
 
 async fn download_stream(
-    client: web::Data<ClientWithMiddleware>,
+    client: &ClientWithMiddleware,
     url: &str,
     config: &Configuration,
 ) -> Result<impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static, Error> {
@@ -533,16 +546,21 @@ async fn download_stream(
     Ok(stream)
 }
 
-#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))]
+#[tracing::instrument(
+    name = "Downloading file inline",
+    skip(stream, repo, store, client, config)
+)]
 async fn do_download_inline<S: Store + 'static>(
     stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
     repo: web::Data<ArcRepo>,
     store: web::Data<S>,
+    client: &ClientWithMiddleware,
     config: web::Data<Configuration>,
 ) -> Result<HttpResponse, Error> {
     metrics::increment_counter!("pict-rs.files", "download" => "inline");
 
-    let (alias, delete_token, details) = ingest_inline(stream, &repo, &store, &config).await?;
+    let (alias, delete_token, details) =
+        ingest_inline(stream, &repo, &store, &client, &config).await?;
 
     Ok(HttpResponse::Created().json(&serde_json::json!({
         "msg": "ok",
@@ -817,9 +835,9 @@ async fn process(
         let alias = if let Some(alias) = repo.related(proxy.clone()).await? {
             alias
         } else if !config.server.read_only {
-            let stream = download_stream(client, proxy.as_str(), &config).await?;
+            let stream = download_stream(&client, proxy.as_str(), &config).await?;
 
-            let (alias, _, _) = ingest_inline(stream, &repo, &store, &config).await?;
+            let (alias, _, _) = ingest_inline(stream, &repo, &store, &client, &config).await?;
 
             repo.relate_url(proxy, alias.clone()).await?;
 
@@ -1115,9 +1133,9 @@ async fn serve_query(
         let alias = if let Some(alias) = repo.related(proxy.clone()).await? {
             alias
         } else if !config.server.read_only {
-            let stream = download_stream(client, proxy.as_str(), &config).await?;
+            let stream = download_stream(&client, proxy.as_str(), &config).await?;
 
-            let (alias, _, _) = ingest_inline(stream, &repo, &store, &config).await?;
+            let (alias, _, _) = ingest_inline(stream, &repo, &store, &client, &config).await?;
 
             repo.relate_url(proxy, alias.clone()).await?;
 
@@ -1703,8 +1721,13 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
     });
 }
 
-fn spawn_workers<S>(repo: ArcRepo, store: S, config: Configuration, process_map: ProcessMap)
-where
+fn spawn_workers<S>(
+    repo: ArcRepo,
+    store: S,
+    client: ClientWithMiddleware,
+    config: Configuration,
+    process_map: ProcessMap,
+) where
     S: Store + 'static,
 {
     crate::sync::spawn(queue::process_cleanup(
@@ -1712,7 +1735,13 @@ where
         store.clone(),
         config.clone(),
     ));
-    crate::sync::spawn(queue::process_images(repo, store, process_map, config));
+    crate::sync::spawn(queue::process_images(
+        repo,
+        store,
+        client,
+        process_map,
+        config,
+    ));
 }
 
 async fn launch_file_store(
@@ -1738,6 +1767,7 @@ async fn launch_file_store(
         spawn_workers(
             repo.clone(),
             store.clone(),
+            client.clone(),
             config.clone(),
             process_map.clone(),
         );
diff --git a/src/queue.rs b/src/queue.rs
--- a/src/queue.rs
+++ b/src/queue.rs
 pub(crate) async fn process_images<S: Store + 'static>(
     repo: Arc<dyn FullRepo>,
     store: S,
+    client: ClientWithMiddleware,
     process_map: ProcessMap,
     config: Configuration,
 ) {
     process_image_jobs(
         &repo,
         &store,
+        &client,
         &process_map,
         &config,
         PROCESS_QUEUE,
@@ -301,6 +304,7 @@ where
 async fn process_image_jobs<S, F>(
     repo: &Arc<dyn FullRepo>,
     store: &S,
+    client: &ClientWithMiddleware,
     process_map: &ProcessMap,
     config: &Configuration,
     queue: &'static str,
@@ -310,6 +314,7 @@ where
     for<'a> F: Fn(
         &'a Arc<dyn FullRepo>,
         &'a S,
+        &'a ClientWithMiddleware,
         &'a ProcessMap,
         &'a Configuration,
         serde_json::Value,
@@ -319,8 +324,17 @@ where
     let worker_id = uuid::Uuid::new_v4();
 
     loop {
-        let res =
-            image_job_loop(repo, store, process_map, config, worker_id, queue, callback).await;
+        let res = image_job_loop(
+            repo,
+            store,
+            client,
+            process_map,
+            config,
+            worker_id,
+            queue,
+            callback,
+        )
+        .await;
 
         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", format!("{e}"));
@@ -340,6 +354,7 @@ where
 async fn image_job_loop<S, F>(
     repo: &Arc<dyn FullRepo>,
     store: &S,
+    client: &ClientWithMiddleware,
     process_map: &ProcessMap,
     config: &Configuration,
     worker_id: uuid::Uuid,
@@ -351,6 +366,7 @@ where
     for<'a> F: Fn(
         &'a Arc<dyn FullRepo>,
         &'a S,
+        &'a ClientWithMiddleware,
         &'a ProcessMap,
         &'a Configuration,
         serde_json::Value,
@@ -372,7 +388,7 @@ where
                     queue,
                     worker_id,
                     job_id,
-                    (callback)(repo, store, process_map, config, job),
+                    (callback)(repo, store, client, process_map, config, job),
                 )
             })
             .instrument(span)
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 14b0fce..b41e877 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -1,3 +1,5 @@
+use reqwest_middleware::ClientWithMiddleware;
+
 use crate::{
     concurrent_processor::ProcessMap,
     config::Configuration,
@@ -16,6 +18,7 @@ use std::{path::PathBuf, sync::Arc};
 pub(super) fn perform<'a, S>(
     repo: &'a ArcRepo,
     store: &'a S,
+    client: &'a ClientWithMiddleware,
     process_map: &'a ProcessMap,
     config: &'a Configuration,
     job: serde_json::Value,
@@ -34,6 +37,7 @@ where
             process_ingest(
                 repo,
                 store,
+                client,
                 Arc::from(identifier),
                 Serde::into_inner(upload_id),
                 declared_alias.map(Serde::into_inner),
@@ -69,10 +73,11 @@ where
     })
 }
 
-#[tracing::instrument(skip(repo, store, media))]
+#[tracing::instrument(skip(repo, store, client, media))]
 async fn process_ingest<S>(
     repo: &ArcRepo,
     store: &S,
+    client: &ClientWithMiddleware,
     unprocessed_identifier: Arc<str>,
     upload_id: UploadId,
     declared_alias: Option<Alias>,
@@ -85,6 +90,7 @@ where
     let ident = unprocessed_identifier.clone();
     let store2 = store.clone();
     let repo = repo.clone();
+    let client = client.clone();
     let media = media.clone();
 
     let error_boundary = crate::sync::spawn(async move {
@@ -94,7 +100,8 @@ where
             .map(|res| res.map_err(Error::from));
 
         let session =
-            crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?;
+            crate::ingest::ingest(&repo, &store2, &client, stream, declared_alias, &media)
+                .await?;
 
         Ok(session) as Result<Session, Error>
     })
diff --git a/src/stream.rs b/src/stream.rs
index 1309e15..7a67a9e 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -14,6 +14,68 @@ use std::{
     time::Duration,
 };
 
+pub(crate) trait MakeSend<T>: Stream<Item = std::io::Result<T>>
+where
+    T: 'static,
+{
+    fn make_send(self) -> MakeSendStream<T>
+    where
+        Self: Sized + 'static,
+    {
+        let (tx, rx) = crate::sync::channel(4);
+
+        MakeSendStream {
+            handle: crate::sync::spawn(async move {
+                let this = std::pin::pin!(self);
+
+                let mut stream = this.into_streamer();
+
+                while let Some(res) = stream.next().await {
+                    if tx.send_async(res).await.is_err() {
+                        return;
+                    }
+                }
+            }),
+            rx: rx.into_stream(),
+        }
+    }
+}
+
+impl<S, T> MakeSend<T> for S
+where
+    S: Stream<Item = std::io::Result<T>>,
+    T: 'static,
+{
+}
+
+pub(crate) struct MakeSendStream<T>
+where
+    T: 'static,
+{
+    handle: actix_rt::task::JoinHandle<()>,
+    rx: flume::r#async::RecvStream<'static, std::io::Result<T>>,
+}
+
+impl<T> Stream for MakeSendStream<T>
+where
+    T: 'static,
+{
+    type Item = std::io::Result<T>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match Pin::new(&mut self.rx).poll_next(cx) {
+            Poll::Ready(opt) => Poll::Ready(opt),
+            Poll::Pending if std::task::ready!(Pin::new(&mut self.handle).poll(cx)).is_err() => {
+                Poll::Ready(Some(Err(std::io::Error::new(
+                    std::io::ErrorKind::UnexpectedEof,
+                    "Stream panicked",
+                ))))
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
 pin_project_lite::pin_project! {
     pub(crate) struct Map<S, F> {
         #[pin]
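The MakeSend machinery above exists because reqwest's Body::wrap_stream wants a Send + 'static stream, while the store's read stream is tied to the local task set. The trick is to drive the non-Send stream on the thread where it lives and ship items over a channel whose receiving half is Send. Here is a standalone sketch of the same pattern, assuming plain tokio, flume, and futures-util rather than pict-rs's crate::sync helpers; unlike the real MakeSendStream, it does not poll the producer's JoinHandle, so a panicked producer simply ends the stream instead of surfacing an UnexpectedEof error:

    use futures_util::StreamExt;

    fn make_send<S, T>(stream: S) -> impl futures_core::Stream<Item = std::io::Result<T>> + Send
    where
        S: futures_core::Stream<Item = std::io::Result<T>> + 'static,
        T: Send + 'static,
    {
        // A small bound keeps a slow consumer from buffering the whole file.
        let (tx, rx) = flume::bounded(4);

        // Drive the possibly-!Send stream on the current thread's task set;
        // only the Send receiver half ever crosses a thread boundary.
        // Note: spawn_local must run inside a tokio LocalSet.
        tokio::task::spawn_local(async move {
            let mut stream = std::pin::pin!(stream);

            while let Some(res) = stream.next().await {
                if tx.send_async(res).await.is_err() {
                    break; // receiver dropped, stop producing
                }
            }
        });

        rx.into_stream()
    }

The bounded channel of four items is the same back-pressure choice the diff makes with crate::sync::channel(4): the producer can run at most a few chunks ahead of the HTTP request consuming it.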