diff --git a/Cargo.lock b/Cargo.lock index 5704963..64896d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,8 +22,7 @@ dependencies = [ [[package]] name = "actix-form-data" version = "0.7.0-beta.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0588d156cb871d8c237d55ce398e2a65727370fb98352ba5b65c15a2f834b0f" +source = "git+https://git.asonix.dog/asonix/actix-form-data#1c88a70021df9b8b394c2d9aee1265c5ef55d59b" dependencies = [ "actix-multipart", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index ca38916..70ccc5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ poll-timer-warnings = [] random-errors = ["dep:nanorand"] [dependencies] -actix-form-data = "0.7.0-beta.6" +actix-form-data = { version = "0.7.0-beta.6", git = "https://git.asonix.dog/asonix/actix-form-data" } actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0_22"] } async-trait = "0.1.51" barrel = { version = "0.7.0", features = ["pg"] } diff --git a/src/error.rs b/src/error.rs index 5529499..b6cb1cc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -82,7 +82,7 @@ pub(crate) enum UploadError { Io(#[from] std::io::Error), #[error("Error validating upload")] - Validation(#[from] crate::validate::ValidationError), + Validation(#[from] crate::ingest::ValidationError), #[error("Error in store")] Store(#[source] crate::store::StoreError), @@ -111,6 +111,9 @@ pub(crate) enum UploadError { #[error("Invalid job popped from job queue: {1}")] InvalidJob(#[source] serde_json::Error, String), + #[error("Error parsing upload query")] + InvalidUploadQuery(#[source] actix_web::error::QueryPayloadError), + #[error("pict-rs is in read-only mode")] ReadOnly, @@ -209,6 +212,7 @@ impl UploadError { Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT, Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION, Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB, + Self::InvalidUploadQuery(_) => ErrorCode::INVALID_UPLOAD_QUERY, #[cfg(feature = "random-errors")] Self::RandomError => ErrorCode::RANDOM_ERROR, } @@ -248,7 +252,7 @@ impl ResponseError for Error { fn status_code(&self) -> StatusCode { match self.kind() { Some(UploadError::Upload(actix_form_data::Error::FileSize)) - | Some(UploadError::Validation(crate::validate::ValidationError::Filesize)) => { + | Some(UploadError::Validation(crate::ingest::ValidationError::Filesize)) => { StatusCode::PAYLOAD_TOO_LARGE } Some( @@ -261,6 +265,7 @@ impl ResponseError for Error { )) | UploadError::Repo(crate::repo::RepoError::AlreadyClaimed) | UploadError::Validation(_) + | UploadError::InvalidUploadQuery(_) | UploadError::UnsupportedProcessExtension | UploadError::ReadOnly | UploadError::FailedExternalValidation diff --git a/src/error_code.rs b/src/error_code.rs index 9e9b936..46a7652 100644 --- a/src/error_code.rs +++ b/src/error_code.rs @@ -147,6 +147,9 @@ impl ErrorCode { pub(crate) const INVALID_JOB: ErrorCode = ErrorCode { code: "invalid-job", }; + pub(crate) const INVALID_UPLOAD_QUERY: ErrorCode = ErrorCode { + code: "invalid-upload-query", + }; #[cfg(feature = "random-errors")] pub(crate) const RANDOM_ERROR: ErrorCode = ErrorCode { code: "random-error", diff --git a/src/ingest.rs b/src/ingest.rs index 3d5004b..983b1d8 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -1,3 +1,6 @@ +mod hasher; +mod validate; + use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration}; use crate::{ @@ -9,16 +12,17 @@ use crate::{ repo::{Alias, ArcRepo, DeleteToken, Hash}, state::State, store::Store, + UploadQuery, }; use actix_web::web::Bytes; use futures_core::Stream; use reqwest::Body; - use tracing::{Instrument, Span}; -mod hasher; use hasher::Hasher; +pub(crate) use validate::ValidationError; + #[derive(Debug)] pub(crate) struct Session { repo: ArcRepo, @@ -31,6 +35,7 @@ pub(crate) struct Session { async fn process_ingest( state: &State, stream: impl Stream>, + upload_query: &UploadQuery, ) -> Result< ( InternalFormat, @@ -54,9 +59,10 @@ where let permit = crate::process_semaphore().acquire().await?; tracing::trace!("Validating bytes"); - let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes) - .with_poll_timer("validate-bytes-stream") - .await?; + let (input_type, process_read) = + validate::validate_bytes_stream(state, bytes, &upload_query.limits) + .with_poll_timer("validate-bytes-stream") + .await?; let process_read = if let Some(operations) = state.config.media.preprocess_steps() { if let Some(format) = input_type.processable_format() { @@ -159,6 +165,7 @@ pub(crate) async fn ingest( state: &State, stream: impl Stream>, declared_alias: Option, + upload_query: &UploadQuery, ) -> Result where S: Store, @@ -166,7 +173,7 @@ where let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode { dummy_ingest(state, stream).await? } else { - process_ingest(state, stream) + process_ingest(state, stream, upload_query) .with_poll_timer("ingest-future") .await? }; diff --git a/src/validate.rs b/src/ingest/validate.rs similarity index 99% rename from src/validate.rs rename to src/ingest/validate.rs index 5460553..4d23bb2 100644 --- a/src/validate.rs +++ b/src/ingest/validate.rs @@ -14,6 +14,7 @@ use crate::{ future::WithPollTimer, process::{Process, ProcessRead}, state::State, + UploadLimits, }; #[derive(Debug, thiserror::Error)] @@ -60,6 +61,7 @@ const MEGABYTES: usize = 1024 * 1024; pub(crate) async fn validate_bytes_stream( state: &State, bytes: BytesStream, + upload_limits: &UploadLimits, ) -> Result<(InternalFormat, ProcessRead), Error> { if bytes.is_empty() { return Err(ValidationError::Empty.into()); diff --git a/src/validate/exiftool.rs b/src/ingest/validate/exiftool.rs similarity index 100% rename from src/validate/exiftool.rs rename to src/ingest/validate/exiftool.rs diff --git a/src/validate/ffmpeg.rs b/src/ingest/validate/ffmpeg.rs similarity index 100% rename from src/validate/ffmpeg.rs rename to src/ingest/validate/ffmpeg.rs diff --git a/src/validate/magick.rs b/src/ingest/validate/magick.rs similarity index 100% rename from src/validate/magick.rs rename to src/ingest/validate/magick.rs diff --git a/src/lib.rs b/src/lib.rs index 5395e56..eed4d54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,6 @@ mod stream; mod sync; mod tls; mod tmp_file; -mod validate; use actix_form_data::{Field, Form, FormData, Multipart, Value}; use actix_web::{ @@ -59,6 +58,7 @@ use std::{ marker::PhantomData, path::Path, path::PathBuf, + rc::Rc, sync::{Arc, OnceLock}, time::{Duration, SystemTime}, }; @@ -147,22 +147,64 @@ async fn ensure_details_identifier( } } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(default)] +struct UploadLimits { + max_width: Option, + max_height: Option, + max_area: Option, + max_frame_count: Option, + max_file_size: Option, + allow_image: bool, + allow_animation: bool, + allow_video: bool, +} + +impl Default for UploadLimits { + fn default() -> Self { + Self { + max_width: None, + max_height: None, + max_area: None, + max_frame_count: None, + max_file_size: None, + allow_image: true, + allow_animation: true, + allow_video: true, + } + } +} + +#[derive(Clone, Default, Debug, serde::Deserialize, serde::Serialize)] +struct UploadQuery { + #[serde(flatten)] + limits: UploadLimits, + + #[serde(with = "tuple_vec_map", flatten)] + operations: Vec<(String, String)>, +} + struct Upload(Value, PhantomData); impl FormData for Upload { type Item = Session; type Error = Error; - fn form(req: &HttpRequest) -> Form { + fn form(req: &HttpRequest) -> Result, Self::Error> { let state = req .app_data::>>() .expect("No state in request") .clone(); + let web::Query(upload_query) = web::Query::::from_query(req.query_string()) + .map_err(UploadError::InvalidUploadQuery)?; + + let upload_query = Rc::new(upload_query); + // Create a new Multipart Form validator // // This form is expecting a single array field, 'images' with at most 10 files in it - Form::new() + Ok(Form::new() .max_files(state.config.server.max_file_count) .max_file_size(state.config.media.max_file_size * MEGABYTES) .transform_error(transform_error) @@ -170,6 +212,7 @@ impl FormData for Upload { "images", Field::array(Field::file(move |filename, _, stream| { let state = state.clone(); + let upload_query = upload_query.clone(); metrics::counter!(crate::init_metrics::FILES, "upload" => "inline") .increment(1); @@ -184,13 +227,13 @@ impl FormData for Upload { let stream = crate::stream::from_err(stream); - ingest::ingest(&state, stream, None).await + ingest::ingest(&state, stream, None, &upload_query).await } .with_poll_timer("file-upload") .instrument(span), ) })), - ) + )) } fn extract(value: Value) -> Result { @@ -204,16 +247,21 @@ impl FormData for Import { type Item = Session; type Error = Error; - fn form(req: &actix_web::HttpRequest) -> Form { + fn form(req: &actix_web::HttpRequest) -> Result, Self::Error> { let state = req .app_data::>>() .expect("No state in request") .clone(); + let web::Query(upload_query) = web::Query::::from_query(req.query_string()) + .map_err(UploadError::InvalidUploadQuery)?; + + let upload_query = Rc::new(upload_query); + // Create a new Multipart Form validator for internal imports // // This form is expecting a single array field, 'images' with at most 10 files in it - Form::new() + Ok(Form::new() .max_files(state.config.server.max_file_count) .max_file_size(state.config.media.max_file_size * MEGABYTES) .transform_error(transform_error) @@ -221,6 +269,7 @@ impl FormData for Import { "images", Field::array(Field::file(move |filename, _, stream| { let state = state.clone(); + let upload_query = upload_query.clone(); metrics::counter!(crate::init_metrics::FILES, "import" => "inline") .increment(1); @@ -235,14 +284,19 @@ impl FormData for Import { let stream = crate::stream::from_err(stream); - ingest::ingest(&state, stream, Some(Alias::from_existing(&filename))) - .await + ingest::ingest( + &state, + stream, + Some(Alias::from_existing(&filename)), + &upload_query, + ) + .await } .with_poll_timer("file-import") .instrument(span), ) })), - ) + )) } fn extract(value: Value) -> Result @@ -320,16 +374,16 @@ impl FormData for BackgroundedUpload { type Item = Backgrounded; type Error = Error; - fn form(req: &actix_web::HttpRequest) -> Form { - // Create a new Multipart Form validator for backgrounded uploads - // - // This form is expecting a single array field, 'images' with at most 10 files in it + fn form(req: &actix_web::HttpRequest) -> Result, Self::Error> { let state = req .app_data::>>() .expect("No state in request") .clone(); - Form::new() + // Create a new Multipart Form validator for backgrounded uploads + // + // This form is expecting a single array field, 'images' with at most 10 files in it + Ok(Form::new() .max_files(state.config.server.max_file_count) .max_file_size(state.config.media.max_file_size * MEGABYTES) .transform_error(transform_error) @@ -357,7 +411,7 @@ impl FormData for BackgroundedUpload { .instrument(span), ) })), - ) + )) } fn extract(value: Value) -> Result @@ -372,6 +426,7 @@ impl FormData for BackgroundedUpload { async fn upload_backgrounded( Multipart(BackgroundedUpload(value, _)): Multipart>, state: web::Data>, + web::Query(upload_query): web::Query, ) -> Result { let images = value .map() @@ -389,7 +444,14 @@ async fn upload_backgrounded( let upload_id = image.result.upload_id().expect("Upload ID exists"); let identifier = image.result.identifier().expect("Identifier exists"); - queue::queue_ingest(&state.repo, identifier, upload_id, None).await?; + queue::queue_ingest( + &state.repo, + identifier, + upload_id, + None, + upload_query.clone(), + ) + .await?; files.push(serde_json::json!({ "upload_id": upload_id.to_string(), @@ -462,11 +524,21 @@ struct UrlQuery { backgrounded: bool, } +#[derive(Debug, serde::Deserialize)] +struct DownloadQuery { + #[serde(flatten)] + url_query: UrlQuery, + + #[serde(flatten)] + upload_query: UploadQuery, +} + async fn ingest_inline( stream: impl Stream>, state: &State, + upload_query: &UploadQuery, ) -> Result<(Alias, DeleteToken, Details), Error> { - let session = ingest::ingest(state, stream, None).await?; + let session = ingest::ingest(state, stream, None, upload_query).await?; let alias = session.alias().expect("alias should exist").to_owned(); @@ -480,15 +552,18 @@ async fn ingest_inline( /// download an image from a URL #[tracing::instrument(name = "Downloading file", skip(state))] async fn download( - query: web::Query, + web::Query(DownloadQuery { + url_query, + upload_query, + }): web::Query, state: web::Data>, ) -> Result { - let stream = download_stream(&query.url, &state).await?; + let stream = download_stream(&url_query.url, &state).await?; - if query.backgrounded { - do_download_backgrounded(stream, state).await + if url_query.backgrounded { + do_download_backgrounded(stream, state, upload_query).await } else { - do_download_inline(stream, &state).await + do_download_inline(stream, &state, &upload_query).await } } @@ -518,10 +593,11 @@ async fn download_stream( async fn do_download_inline( stream: impl Stream>, state: &State, + upload_query: &UploadQuery, ) -> Result { metrics::counter!(crate::init_metrics::FILES, "download" => "inline").increment(1); - let (alias, delete_token, details) = ingest_inline(stream, state).await?; + let (alias, delete_token, details) = ingest_inline(stream, state, upload_query).await?; Ok(HttpResponse::Created().json(&serde_json::json!({ "msg": "ok", @@ -537,6 +613,7 @@ async fn do_download_inline( async fn do_download_backgrounded( stream: impl Stream>, state: web::Data>, + upload_query: UploadQuery, ) -> Result { metrics::counter!(crate::init_metrics::FILES, "download" => "background").increment(1); @@ -545,7 +622,7 @@ async fn do_download_backgrounded( let upload_id = backgrounded.upload_id().expect("Upload ID exists"); let identifier = backgrounded.identifier().expect("Identifier exists"); - queue::queue_ingest(&state.repo, identifier, upload_id, None).await?; + queue::queue_ingest(&state.repo, identifier, upload_id, None, upload_query).await?; backgrounded.disarm(); @@ -1212,7 +1289,7 @@ async fn proxy_alias_from_query( } else if !state.config.server.read_only { let stream = download_stream(proxy.as_str(), state).await?; - let (alias, _, _) = ingest_inline(stream, state).await?; + let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?; state.repo.relate_url(proxy, alias.clone()).await?; diff --git a/src/queue.rs b/src/queue.rs index 42da976..96cebf8 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -7,6 +7,7 @@ use crate::{ serde_str::Serde, state::State, store::Store, + UploadQuery, }; use std::{ @@ -56,6 +57,8 @@ enum Process { identifier: String, upload_id: Serde, declared_alias: Option>, + #[serde(default)] + upload_query: UploadQuery, }, Generate { target_format: InputProcessableFormat, @@ -158,11 +161,13 @@ pub(crate) async fn queue_ingest( identifier: &Arc, upload_id: UploadId, declared_alias: Option, + upload_query: UploadQuery, ) -> Result<(), Error> { let job = serde_json::to_value(Process::Ingest { identifier: identifier.to_string(), declared_alias: declared_alias.map(Serde::new), upload_id: Serde::new(upload_id), + upload_query, }) .map_err(UploadError::PushJob)?; repo.push(PROCESS_QUEUE, job, None).await?; diff --git a/src/queue/process.rs b/src/queue/process.rs index 653ca26..76ff626 100644 --- a/src/queue/process.rs +++ b/src/queue/process.rs @@ -12,6 +12,7 @@ use crate::{ serde_str::Serde, state::State, store::Store, + UploadQuery, }; use std::{path::PathBuf, sync::Arc}; @@ -37,12 +38,14 @@ where identifier, upload_id, declared_alias, + upload_query, } => { process_ingest( state, Arc::from(identifier), Serde::into_inner(upload_id), declared_alias.map(Serde::into_inner), + upload_query, ) .with_poll_timer("process-ingest") .await? @@ -110,6 +113,7 @@ async fn process_ingest( unprocessed_identifier: Arc, upload_id: UploadId, declared_alias: Option, + upload_query: UploadQuery, ) -> JobResult where S: Store + 'static, @@ -129,7 +133,8 @@ where let stream = crate::stream::from_err(state2.store.to_stream(&ident, None, None).await?); - let session = crate::ingest::ingest(&state2, stream, declared_alias).await?; + let session = + crate::ingest::ingest(&state2, stream, declared_alias, &upload_query).await?; Ok(session) as Result }