Start threading upload configuration into ingest

This commit is contained in:
asonix 2024-03-27 16:57:22 -05:00
parent 34b9919428
commit 84a882392a
12 changed files with 141 additions and 38 deletions

3
Cargo.lock generated
View file

@ -22,8 +22,7 @@ dependencies = [
[[package]]
name = "actix-form-data"
version = "0.7.0-beta.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0588d156cb871d8c237d55ce398e2a65727370fb98352ba5b65c15a2f834b0f"
source = "git+https://git.asonix.dog/asonix/actix-form-data#1c88a70021df9b8b394c2d9aee1265c5ef55d59b"
dependencies = [
"actix-multipart",
"actix-web",

View file

@ -19,7 +19,7 @@ poll-timer-warnings = []
random-errors = ["dep:nanorand"]
[dependencies]
actix-form-data = "0.7.0-beta.6"
actix-form-data = { version = "0.7.0-beta.6", git = "https://git.asonix.dog/asonix/actix-form-data" }
actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0_22"] }
async-trait = "0.1.51"
barrel = { version = "0.7.0", features = ["pg"] }

View file

@ -82,7 +82,7 @@ pub(crate) enum UploadError {
Io(#[from] std::io::Error),
#[error("Error validating upload")]
Validation(#[from] crate::validate::ValidationError),
Validation(#[from] crate::ingest::ValidationError),
#[error("Error in store")]
Store(#[source] crate::store::StoreError),
@ -111,6 +111,9 @@ pub(crate) enum UploadError {
#[error("Invalid job popped from job queue: {1}")]
InvalidJob(#[source] serde_json::Error, String),
#[error("Error parsing upload query")]
InvalidUploadQuery(#[source] actix_web::error::QueryPayloadError),
#[error("pict-rs is in read-only mode")]
ReadOnly,
@ -209,6 +212,7 @@ impl UploadError {
Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT,
Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION,
Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB,
Self::InvalidUploadQuery(_) => ErrorCode::INVALID_UPLOAD_QUERY,
#[cfg(feature = "random-errors")]
Self::RandomError => ErrorCode::RANDOM_ERROR,
}
@ -248,7 +252,7 @@ impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind() {
Some(UploadError::Upload(actix_form_data::Error::FileSize))
| Some(UploadError::Validation(crate::validate::ValidationError::Filesize)) => {
| Some(UploadError::Validation(crate::ingest::ValidationError::Filesize)) => {
StatusCode::PAYLOAD_TOO_LARGE
}
Some(
@ -261,6 +265,7 @@ impl ResponseError for Error {
))
| UploadError::Repo(crate::repo::RepoError::AlreadyClaimed)
| UploadError::Validation(_)
| UploadError::InvalidUploadQuery(_)
| UploadError::UnsupportedProcessExtension
| UploadError::ReadOnly
| UploadError::FailedExternalValidation

View file

@ -147,6 +147,9 @@ impl ErrorCode {
pub(crate) const INVALID_JOB: ErrorCode = ErrorCode {
code: "invalid-job",
};
pub(crate) const INVALID_UPLOAD_QUERY: ErrorCode = ErrorCode {
code: "invalid-upload-query",
};
#[cfg(feature = "random-errors")]
pub(crate) const RANDOM_ERROR: ErrorCode = ErrorCode {
code: "random-error",

View file

@ -1,3 +1,6 @@
mod hasher;
mod validate;
use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration};
use crate::{
@ -9,16 +12,17 @@ use crate::{
repo::{Alias, ArcRepo, DeleteToken, Hash},
state::State,
store::Store,
UploadQuery,
};
use actix_web::web::Bytes;
use futures_core::Stream;
use reqwest::Body;
use tracing::{Instrument, Span};
mod hasher;
use hasher::Hasher;
pub(crate) use validate::ValidationError;
#[derive(Debug)]
pub(crate) struct Session {
repo: ArcRepo,
@ -31,6 +35,7 @@ pub(crate) struct Session {
async fn process_ingest<S>(
state: &State<S>,
stream: impl Stream<Item = Result<Bytes, Error>>,
upload_query: &UploadQuery,
) -> Result<
(
InternalFormat,
@ -54,9 +59,10 @@ where
let permit = crate::process_semaphore().acquire().await?;
tracing::trace!("Validating bytes");
let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes)
.with_poll_timer("validate-bytes-stream")
.await?;
let (input_type, process_read) =
validate::validate_bytes_stream(state, bytes, &upload_query.limits)
.with_poll_timer("validate-bytes-stream")
.await?;
let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
if let Some(format) = input_type.processable_format() {
@ -159,6 +165,7 @@ pub(crate) async fn ingest<S>(
state: &State<S>,
stream: impl Stream<Item = Result<Bytes, Error>>,
declared_alias: Option<Alias>,
upload_query: &UploadQuery,
) -> Result<Session, Error>
where
S: Store,
@ -166,7 +173,7 @@ where
let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode {
dummy_ingest(state, stream).await?
} else {
process_ingest(state, stream)
process_ingest(state, stream, upload_query)
.with_poll_timer("ingest-future")
.await?
};

View file

@ -14,6 +14,7 @@ use crate::{
future::WithPollTimer,
process::{Process, ProcessRead},
state::State,
UploadLimits,
};
#[derive(Debug, thiserror::Error)]
@ -60,6 +61,7 @@ const MEGABYTES: usize = 1024 * 1024;
pub(crate) async fn validate_bytes_stream<S>(
state: &State<S>,
bytes: BytesStream,
upload_limits: &UploadLimits,
) -> Result<(InternalFormat, ProcessRead), Error> {
if bytes.is_empty() {
return Err(ValidationError::Empty.into());

View file

@ -35,7 +35,6 @@ mod stream;
mod sync;
mod tls;
mod tmp_file;
mod validate;
use actix_form_data::{Field, Form, FormData, Multipart, Value};
use actix_web::{
@ -59,6 +58,7 @@ use std::{
marker::PhantomData,
path::Path,
path::PathBuf,
rc::Rc,
sync::{Arc, OnceLock},
time::{Duration, SystemTime},
};
@ -147,22 +147,64 @@ async fn ensure_details_identifier<S: Store + 'static>(
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default)]
struct UploadLimits {
max_width: Option<u16>,
max_height: Option<u16>,
max_area: Option<u32>,
max_frame_count: Option<u32>,
max_file_size: Option<usize>,
allow_image: bool,
allow_animation: bool,
allow_video: bool,
}
impl Default for UploadLimits {
fn default() -> Self {
Self {
max_width: None,
max_height: None,
max_area: None,
max_frame_count: None,
max_file_size: None,
allow_image: true,
allow_animation: true,
allow_video: true,
}
}
}
#[derive(Clone, Default, Debug, serde::Deserialize, serde::Serialize)]
struct UploadQuery {
#[serde(flatten)]
limits: UploadLimits,
#[serde(with = "tuple_vec_map", flatten)]
operations: Vec<(String, String)>,
}
struct Upload<S>(Value<Session>, PhantomData<S>);
impl<S: Store + 'static> FormData for Upload<S> {
type Item = Session;
type Error = Error;
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> {
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let state = req
.app_data::<web::Data<State<S>>>()
.expect("No state in request")
.clone();
let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string())
.map_err(UploadError::InvalidUploadQuery)?;
let upload_query = Rc::new(upload_query);
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Form::new()
Ok(Form::new()
.max_files(state.config.server.max_file_count)
.max_file_size(state.config.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
@ -170,6 +212,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
"images",
Field::array(Field::file(move |filename, _, stream| {
let state = state.clone();
let upload_query = upload_query.clone();
metrics::counter!(crate::init_metrics::FILES, "upload" => "inline")
.increment(1);
@ -184,13 +227,13 @@ impl<S: Store + 'static> FormData for Upload<S> {
let stream = crate::stream::from_err(stream);
ingest::ingest(&state, stream, None).await
ingest::ingest(&state, stream, None, &upload_query).await
}
.with_poll_timer("file-upload")
.instrument(span),
)
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error> {
@ -204,16 +247,21 @@ impl<S: Store + 'static> FormData for Import<S> {
type Item = Session;
type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
fn form(req: &actix_web::HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let state = req
.app_data::<web::Data<State<S>>>()
.expect("No state in request")
.clone();
let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string())
.map_err(UploadError::InvalidUploadQuery)?;
let upload_query = Rc::new(upload_query);
// Create a new Multipart Form validator for internal imports
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Form::new()
Ok(Form::new()
.max_files(state.config.server.max_file_count)
.max_file_size(state.config.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
@ -221,6 +269,7 @@ impl<S: Store + 'static> FormData for Import<S> {
"images",
Field::array(Field::file(move |filename, _, stream| {
let state = state.clone();
let upload_query = upload_query.clone();
metrics::counter!(crate::init_metrics::FILES, "import" => "inline")
.increment(1);
@ -235,14 +284,19 @@ impl<S: Store + 'static> FormData for Import<S> {
let stream = crate::stream::from_err(stream);
ingest::ingest(&state, stream, Some(Alias::from_existing(&filename)))
.await
ingest::ingest(
&state,
stream,
Some(Alias::from_existing(&filename)),
&upload_query,
)
.await
}
.with_poll_timer("file-import")
.instrument(span),
)
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
@ -320,16 +374,16 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
type Item = Backgrounded;
type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
// Create a new Multipart Form validator for backgrounded uploads
//
// This form is expecting a single array field, 'images' with at most 10 files in it
fn form(req: &actix_web::HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let state = req
.app_data::<web::Data<State<S>>>()
.expect("No state in request")
.clone();
Form::new()
// Create a new Multipart Form validator for backgrounded uploads
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Ok(Form::new()
.max_files(state.config.server.max_file_count)
.max_file_size(state.config.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
@ -357,7 +411,7 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
.instrument(span),
)
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
@ -372,6 +426,7 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
async fn upload_backgrounded<S: Store>(
Multipart(BackgroundedUpload(value, _)): Multipart<BackgroundedUpload<S>>,
state: web::Data<State<S>>,
web::Query(upload_query): web::Query<UploadQuery>,
) -> Result<HttpResponse, Error> {
let images = value
.map()
@ -389,7 +444,14 @@ async fn upload_backgrounded<S: Store>(
let upload_id = image.result.upload_id().expect("Upload ID exists");
let identifier = image.result.identifier().expect("Identifier exists");
queue::queue_ingest(&state.repo, identifier, upload_id, None).await?;
queue::queue_ingest(
&state.repo,
identifier,
upload_id,
None,
upload_query.clone(),
)
.await?;
files.push(serde_json::json!({
"upload_id": upload_id.to_string(),
@ -462,11 +524,21 @@ struct UrlQuery {
backgrounded: bool,
}
#[derive(Debug, serde::Deserialize)]
struct DownloadQuery {
#[serde(flatten)]
url_query: UrlQuery,
#[serde(flatten)]
upload_query: UploadQuery,
}
async fn ingest_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
state: &State<S>,
upload_query: &UploadQuery,
) -> Result<(Alias, DeleteToken, Details), Error> {
let session = ingest::ingest(state, stream, None).await?;
let session = ingest::ingest(state, stream, None, upload_query).await?;
let alias = session.alias().expect("alias should exist").to_owned();
@ -480,15 +552,18 @@ async fn ingest_inline<S: Store + 'static>(
/// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(state))]
async fn download<S: Store + 'static>(
query: web::Query<UrlQuery>,
web::Query(DownloadQuery {
url_query,
upload_query,
}): web::Query<DownloadQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let stream = download_stream(&query.url, &state).await?;
let stream = download_stream(&url_query.url, &state).await?;
if query.backgrounded {
do_download_backgrounded(stream, state).await
if url_query.backgrounded {
do_download_backgrounded(stream, state, upload_query).await
} else {
do_download_inline(stream, &state).await
do_download_inline(stream, &state, &upload_query).await
}
}
@ -518,10 +593,11 @@ async fn download_stream<S>(
async fn do_download_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
state: &State<S>,
upload_query: &UploadQuery,
) -> Result<HttpResponse, Error> {
metrics::counter!(crate::init_metrics::FILES, "download" => "inline").increment(1);
let (alias, delete_token, details) = ingest_inline(stream, state).await?;
let (alias, delete_token, details) = ingest_inline(stream, state, upload_query).await?;
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok",
@ -537,6 +613,7 @@ async fn do_download_inline<S: Store + 'static>(
async fn do_download_backgrounded<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
state: web::Data<State<S>>,
upload_query: UploadQuery,
) -> Result<HttpResponse, Error> {
metrics::counter!(crate::init_metrics::FILES, "download" => "background").increment(1);
@ -545,7 +622,7 @@ async fn do_download_backgrounded<S: Store + 'static>(
let upload_id = backgrounded.upload_id().expect("Upload ID exists");
let identifier = backgrounded.identifier().expect("Identifier exists");
queue::queue_ingest(&state.repo, identifier, upload_id, None).await?;
queue::queue_ingest(&state.repo, identifier, upload_id, None, upload_query).await?;
backgrounded.disarm();
@ -1212,7 +1289,7 @@ async fn proxy_alias_from_query<S: Store + 'static>(
} else if !state.config.server.read_only {
let stream = download_stream(proxy.as_str(), state).await?;
let (alias, _, _) = ingest_inline(stream, state).await?;
let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?;
state.repo.relate_url(proxy, alias.clone()).await?;

View file

@ -7,6 +7,7 @@ use crate::{
serde_str::Serde,
state::State,
store::Store,
UploadQuery,
};
use std::{
@ -56,6 +57,8 @@ enum Process {
identifier: String,
upload_id: Serde<UploadId>,
declared_alias: Option<Serde<Alias>>,
#[serde(default)]
upload_query: UploadQuery,
},
Generate {
target_format: InputProcessableFormat,
@ -158,11 +161,13 @@ pub(crate) async fn queue_ingest(
identifier: &Arc<str>,
upload_id: UploadId,
declared_alias: Option<Alias>,
upload_query: UploadQuery,
) -> Result<(), Error> {
let job = serde_json::to_value(Process::Ingest {
identifier: identifier.to_string(),
declared_alias: declared_alias.map(Serde::new),
upload_id: Serde::new(upload_id),
upload_query,
})
.map_err(UploadError::PushJob)?;
repo.push(PROCESS_QUEUE, job, None).await?;

View file

@ -12,6 +12,7 @@ use crate::{
serde_str::Serde,
state::State,
store::Store,
UploadQuery,
};
use std::{path::PathBuf, sync::Arc};
@ -37,12 +38,14 @@ where
identifier,
upload_id,
declared_alias,
upload_query,
} => {
process_ingest(
state,
Arc::from(identifier),
Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
upload_query,
)
.with_poll_timer("process-ingest")
.await?
@ -110,6 +113,7 @@ async fn process_ingest<S>(
unprocessed_identifier: Arc<str>,
upload_id: UploadId,
declared_alias: Option<Alias>,
upload_query: UploadQuery,
) -> JobResult
where
S: Store + 'static,
@ -129,7 +133,8 @@ where
let stream =
crate::stream::from_err(state2.store.to_stream(&ident, None, None).await?);
let session = crate::ingest::ingest(&state2, stream, declared_alias).await?;
let session =
crate::ingest::ingest(&state2, stream, declared_alias, &upload_query).await?;
Ok(session) as Result<Session, Error>
}