Compare commits

...

28 commits

Author SHA1 Message Date
asonix 9ab99f162a Add forgejo actions
All checks were successful
/ check (aarch64-unknown-linux-musl) (push) Successful in 34s
/ check (armv7-unknown-linux-musleabihf) (push) Successful in 32s
/ check (x86_64-unknown-linux-musl) (push) Successful in 25s
/ clippy (push) Successful in 29s
/ tests (push) Successful in 36s
/ build (aarch64-unknown-linux-musl) (push) Successful in 37s
/ build (armv7-unknown-linux-musleabihf) (push) Successful in 37s
/ build (x86_64-unknown-linux-musl) (push) Successful in 24s
/ publish-forgejo (push) Successful in 8s
/ publish-crate (push) Successful in 27s
2024-03-27 19:04:02 -05:00
asonix 298843c31c Bump version 2024-03-27 19:03:45 -05:00
asonix 1c88a70021 Make form construction fallible 2024-03-27 16:16:06 -05:00
asonix 5f38d493c6 Bump version 2023-12-22 13:55:21 -06:00
asonix 0d3fd3b19e Propagate span tree into field handler 2023-12-22 13:40:57 -06:00
asonix 05161821b6 Update version, remove dependency on actix-rt 2023-12-10 18:20:06 -06:00
asonix 0f7614ec3b Simplify stream implementations with streem 2023-12-10 18:16:25 -06:00
asonix 8b422644fb Add streem, update dependencies 2023-12-10 18:16:10 -06:00
asonix a91c503378 Update flake 2023-12-10 18:15:42 -06:00
asonix 9e0e425e14 Bump version 2023-07-18 18:04:41 -05:00
asonix fb46235530 Fix off-by-one in file & field count 2023-07-18 18:03:51 -05:00
asonix d3b16438c9 Add explicit drop on sender, log when finished consuming 2023-07-16 15:07:26 -05:00
asonix f3096ac76a Simplify select with guards 2023-07-16 15:00:09 -05:00
asonix e9456945d9 If set is empty don't select on it 2023-07-16 14:18:42 -05:00
asonix e701ea0e56 More trace 2023-07-16 14:13:15 -05:00
asonix 4ec8205934 Try to avoid spinning forever 2023-07-16 14:08:18 -05:00
asonix 20f80df9fd Attempt to drain multipart before returning errors 2023-07-16 13:49:21 -05:00
asonix 46e5834b60 Ensure error is Send 2023-06-01 17:16:23 -05:00
asonix 1be6074cf4 Update deps 2023-06-01 16:19:17 -05:00
asonix 9195d5623c Update lib.rs example 2022-09-24 17:19:53 -05:00
asonix a2e1ffa091 Remove Send + Sync bounds from callbacks 2022-09-24 16:22:00 -05:00
asonix 3525bcd09c Remove config from FormData trait 2022-09-10 10:55:52 -05:00
asonix c5265d286e Move to extractor pattern 2022-09-07 18:38:10 -05:00
Aode (Lion) 4cbc7cb78e Update repo url, readme examples 2022-03-08 12:07:33 -06:00
Aode (Lion) a36f9a9411 Bump readme version 2022-03-08 11:51:12 -06:00
Aode (Lion) ec85a80f5d stable deps 2022-03-08 11:27:49 -06:00
Aode (lion) 493d99f0ed Update to latest beta 2021-11-22 18:21:10 -06:00
Aode (Lion) 4dd4b8db41 2021 2021-10-21 16:23:12 -05:00
15 changed files with 756 additions and 452 deletions

View file

@ -0,0 +1,55 @@
on:
push:
branches:
- '*'
pull_request:
branches:
- main
jobs:
clippy:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Clippy
run: |
cargo clippy --no-default-features -- -D warnings
tests:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Test
run: cargo test
check:
strategy:
fail-fast: false
matrix:
target:
- x86_64-unknown-linux-musl
- armv7-unknown-linux-musleabihf
- aarch64-unknown-linux-musl
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Debug builds
run: cargo zigbuild --target ${{ matrix.target }}

View file

@ -0,0 +1,78 @@
on:
push:
tags:
- 'v*.*.*'
jobs:
clippy:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Clippy
run: |
cargo clippy --no-default-features -- -D warnings
tests:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Test
run: cargo test
build:
needs:
- clippy
- tests
runs-on: base-image
strategy:
fail-fast: false
matrix:
target:
- x86_64-unknown-linux-musl
- armv7-unknown-linux-musleabihf
- aarch64-unknown-linux-musl
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Compile actix-form-data
run: cargo zigbuild --target ${{ matrix.target }}
publish-forgejo:
needs: [build]
runs-on: base-image
steps:
- uses: actions/forgejo-release@v1
with:
direction: upload
token: ${{ secrets.GITHUB_TOKEN }}
publish-crate:
needs: [build]
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Publish Crate
run: cargo publish --token ${{ secrets.CRATES_IO_TOKEN }}

2
.gitignore vendored
View file

@ -2,3 +2,5 @@
**/*.rs.bk
Cargo.lock
/examples/filename*.png
/.envrc
/.direnv

View file

@ -1,29 +1,35 @@
[package]
name = "actix-form-data"
description = "Multipart Form Data for Actix Web"
version = "0.6.0-beta.10"
version = "0.7.0-beta.7"
license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/actix-form-data.git"
repository = "https://git.asonix.dog/asonix/actix-form-data.git"
readme = "README.md"
keywords = ["actix", "form-data", "multipart", "async"]
edition = "2018"
edition = "2021"
[dependencies]
actix-multipart = "0.4.0-beta.7"
actix-rt = "2.3.0"
actix-web = { version = "4.0.0-beta.10", default-features = false }
futures-util = "0.3.17"
actix-multipart = { version = "0.6.0", default-features = false }
actix-web = { version = "4.0.0", default-features = false }
futures-core = "0.3.28"
mime = "0.3.16"
streem = "0.2.0"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["sync"] }
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
tracing = "0.1.15"
[dev-dependencies]
async-fs = "1.2.1"
actix-rt = "2.5.0"
async-fs = "2.1.0"
anyhow = "1.0"
futures-lite = "1.4.0"
futures-lite = "2.1.0"
futures-util = "0.3.17"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] }
tracing-subscriber = { version = "0.3.9", features = [
"env-filter",
"fmt",
"tracing-log",
] }

View file

@ -13,8 +13,8 @@ Add it to your dependencies.
# Cargo.toml
[dependencies]
actix-web = "4.0.0-beta.3"
actix-form-data = "0.6.0-beta.1"
actix-web = "4.0.0"
actix-form-data = "0.6.0"
```
Require it in your project.
@ -41,7 +41,7 @@ App::new()
In your handler, get the value
```rust
async fn upload(value: Value) -> {
async fn upload(uploaded_content: Value<()>) -> HttpResponse {
...
}
```
@ -67,7 +67,7 @@ use actix_web::{
web::{post, resource},
App, HttpResponse, HttpServer,
};
use futures::stream::StreamExt;
use futures_util::stream::StreamExt;
async fn upload(uploaded_content: Value<()>) -> HttpResponse {
println!("Uploaded Content: {:#?}", uploaded_content);

View file

@ -1,46 +1,56 @@
use actix_form_data::{Error, Field, Form, Value};
use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
use actix_web::{
web::{post, resource},
App, HttpResponse, HttpServer,
App, HttpRequest, HttpResponse, HttpServer,
};
use futures_util::stream::StreamExt;
async fn upload(uploaded_content: Value<()>) -> HttpResponse {
println!("Uploaded Content: {:#?}", uploaded_content);
struct UploadedContent(Value<()>);
impl FormData for UploadedContent {
type Item = ();
type Error = Error;
fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
Ok(Form::new()
.field("Hey", Field::text())
.field(
"Hi",
Field::map()
.field("One", Field::int())
.field("Two", Field::float())
.finalize(),
)
.field(
"files",
Field::array(Field::file(|_, _, mut stream| async move {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as Result<(), Error>
})),
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(UploadedContent(value))
}
}
async fn upload(Multipart(UploadedContent(value)): Multipart<UploadedContent>) -> HttpResponse {
println!("Uploaded Content: {:#?}", value);
HttpResponse::Created().finish()
}
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
let form = Form::new()
.field("Hey", Field::text())
.field(
"Hi",
Field::map()
.field("One", Field::int())
.field("Two", Field::float())
.finalize(),
)
.field(
"files",
Field::array(Field::file(|_, _, mut stream| async move {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as Result<(), Error>
})),
);
println!("{:?}", form);
HttpServer::new(move || {
App::new()
.wrap(form.clone())
.service(resource("/upload").route(post().to(upload)))
})
.bind("127.0.0.1:8080")?
.run()
.await?;
HttpServer::new(move || App::new().service(resource("/upload").route(post().to(upload))))
.bind("127.0.0.1:8080")?
.run()
.await?;
Ok(())
}

View file

@ -1,9 +1,9 @@
use actix_form_data::{Error, Field, Form, Value};
use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
use actix_web::{
http::StatusCode,
middleware::Logger,
web::{post, resource, Bytes},
App, HttpResponse, HttpServer, ResponseError,
App, HttpRequest, HttpResponse, HttpServer, ResponseError,
};
use futures_util::stream::{Stream, StreamExt, TryStreamExt};
use std::{
@ -17,11 +17,6 @@ use std::{
};
use tracing::info;
#[derive(Clone, Debug)]
struct AppState {
form: Form<PathBuf, Errors>,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
struct JsonError {
msg: String,
@ -60,8 +55,49 @@ impl ResponseError for Errors {
}
}
async fn upload(uploaded_content: Value<PathBuf>) -> HttpResponse {
info!("Uploaded Content: {:#?}", uploaded_content);
struct UploadedContent(Value<PathBuf>);
impl FormData for UploadedContent {
type Item = PathBuf;
type Error = Errors;
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let file_count = req.app_data::<Arc<AtomicUsize>>().expect("Set config");
let file_count = Arc::clone(file_count);
Ok(Form::new()
.field("Hey", Field::text())
.field(
"Hi",
Field::map()
.field("One", Field::int())
.field("Two", Field::float())
.finalize(),
)
.field(
"files",
Field::array(Field::file(move |_filename, _content_type, stream| {
let count = file_count.fetch_add(1, Ordering::Relaxed);
async move {
save_file(stream, count)
.await
.map(PathBuf::from)
.map_err(Errors::from)
}
})),
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(UploadedContent(value))
}
}
async fn upload(Multipart(UploadedContent(value)): Multipart<UploadedContent>) -> HttpResponse {
info!("Uploaded Content: {:#?}", value);
HttpResponse::Created().finish()
}
@ -96,34 +132,10 @@ async fn main() -> Result<(), anyhow::Error> {
let file_count = Arc::new(AtomicUsize::new(0));
let form = Form::new()
.field("Hey", Field::text())
.field(
"Hi",
Field::map()
.field("One", Field::int())
.field("Two", Field::float())
.finalize(),
)
.field(
"files",
Field::array(Field::file(move |_filename, _content_type, stream| {
let count = file_count.clone().fetch_add(1, Ordering::Relaxed);
async move {
save_file(stream, count)
.await
.map(PathBuf::from)
.map_err(Errors::from)
}
})),
);
info!("{:#?}", form);
HttpServer::new(move || {
App::new()
.wrap(form.clone())
.wrap(Logger::default())
.app_data(file_count.clone())
.service(resource("/upload").route(post().to(upload)))
})
.bind("127.0.0.1:8080")?

61
flake.lock Normal file
View file

@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1702151865,
"narHash": "sha256-9VAt19t6yQa7pHZLDbil/QctAgVsA66DLnzdRGqDisg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "666fc80e7b2afb570462423cb0e1cf1a3a34fedd",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

27
flake.nix Normal file
View file

@ -0,0 +1,27 @@
{
description = "A very basic flake";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs {
inherit system;
};
in
{
packages.default = pkgs.hello;
devShell = with pkgs; mkShell {
nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
};
formatter = pkgs.nixpkgs-fmt;
});
}

View file

@ -19,27 +19,26 @@
use std::{
num::{ParseFloatError, ParseIntError},
string::FromUtf8Error,
str::Utf8Error,
};
use actix_multipart::MultipartError;
use actix_web::{
error::{PayloadError, ResponseError},
error::{ParseError, PayloadError, ResponseError},
http::StatusCode,
HttpResponse,
};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Error parsing payload, {0}")]
#[error("Error parsing payload")]
Payload(#[from] PayloadError),
#[error("Error in multipart creation, {0}")]
Multipart(MultipartError),
#[error("Failed to parse field, {0}")]
ParseField(#[from] FromUtf8Error),
#[error("Failed to parse int, {0}")]
#[error("Error in multipart creation")]
Multipart(#[from] MultipartError),
#[error("Failed to parse field")]
ParseField(#[from] Utf8Error),
#[error("Failed to parse int")]
ParseInt(#[from] ParseIntError),
#[error("Failed to parse float, {0}")]
#[error("Failed to parse float")]
ParseFloat(#[from] ParseFloatError),
#[error("Bad Content-Type")]
ContentType,
@ -59,11 +58,85 @@ pub enum Error {
FileCount,
#[error("File too large")]
FileSize,
#[error("Task panicked")]
Panicked,
}
impl From<MultipartError> for Error {
fn from(m: MultipartError) -> Self {
Error::Multipart(m)
#[derive(Debug, thiserror::Error)]
pub enum MultipartError {
#[error("No Content-Disposition `form-data` header")]
NoContentDisposition,
#[error("No Content-Type header found")]
NoContentType,
#[error("Cannot parse Content-Type header")]
ParseContentType,
#[error("Multipart boundary is not found")]
Boundary,
#[error("Nested multipart is not supported")]
Nested,
#[error("Multipart stream is incomplete")]
Incomplete,
#[error("Failed parsing")]
Parse(#[source] ParseError),
#[error("Multipart stream is not consumed")]
NotConsumed,
#[error("An error occured processing field `{field_name}`: `{zource}`")]
Field { field_name: String, zource: String },
#[error("Duplicate field found for: `{0}")]
DuplicateField(String),
#[error("Field with name `{0}` is required")]
MissingField(String),
#[error("Unsupported field `{0}`")]
UnsupportedField(String),
#[error("Unknown error occured: {0}")]
Unknown(String),
}
impl From<actix_multipart::MultipartError> for Error {
fn from(value: actix_multipart::MultipartError) -> Self {
match value {
actix_multipart::MultipartError::NoContentDisposition => {
Error::Multipart(MultipartError::NoContentDisposition)
}
actix_multipart::MultipartError::NoContentType => {
Error::Multipart(MultipartError::NoContentType)
}
actix_multipart::MultipartError::ParseContentType => {
Error::Multipart(MultipartError::ParseContentType)
}
actix_multipart::MultipartError::Boundary => Error::Multipart(MultipartError::Boundary),
actix_multipart::MultipartError::Nested => Error::Multipart(MultipartError::Nested),
actix_multipart::MultipartError::Incomplete => {
Error::Multipart(MultipartError::Incomplete)
}
actix_multipart::MultipartError::Parse(e) => Error::Multipart(MultipartError::Parse(e)),
actix_multipart::MultipartError::Payload(e) => Error::Payload(e),
actix_multipart::MultipartError::NotConsumed => {
Error::Multipart(MultipartError::NotConsumed)
}
actix_multipart::MultipartError::Field { field_name, source } => {
Error::Multipart(MultipartError::Field {
field_name,
zource: source.to_string(),
})
}
actix_multipart::MultipartError::DuplicateField(s) => {
Error::Multipart(MultipartError::DuplicateField(s))
}
actix_multipart::MultipartError::MissingField(s) => {
Error::Multipart(MultipartError::MissingField(s))
}
actix_multipart::MultipartError::UnsupportedField(s) => {
Error::Multipart(MultipartError::UnsupportedField(s))
}
e => Error::Multipart(MultipartError::Unknown(e.to_string())),
}
}
}
impl From<tokio::task::JoinError> for Error {
fn from(_: tokio::task::JoinError) -> Self {
Self::Panicked
}
}
@ -78,6 +151,7 @@ impl ResponseError for Error {
fn error_response(&self) -> HttpResponse {
match *self {
Error::Payload(ref e) => e.error_response(),
Error::Panicked => HttpResponse::InternalServerError().finish(),
Error::Multipart(_)
| Error::ParseField(_)
| Error::ParseInt(_)
@ -94,3 +168,14 @@ impl ResponseError for Error {
}
}
}
#[cfg(test)]
mod tests {
use super::Error;
#[test]
fn assert_send() {
fn is_send<E: Send>() {}
is_send::<Error>();
}
}

50
src/extractor.rs Normal file
View file

@ -0,0 +1,50 @@
use crate::{
types::{Form, Value},
upload::handle_multipart,
};
use actix_web::{dev::Payload, FromRequest, HttpRequest, ResponseError};
use std::{future::Future, pin::Pin, rc::Rc};
pub trait FormData {
type Item: 'static;
type Error: ResponseError + 'static;
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error>;
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
where
Self: Sized;
}
pub struct Multipart<T>(pub T);
impl<T> FromRequest for Multipart<T>
where
T: FormData,
{
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>> + 'static>>;
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
let multipart = actix_multipart::Multipart::new(req.headers(), payload.take());
let res = T::form(req);
Box::pin(async move {
let form = Rc::new(res?);
let uploaded = match handle_multipart(multipart, Rc::clone(&form)).await {
Ok(Ok(uploaded)) => uploaded,
Ok(Err(e)) => return Err(e.into()),
Err(e) => {
if let Some(f) = &form.transform_error {
return Err((f)(e));
} else {
return Err(e.into());
}
}
};
Ok(Multipart(T::extract(uploaded)?))
})
}
}

View file

@ -25,62 +25,72 @@
//! # Example
//!
//!```rust
//! use actix_form_data::{Error, Field, Form, Value};
//! use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
//! use actix_web::{
//! web::{post, resource},
//! App, HttpResponse, HttpServer,
//! App, HttpRequest, HttpResponse, HttpServer,
//! };
//! use futures_util::stream::StreamExt;
//!
//! async fn upload(uploaded_content: Value<()>) -> HttpResponse {
//! println!("Uploaded Content: {:#?}", uploaded_content);
//! struct UploadedContent(Value<()>);
//!
//! impl FormData for UploadedContent {
//! type Item = ();
//! type Error = Error;
//!
//! fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
//! Ok(Form::new()
//! .field("Hey", Field::text())
//! .field(
//! "Hi",
//! Field::map()
//! .field("One", Field::int())
//! .field("Two", Field::float())
//! .finalize(),
//! )
//! .field(
//! "files",
//! Field::array(Field::file(|_, _, mut stream| async move {
//! while let Some(res) = stream.next().await {
//! res?;
//! }
//! Ok(()) as Result<(), Error>
//! })),
//! ))
//! }
//!
//! fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
//! where
//! Self: Sized,
//! {
//! Ok(UploadedContent(value))
//! }
//! }
//!
//! async fn upload(Multipart(UploadedContent(value)): Multipart<UploadedContent>) -> HttpResponse {
//! println!("Uploaded Content: {:#?}", value);
//! HttpResponse::Created().finish()
//! }
//!
//! #[actix_rt::main]
//! async fn main() -> Result<(), anyhow::Error> {
//! let form = Form::new()
//! .field("Hey", Field::text())
//! .field(
//! "Hi",
//! Field::map()
//! .field("One", Field::int())
//! .field("Two", Field::float())
//! .finalize(),
//! )
//! .field(
//! "files",
//! Field::array(Field::file(|_, _, mut stream| async move {
//! while let Some(_) = stream.next().await {
//! // do something
//! }
//! Ok(()) as Result<(), Error>
//! })),
//! );
//!
//! println!("{:?}", form);
//!
//! HttpServer::new(move || {
//! App::new()
//! .wrap(form.clone())
//! .service(resource("/upload").route(post().to(upload)))
//! })
//! .bind("127.0.0.1:8082")?;
//! // commented out to prevent infinite doctest
//! // .run()
//! // .await?;
//! HttpServer::new(move || App::new().service(resource("/upload").route(post().to(upload))))
//! .bind("127.0.0.1:8080")?
//! .run();
//! // .await?;
//!
//! Ok(())
//! }
//!```
mod error;
mod middleware;
mod extractor;
mod types;
mod upload;
pub use self::{
error::Error,
extractor::{FormData, Multipart},
types::{Field, FileMeta, Form, Value},
upload::handle_multipart,
};

View file

@ -1,146 +0,0 @@
/*
* This file is part of Actix Form Data.
*
* Copyright © 2020 Riley Trautman
*
* Actix Form Data is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Actix Form Data is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Actix Form Data. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::{
types::{Form, Value},
upload::handle_multipart,
};
use actix_web::{
dev::{Payload, Service, ServiceRequest, Transform},
http::StatusCode,
FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
};
use futures_util::future::LocalBoxFuture;
use std::{
future::{ready, Ready},
task::{Context, Poll},
};
use tokio::sync::oneshot::{channel, Receiver};
#[derive(Debug, thiserror::Error)]
pub enum FromRequestError {
#[error("Uploaded guard used without Multipart middleware")]
MissingMiddleware,
#[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")]
TxDropped,
}
impl ResponseError for FromRequestError {
fn status_code(&self) -> StatusCode {
match self {
Self::MissingMiddleware | Self::TxDropped => StatusCode::INTERNAL_SERVER_ERROR,
}
}
fn error_response(&self) -> HttpResponse {
match self {
Self::MissingMiddleware | Self::TxDropped => {
HttpResponse::InternalServerError().finish()
}
}
}
}
struct Uploaded<T> {
rx: Receiver<Value<T>>,
}
pub struct MultipartMiddleware<S, T, E> {
form: Form<T, E>,
service: S,
}
impl<T> FromRequest for Value<T>
where
T: 'static,
{
type Error = FromRequestError;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let opt = req.extensions_mut().remove::<Uploaded<T>>();
Box::pin(async move {
let fut = opt.ok_or(FromRequestError::MissingMiddleware)?;
fut.rx.await.map_err(|_| FromRequestError::TxDropped)
})
}
}
impl<S, T, E> Transform<S, ServiceRequest> for Form<T, E>
where
S: Service<ServiceRequest, Error = actix_web::Error>,
S::Future: 'static,
T: 'static,
E: ResponseError + 'static,
{
type Response = S::Response;
type Error = S::Error;
type InitError = ();
type Transform = MultipartMiddleware<S, T, E>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(MultipartMiddleware {
form: self.clone(),
service,
}))
}
}
impl<S, T, E> Service<ServiceRequest> for MultipartMiddleware<S, T, E>
where
S: Service<ServiceRequest, Error = actix_web::Error>,
S::Future: 'static,
T: 'static,
E: ResponseError + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, mut req: ServiceRequest) -> Self::Future {
let (tx, rx) = channel();
req.extensions_mut().insert(Uploaded { rx });
let payload = req.take_payload();
let multipart = actix_multipart::Multipart::new(req.headers(), payload);
let form = self.form.clone();
let fut = self.service.call(req);
Box::pin(async move {
let uploaded = match handle_multipart(multipart, form.clone()).await {
Ok(Ok(uploaded)) => uploaded,
Ok(Err(e)) => return Err(e.into()),
Err(e) => {
if let Some(f) = form.transform_error.clone() {
return Err((f)(e));
} else {
return Err(e.into());
}
}
};
let _ = tx.send(uploaded);
fut.await
})
}
}

View file

@ -19,21 +19,20 @@
use crate::Error;
use actix_web::web::Bytes;
use futures_util::Stream;
use futures_core::Stream;
use mime::Mime;
use std::{
collections::{HashMap, VecDeque},
fmt,
future::Future,
pin::Pin,
sync::Arc,
};
use tracing::trace;
#[derive(Debug)]
pub struct FileMeta<T> {
pub filename: String,
pub content_type: Mime,
pub content_type: Option<Mime>,
pub result: T,
}
@ -153,14 +152,12 @@ impl<T> From<MultipartContent<T>> for Value<T> {
}
}
pub type FileFn<T, E> = Arc<
pub type FileFn<T, E> = Box<
dyn Fn(
String,
Mime,
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
) -> Pin<Box<dyn Future<Output = Result<T, E>>>>
+ Send
+ Sync,
String,
Option<Mime>,
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
) -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
>;
/// The field type represents a field in the form-data that is allowed to be parsed.
@ -174,20 +171,6 @@ pub enum Field<T, E> {
Text,
}
impl<T, E> Clone for Field<T, E> {
fn clone(&self) -> Self {
match self {
Self::Array(a) => Self::Array(a.clone()),
Self::Map(m) => Self::Map(m.clone()),
Self::File(file_fn) => Self::File(Arc::clone(file_fn)),
Self::Bytes => Self::Bytes,
Self::Int => Self::Int,
Self::Float => Self::Float,
Self::Text => Self::Text,
}
}
}
impl<T, E> fmt::Debug for Field<T, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
@ -232,17 +215,15 @@ impl<T, E> Field<T, E> {
/// ```
pub fn file<F, Fut>(f: F) -> Self
where
F: Fn(String, Mime, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
+ Send
+ Sync
F: Fn(String, Option<Mime>, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
+ Clone
+ 'static,
Fut: Future<Output = Result<T, E>> + 'static,
E: 'static,
{
Field::File(Arc::new(move |filename, mime, stream| {
Field::File(Box::new(move |filename, mime, stream| {
let f = f.clone();
Box::pin(async move { (f)(filename, mime, stream).await })
Box::pin((f)(filename, mime, stream))
}))
}
@ -252,7 +233,7 @@ impl<T, E> Field<T, E> {
/// ```rust
/// # use actix_form_data::{Error, Form, Field};
/// let form = Form::<(), Error>::new().field("text-field", Field::bytes());
pub fn bytes() -> Self {
pub const fn bytes() -> Self {
Field::Bytes
}
@ -262,7 +243,7 @@ impl<T, E> Field<T, E> {
/// ```rust
/// # use actix_form_data::{Error, Form, Field};
/// let form = Form::<(), Error>::new().field("text-field", Field::text());
pub fn text() -> Self {
pub const fn text() -> Self {
Field::Text
}
@ -273,7 +254,7 @@ impl<T, E> Field<T, E> {
/// # use actix_form_data::{Error, Form, Field};
/// let form = Form::<(), Error>::new().field("int-field", Field::int());
/// ```
pub fn int() -> Self {
pub const fn int() -> Self {
Field::Int
}
@ -284,7 +265,7 @@ impl<T, E> Field<T, E> {
/// # use actix_form_data::{Error, Form, Field};
/// let form = Form::<(), Error>::new().field("float-field", Field::float());
/// ```
pub fn float() -> Self {
pub const fn float() -> Self {
Field::Float
}
@ -321,18 +302,18 @@ impl<T, E> Field<T, E> {
/// );
/// # }
/// ```
pub fn map() -> Map<T, E> {
pub const fn map() -> Map<T, E> {
Map::new()
}
fn valid_field(&self, name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
fn valid_field<'a>(&'a self, name: VecDeque<&NamePart>) -> Option<FieldTerminator<'a, T, E>> {
trace!("Checking {:?} and {:?}", self, name);
match *self {
Field::Array(ref arr) => arr.valid_field(name),
Field::Map(ref map) => map.valid_field(name),
Field::File(ref file_fn) => {
if name.is_empty() {
Some(FieldTerminator::File(file_fn.clone()))
Some(FieldTerminator::File(file_fn))
} else {
None
}
@ -377,14 +358,6 @@ pub struct Array<T, E> {
inner: Box<Field<T, E>>,
}
impl<T, E> Clone for Array<T, E> {
fn clone(&self) -> Self {
Array {
inner: Box::new((*self.inner).clone()),
}
}
}
impl<T, E> fmt::Debug for Array<T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Array").field("inner", &self.inner).finish()
@ -398,7 +371,10 @@ impl<T, E> Array<T, E> {
}
}
fn valid_field(&self, mut name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
fn valid_field<'a>(
&'a self,
mut name: VecDeque<&NamePart>,
) -> Option<FieldTerminator<'a, T, E>> {
trace!("Checking {:?} and {:?}", self, name);
match name.pop_front() {
Some(NamePart::Array) => self.inner.valid_field(name),
@ -412,14 +388,6 @@ pub struct Map<T, E> {
inner: Vec<(String, Field<T, E>)>,
}
impl<T, E> Clone for Map<T, E> {
fn clone(&self) -> Self {
Map {
inner: self.inner.clone(),
}
}
}
impl<T, E> fmt::Debug for Map<T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Map").field("inner", &self.inner).finish()
@ -427,7 +395,7 @@ impl<T, E> fmt::Debug for Map<T, E> {
}
impl<T, E> Map<T, E> {
fn new() -> Self {
const fn new() -> Self {
Map { inner: Vec::new() }
}
@ -456,18 +424,21 @@ impl<T, E> Map<T, E> {
/// .field("sub-field-two", Field::text())
/// .finalize();
/// ```
pub fn finalize(self) -> Field<T, E> {
pub const fn finalize(self) -> Field<T, E> {
Field::Map(self)
}
fn valid_field(&self, mut name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
fn valid_field<'a>(
&'a self,
mut name: VecDeque<&NamePart>,
) -> Option<FieldTerminator<'a, T, E>> {
trace!("Checking {:?} and {:?}", self, name);
match name.pop_front() {
Some(NamePart::Map(name_part)) => self
.inner
.iter()
.find(|&&(ref item, _)| *item == *name_part)
.and_then(|&(_, ref field)| field.valid_field(name)),
.find(|(ref item, _)| *item == *name_part)
.and_then(|(_, ref field)| field.valid_field(name)),
_ => None,
}
}
@ -502,23 +473,10 @@ pub struct Form<T, E> {
pub(crate) max_field_size: usize,
pub(crate) max_files: u32,
pub(crate) max_file_size: usize,
pub(crate) transform_error: Option<Arc<dyn Fn(Error) -> actix_web::Error + Send + Sync>>,
pub(crate) transform_error: Option<Box<dyn Fn(Error) -> actix_web::Error + Sync>>,
inner: Map<T, E>,
}
impl<T, E> Clone for Form<T, E> {
fn clone(&self) -> Self {
Form {
max_fields: self.max_fields,
max_field_size: self.max_field_size,
max_files: self.max_files,
max_file_size: self.max_file_size,
transform_error: None,
inner: self.inner.clone(),
}
}
}
impl<T, E> Default for Form<T, E> {
fn default() -> Self {
Self::new()
@ -535,7 +493,7 @@ impl<T, E> Form<T, E> {
/// - max_field_size: 10_000 bytes
/// - max_files: 20
/// - max_files_size: 10_000_000 bytes
pub fn new() -> Self {
pub const fn new() -> Self {
Form {
max_fields: 100,
max_field_size: 10_000,
@ -549,16 +507,16 @@ impl<T, E> Form<T, E> {
/// Set the Transform Error method to convert Error types into actix_web::Error by hand
pub fn transform_error(
mut self,
f: impl Fn(Error) -> actix_web::Error + Send + Sync + 'static,
f: impl Fn(Error) -> actix_web::Error + Sync + 'static,
) -> Self {
self.transform_error = Some(Arc::new(f));
self.transform_error = Some(Box::new(f));
self
}
/// Set the maximum number of fields allowed in the upload
///
/// The upload will error if too many fields are provided.
pub fn max_fields(mut self, max: u32) -> Self {
pub const fn max_fields(mut self, max: u32) -> Self {
self.max_fields = max;
self
@ -567,7 +525,7 @@ impl<T, E> Form<T, E> {
/// Set the maximum size of a field (in bytes)
///
/// The upload will error if a provided field is too large.
pub fn max_field_size(mut self, max: usize) -> Self {
pub const fn max_field_size(mut self, max: usize) -> Self {
self.max_field_size = max;
self
@ -576,7 +534,7 @@ impl<T, E> Form<T, E> {
/// Set the maximum number of files allowed in the upload
///
/// THe upload will error if too many files are provided.
pub fn max_files(mut self, max: u32) -> Self {
pub const fn max_files(mut self, max: u32) -> Self {
self.max_files = max;
self
@ -585,7 +543,7 @@ impl<T, E> Form<T, E> {
/// Set the maximum size for files (in bytes)
///
/// The upload will error if a provided file is too large.
pub fn max_file_size(mut self, max: usize) -> Self {
pub const fn max_file_size(mut self, max: usize) -> Self {
self.max_file_size = max;
self
@ -597,8 +555,11 @@ impl<T, E> Form<T, E> {
self
}
pub(crate) fn valid_field(&self, name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
self.inner.valid_field(name.clone())
pub(crate) fn valid_field<'a>(
&'a self,
name: VecDeque<&NamePart>,
) -> Option<FieldTerminator<'a, T, E>> {
self.inner.valid_field(name)
}
}
@ -614,15 +575,6 @@ pub(crate) struct ContentDisposition {
pub filename: Option<String>,
}
impl ContentDisposition {
pub(crate) fn empty() -> Self {
ContentDisposition {
name: None,
filename: None,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum NamePart {
Map(String),
@ -630,21 +582,21 @@ pub(crate) enum NamePart {
}
impl NamePart {
pub fn is_map(&self) -> bool {
pub const fn is_map(&self) -> bool {
matches!(self, NamePart::Map(_))
}
}
#[derive(Clone)]
pub(crate) enum FieldTerminator<T, E> {
File(FileFn<T, E>),
pub(crate) enum FieldTerminator<'a, T, E> {
File(&'a FileFn<T, E>),
Bytes,
Int,
Float,
Text,
}
impl<T, E> fmt::Debug for FieldTerminator<T, E> {
impl<'a, T, E> fmt::Debug for FieldTerminator<'a, T, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
FieldTerminator::File(_) => write!(f, "File"),

View file

@ -25,19 +25,10 @@ use crate::{
},
};
use actix_web::web::BytesMut;
use futures_util::{
select,
stream::{FuturesUnordered, StreamExt},
};
use std::{
collections::HashMap,
path::Path,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tracing::trace;
use std::{collections::HashMap, path::Path, rc::Rc};
use streem::IntoStreamer;
use tokio::task::JoinSet;
use tracing::Instrument;
fn consolidate<T>(mf: MultipartForm<T>) -> Value<T> {
mf.into_iter().fold(
@ -76,34 +67,30 @@ fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
NamePart::Map(part.to_owned())
}
})
.fold(Ok(vec![]), |acc, part| match acc {
Ok(mut v) => {
if v.is_empty() && !part.is_map() {
return Err(Error::ContentDisposition);
}
v.push(part);
Ok(v)
.try_fold(vec![], |mut v, part| {
if v.is_empty() && !part.is_map() {
return Err(Error::ContentDisposition);
}
Err(e) => Err(e),
v.push(part);
Ok(v)
})
}
fn parse_content_disposition(field: &actix_multipart::Field) -> ContentDisposition {
match field.content_disposition() {
Some(x) => ContentDisposition {
name: x.get_name().map(|v| v.to_string()),
filename: x.get_filename().map(|v| v.to_string()),
},
None => ContentDisposition::empty(),
let content_disposition = field.content_disposition();
ContentDisposition {
name: content_disposition.get_name().map(|v| v.to_string()),
filename: content_disposition.get_filename().map(|v| v.to_string()),
}
}
async fn handle_file_upload<T, E>(
field: actix_multipart::Field,
filename: Option<String>,
form: Form<T, E>,
file_fn: FileFn<T, E>,
form: &Form<T, E>,
file_fn: &FileFn<T, E>,
) -> Result<Result<MultipartContent<T>, E>, Error>
where
T: 'static,
@ -116,31 +103,42 @@ where
let filename = filename.ok_or(Error::Filename)?.to_owned();
let file_size = Arc::new(AtomicUsize::new(0));
let content_type = field.content_type().cloned();
let content_type = field.content_type().clone();
let max_file_size = form.max_file_size;
let field_stream = streem::try_from_fn(move |yielder| async move {
let mut file_size = 0;
let mut stream = field.into_streamer();
while let Some(bytes) = stream.try_next().await? {
tracing::trace!("Bytes from field");
file_size += bytes.len();
if file_size > max_file_size {
drop(bytes);
while stream.try_next().await?.is_some() {
tracing::trace!("Dropping oversized bytes");
}
return Err(Error::FileSize);
}
yielder.yield_ok(bytes).await;
}
tracing::debug!("Finished consuming field");
Ok(())
});
let result = file_fn(
filename.clone(),
content_type.clone(),
Box::pin(field.then(move |res| {
let form = form.clone();
let file_size = file_size.clone();
async move {
match res {
Ok(bytes) => {
let size = file_size.fetch_add(bytes.len(), Ordering::Relaxed);
if size + bytes.len() > form.max_file_size {
return Err(Error::FileSize);
}
Ok(bytes)
}
Err(e) => Err(Error::from(e)),
}
}
})),
Box::pin(field_stream),
)
.await;
@ -154,36 +152,63 @@ where
}
}
async fn handle_form_data<T, E>(
mut field: actix_multipart::Field,
term: FieldTerminator<T, E>,
form: Form<T, E>,
async fn handle_form_data<'a, T, E>(
field: actix_multipart::Field,
term: FieldTerminator<'a, T, E>,
form: &Form<T, E>,
) -> Result<MultipartContent<T>, Error>
where
T: 'static,
E: 'static,
{
trace!("In handle_form_data, term: {:?}", term);
let mut bytes = BytesMut::new();
tracing::trace!("In handle_form_data, term: {:?}", term);
let mut buf = Vec::new();
let mut stream = field.into_streamer();
while let Some(bytes) = stream.try_next().await? {
tracing::trace!("bytes from field");
if buf.len() + bytes.len() > form.max_field_size {
drop(buf);
while stream.try_next().await?.is_some() {
tracing::trace!("Dropping oversized bytes");
}
while let Some(res) = field.next().await {
let b = res?;
if bytes.len() + b.len() > form.max_field_size {
return Err(Error::FieldSize);
}
bytes.extend(b);
buf.push(bytes);
}
let bytes = match buf.len() {
0 => return Err(Error::FieldSize),
1 => buf.pop().expect("contains an element"),
_ => {
let total_length = buf.iter().map(|b| b.len()).sum();
let mut bytes = BytesMut::with_capacity(total_length);
for b in buf {
bytes.extend(b);
}
bytes.freeze()
}
};
tracing::debug!("Finished consuming field");
if let FieldTerminator::Bytes = term {
return Ok(MultipartContent::Bytes(bytes.freeze()));
return Ok(MultipartContent::Bytes(bytes));
}
let s = String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)?;
let s = std::str::from_utf8(&bytes).map_err(Error::ParseField)?;
match term {
FieldTerminator::Bytes | FieldTerminator::File(_) => Err(Error::FieldType),
FieldTerminator::Text => Ok(MultipartContent::Text(s)),
FieldTerminator::Text => Ok(MultipartContent::Text(String::from(s))),
FieldTerminator::Float => s
.parse()
.map_err(Error::ParseFloat)
@ -197,7 +222,7 @@ where
async fn handle_stream_field<T, E>(
field: actix_multipart::Field,
form: Form<T, E>,
form: Rc<Form<T, E>>,
) -> Result<Result<MultipartHash<T>, E>, Error>
where
T: 'static,
@ -214,21 +239,22 @@ where
let content = match term {
FieldTerminator::File(file_fn) => {
match handle_file_upload(field, content_disposition.filename, form, file_fn).await? {
match handle_file_upload(field, content_disposition.filename, &form, file_fn).await? {
Ok(content) => content,
Err(e) => return Ok(Err(e)),
}
}
term => handle_form_data(field, term, form.clone()).await?,
term => handle_form_data(field, term, &form).await?,
};
Ok(Ok((name, content)))
}
/// Handle multipart streams from Actix Web
#[tracing::instrument(level = "TRACE", skip_all)]
pub async fn handle_multipart<T, E>(
m: actix_multipart::Multipart,
form: Form<T, E>,
form: Rc<Form<T, E>>,
) -> Result<Result<Value<T>, E>, Error>
where
T: 'static,
@ -238,35 +264,111 @@ where
let mut file_count: u32 = 0;
let mut field_count: u32 = 0;
let mut unordered = FuturesUnordered::new();
let mut set = JoinSet::new();
let mut m = m.fuse();
let mut m = m.into_streamer();
loop {
select! {
opt = m.next() => {
if let Some(res) = opt {
unordered.push(handle_stream_field(res?, form.clone()));
let mut error: Option<Error> = None;
let mut provided_error: Option<E> = None;
let mut is_closed = false;
let mut stream_error = false;
'outer: loop {
tracing::trace!("multipart loop");
if error.is_some() || provided_error.is_some() {
set.abort_all();
if !stream_error {
while let Some(res) = m.next().await {
tracing::trace!("draining multipart field");
if let Ok(field) = res {
let mut stream = field.into_streamer();
while stream.next().await.is_some() {
tracing::trace!("Throwing away uploaded bytes, we have an error");
}
} else {
break;
}
}
}
opt = unordered.next() => {
while set.join_next().await.is_some() {
tracing::trace!("Throwing away joined result");
}
break 'outer;
}
tokio::select! {
opt = m.next(), if !is_closed => {
tracing::trace!("Selected stream");
is_closed = opt.is_none();
if let Some(res) = opt {
let (name_parts, content) = match res? {
Ok(tup) => tup,
Err(e) => return Ok(Err(e)),
match res {
Ok(field) => {
set.spawn_local(handle_stream_field(field, Rc::clone(&form)).instrument(tracing::trace_span!("multipart-field")));
},
Err(e) => {
is_closed = true;
stream_error = true;
error = Some(e.into());
continue 'outer;
}
}
}
}
opt = set.join_next(), if !set.is_empty() => {
tracing::trace!("Selected set");
if let Some(res) = opt {
let (name_parts, content) = match res {
Ok(Ok(Ok(tup))) => tup,
Ok(Ok(Err(e))) => {
provided_error = Some(e);
continue 'outer;
}
Ok(Err(e)) => {
error = Some(e);
continue 'outer;
},
Err(e) => {
error = Some(e.into());
continue 'outer;
},
};
let (l, r) = match count(&content, file_count, field_count, &form) {
Ok(tup) => tup,
Err(e) => {
error = Some(e);
continue 'outer;
}
};
let (l, r) = count(&content, file_count, field_count, &form)?;
file_count = l;
field_count = r;
multipart_form.push((name_parts, content));
}
}
complete => break,
else => {
break 'outer;
}
}
}
tracing::debug!("Finished consuming multipart");
if let Some(e) = provided_error {
return Ok(Err(e));
}
if let Some(e) = error {
return Err(e);
}
Ok(Ok(consolidate(multipart_form)))
}
@ -279,13 +381,13 @@ fn count<T, E>(
match content {
MultipartContent::File(_) => {
file_count += 1;
if file_count >= form.max_files {
if file_count > form.max_files {
return Err(Error::FileCount);
}
}
_ => {
field_count += 1;
if field_count >= form.max_fields {
if field_count > form.max_fields {
return Err(Error::FieldCount);
}
}