Compare commits
20 commits
Author | SHA1 | Date | |
---|---|---|---|
asonix | 9ab99f162a | ||
asonix | 298843c31c | ||
asonix | 1c88a70021 | ||
asonix | 5f38d493c6 | ||
asonix | 0d3fd3b19e | ||
asonix | 05161821b6 | ||
asonix | 0f7614ec3b | ||
asonix | 8b422644fb | ||
asonix | a91c503378 | ||
asonix | 9e0e425e14 | ||
asonix | fb46235530 | ||
asonix | d3b16438c9 | ||
asonix | f3096ac76a | ||
asonix | e9456945d9 | ||
asonix | e701ea0e56 | ||
asonix | 4ec8205934 | ||
asonix | 20f80df9fd | ||
asonix | 46e5834b60 | ||
asonix | 1be6074cf4 | ||
asonix | 9195d5623c |
55
.forgejo/workflows/check.yaml
Normal file
55
.forgejo/workflows/check.yaml
Normal file
|
@ -0,0 +1,55 @@
|
|||
on:
|
||||
push:
|
||||
branches:
|
||||
- '*'
|
||||
pull_request:
|
||||
branches:
|
||||
- main
|
||||
|
||||
jobs:
|
||||
clippy:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Clippy
|
||||
run: |
|
||||
cargo clippy --no-default-features -- -D warnings
|
||||
|
||||
tests:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Test
|
||||
run: cargo test
|
||||
|
||||
check:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target:
|
||||
- x86_64-unknown-linux-musl
|
||||
- armv7-unknown-linux-musleabihf
|
||||
- aarch64-unknown-linux-musl
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Debug builds
|
||||
run: cargo zigbuild --target ${{ matrix.target }}
|
78
.forgejo/workflows/publish.yaml
Normal file
78
.forgejo/workflows/publish.yaml
Normal file
|
@ -0,0 +1,78 @@
|
|||
on:
|
||||
push:
|
||||
tags:
|
||||
- 'v*.*.*'
|
||||
|
||||
jobs:
|
||||
clippy:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Clippy
|
||||
run: |
|
||||
cargo clippy --no-default-features -- -D warnings
|
||||
|
||||
tests:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Test
|
||||
run: cargo test
|
||||
|
||||
build:
|
||||
needs:
|
||||
- clippy
|
||||
- tests
|
||||
runs-on: base-image
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target:
|
||||
- x86_64-unknown-linux-musl
|
||||
- armv7-unknown-linux-musleabihf
|
||||
- aarch64-unknown-linux-musl
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Compile actix-form-data
|
||||
run: cargo zigbuild --target ${{ matrix.target }}
|
||||
|
||||
publish-forgejo:
|
||||
needs: [build]
|
||||
runs-on: base-image
|
||||
steps:
|
||||
- uses: actions/forgejo-release@v1
|
||||
with:
|
||||
direction: upload
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
publish-crate:
|
||||
needs: [build]
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Publish Crate
|
||||
run: cargo publish --token ${{ secrets.CRATES_IO_TOKEN }}
|
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -2,3 +2,5 @@
|
|||
**/*.rs.bk
|
||||
Cargo.lock
|
||||
/examples/filename*.png
|
||||
/.envrc
|
||||
/.direnv
|
||||
|
|
16
Cargo.toml
16
Cargo.toml
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "actix-form-data"
|
||||
description = "Multipart Form Data for Actix Web"
|
||||
version = "0.7.0-beta.0"
|
||||
version = "0.7.0-beta.7"
|
||||
license = "GPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/asonix/actix-form-data.git"
|
||||
|
@ -10,19 +10,21 @@ keywords = ["actix", "form-data", "multipart", "async"]
|
|||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
actix-multipart = "0.4.0"
|
||||
actix-rt = "2.5.0"
|
||||
actix-multipart = { version = "0.6.0", default-features = false }
|
||||
actix-web = { version = "4.0.0", default-features = false }
|
||||
futures-util = "0.3.17"
|
||||
futures-core = "0.3.28"
|
||||
mime = "0.3.16"
|
||||
streem = "0.2.0"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
|
||||
tracing = "0.1.15"
|
||||
|
||||
[dev-dependencies]
|
||||
async-fs = "1.2.1"
|
||||
actix-rt = "2.5.0"
|
||||
async-fs = "2.1.0"
|
||||
anyhow = "1.0"
|
||||
futures-lite = "1.4.0"
|
||||
futures-lite = "2.1.0"
|
||||
futures-util = "0.3.17"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0"
|
||||
|
|
|
@ -11,8 +11,8 @@ impl FormData for UploadedContent {
|
|||
type Item = ();
|
||||
type Error = Error;
|
||||
|
||||
fn form(_: &HttpRequest) -> Form<Self::Item, Self::Error> {
|
||||
Form::new()
|
||||
fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
|
||||
Ok(Form::new()
|
||||
.field("Hey", Field::text())
|
||||
.field(
|
||||
"Hi",
|
||||
|
@ -29,7 +29,7 @@ impl FormData for UploadedContent {
|
|||
}
|
||||
Ok(()) as Result<(), Error>
|
||||
})),
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
|
|
|
@ -61,11 +61,11 @@ impl FormData for UploadedContent {
|
|||
type Item = PathBuf;
|
||||
type Error = Errors;
|
||||
|
||||
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> {
|
||||
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
|
||||
let file_count = req.app_data::<Arc<AtomicUsize>>().expect("Set config");
|
||||
let file_count = Arc::clone(file_count);
|
||||
|
||||
Form::new()
|
||||
Ok(Form::new()
|
||||
.field("Hey", Field::text())
|
||||
.field(
|
||||
"Hi",
|
||||
|
@ -85,7 +85,7 @@ impl FormData for UploadedContent {
|
|||
.map_err(Errors::from)
|
||||
}
|
||||
})),
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
|
|
61
flake.lock
Normal file
61
flake.lock
Normal file
|
@ -0,0 +1,61 @@
|
|||
{
|
||||
"nodes": {
|
||||
"flake-utils": {
|
||||
"inputs": {
|
||||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1701680307,
|
||||
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1702151865,
|
||||
"narHash": "sha256-9VAt19t6yQa7pHZLDbil/QctAgVsA66DLnzdRGqDisg=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "666fc80e7b2afb570462423cb0e1cf1a3a34fedd",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"systems": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"type": "github"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
27
flake.nix
Normal file
27
flake.nix
Normal file
|
@ -0,0 +1,27 @@
|
|||
{
|
||||
description = "A very basic flake";
|
||||
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
|
||||
flake-utils.url = "github:numtide/flake-utils";
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, flake-utils }:
|
||||
flake-utils.lib.eachDefaultSystem (system:
|
||||
let
|
||||
pkgs = import nixpkgs {
|
||||
inherit system;
|
||||
};
|
||||
in
|
||||
{
|
||||
packages.default = pkgs.hello;
|
||||
|
||||
devShell = with pkgs; mkShell {
|
||||
nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
|
||||
|
||||
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
|
||||
};
|
||||
|
||||
formatter = pkgs.nixpkgs-fmt;
|
||||
});
|
||||
}
|
101
src/error.rs
101
src/error.rs
|
@ -19,12 +19,11 @@
|
|||
|
||||
use std::{
|
||||
num::{ParseFloatError, ParseIntError},
|
||||
string::FromUtf8Error,
|
||||
str::Utf8Error,
|
||||
};
|
||||
|
||||
use actix_multipart::MultipartError;
|
||||
use actix_web::{
|
||||
error::{PayloadError, ResponseError},
|
||||
error::{ParseError, PayloadError, ResponseError},
|
||||
http::StatusCode,
|
||||
HttpResponse,
|
||||
};
|
||||
|
@ -34,9 +33,9 @@ pub enum Error {
|
|||
#[error("Error parsing payload")]
|
||||
Payload(#[from] PayloadError),
|
||||
#[error("Error in multipart creation")]
|
||||
Multipart(MultipartError),
|
||||
Multipart(#[from] MultipartError),
|
||||
#[error("Failed to parse field")]
|
||||
ParseField(#[from] FromUtf8Error),
|
||||
ParseField(#[from] Utf8Error),
|
||||
#[error("Failed to parse int")]
|
||||
ParseInt(#[from] ParseIntError),
|
||||
#[error("Failed to parse float")]
|
||||
|
@ -59,11 +58,85 @@ pub enum Error {
|
|||
FileCount,
|
||||
#[error("File too large")]
|
||||
FileSize,
|
||||
#[error("Task panicked")]
|
||||
Panicked,
|
||||
}
|
||||
|
||||
impl From<MultipartError> for Error {
|
||||
fn from(m: MultipartError) -> Self {
|
||||
Error::Multipart(m)
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum MultipartError {
|
||||
#[error("No Content-Disposition `form-data` header")]
|
||||
NoContentDisposition,
|
||||
#[error("No Content-Type header found")]
|
||||
NoContentType,
|
||||
#[error("Cannot parse Content-Type header")]
|
||||
ParseContentType,
|
||||
#[error("Multipart boundary is not found")]
|
||||
Boundary,
|
||||
#[error("Nested multipart is not supported")]
|
||||
Nested,
|
||||
#[error("Multipart stream is incomplete")]
|
||||
Incomplete,
|
||||
#[error("Failed parsing")]
|
||||
Parse(#[source] ParseError),
|
||||
#[error("Multipart stream is not consumed")]
|
||||
NotConsumed,
|
||||
#[error("An error occured processing field `{field_name}`: `{zource}`")]
|
||||
Field { field_name: String, zource: String },
|
||||
#[error("Duplicate field found for: `{0}")]
|
||||
DuplicateField(String),
|
||||
#[error("Field with name `{0}` is required")]
|
||||
MissingField(String),
|
||||
#[error("Unsupported field `{0}`")]
|
||||
UnsupportedField(String),
|
||||
#[error("Unknown error occured: {0}")]
|
||||
Unknown(String),
|
||||
}
|
||||
|
||||
impl From<actix_multipart::MultipartError> for Error {
|
||||
fn from(value: actix_multipart::MultipartError) -> Self {
|
||||
match value {
|
||||
actix_multipart::MultipartError::NoContentDisposition => {
|
||||
Error::Multipart(MultipartError::NoContentDisposition)
|
||||
}
|
||||
actix_multipart::MultipartError::NoContentType => {
|
||||
Error::Multipart(MultipartError::NoContentType)
|
||||
}
|
||||
actix_multipart::MultipartError::ParseContentType => {
|
||||
Error::Multipart(MultipartError::ParseContentType)
|
||||
}
|
||||
actix_multipart::MultipartError::Boundary => Error::Multipart(MultipartError::Boundary),
|
||||
actix_multipart::MultipartError::Nested => Error::Multipart(MultipartError::Nested),
|
||||
actix_multipart::MultipartError::Incomplete => {
|
||||
Error::Multipart(MultipartError::Incomplete)
|
||||
}
|
||||
actix_multipart::MultipartError::Parse(e) => Error::Multipart(MultipartError::Parse(e)),
|
||||
actix_multipart::MultipartError::Payload(e) => Error::Payload(e),
|
||||
actix_multipart::MultipartError::NotConsumed => {
|
||||
Error::Multipart(MultipartError::NotConsumed)
|
||||
}
|
||||
actix_multipart::MultipartError::Field { field_name, source } => {
|
||||
Error::Multipart(MultipartError::Field {
|
||||
field_name,
|
||||
zource: source.to_string(),
|
||||
})
|
||||
}
|
||||
actix_multipart::MultipartError::DuplicateField(s) => {
|
||||
Error::Multipart(MultipartError::DuplicateField(s))
|
||||
}
|
||||
actix_multipart::MultipartError::MissingField(s) => {
|
||||
Error::Multipart(MultipartError::MissingField(s))
|
||||
}
|
||||
actix_multipart::MultipartError::UnsupportedField(s) => {
|
||||
Error::Multipart(MultipartError::UnsupportedField(s))
|
||||
}
|
||||
e => Error::Multipart(MultipartError::Unknown(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::task::JoinError> for Error {
|
||||
fn from(_: tokio::task::JoinError) -> Self {
|
||||
Self::Panicked
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,6 +151,7 @@ impl ResponseError for Error {
|
|||
fn error_response(&self) -> HttpResponse {
|
||||
match *self {
|
||||
Error::Payload(ref e) => e.error_response(),
|
||||
Error::Panicked => HttpResponse::InternalServerError().finish(),
|
||||
Error::Multipart(_)
|
||||
| Error::ParseField(_)
|
||||
| Error::ParseInt(_)
|
||||
|
@ -94,3 +168,14 @@ impl ResponseError for Error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Error;
|
||||
|
||||
#[test]
|
||||
fn assert_send() {
|
||||
fn is_send<E: Send>() {}
|
||||
is_send::<Error>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,13 +3,13 @@ use crate::{
|
|||
upload::handle_multipart,
|
||||
};
|
||||
use actix_web::{dev::Payload, FromRequest, HttpRequest, ResponseError};
|
||||
use std::{future::Future, pin::Pin};
|
||||
use std::{future::Future, pin::Pin, rc::Rc};
|
||||
|
||||
pub trait FormData {
|
||||
type Item: 'static;
|
||||
type Error: ResponseError + 'static;
|
||||
|
||||
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error>;
|
||||
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error>;
|
||||
|
||||
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
where
|
||||
|
@ -27,14 +27,16 @@ where
|
|||
|
||||
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
|
||||
let multipart = actix_multipart::Multipart::new(req.headers(), payload.take());
|
||||
let form = T::form(req);
|
||||
let res = T::form(req);
|
||||
|
||||
Box::pin(async move {
|
||||
let uploaded = match handle_multipart(multipart, &form).await {
|
||||
let form = Rc::new(res?);
|
||||
|
||||
let uploaded = match handle_multipart(multipart, Rc::clone(&form)).await {
|
||||
Ok(Ok(uploaded)) => uploaded,
|
||||
Ok(Err(e)) => return Err(e.into()),
|
||||
Err(e) => {
|
||||
if let Some(f) = form.transform_error {
|
||||
if let Some(f) = &form.transform_error {
|
||||
return Err((f)(e));
|
||||
} else {
|
||||
return Err(e.into());
|
||||
|
|
|
@ -28,19 +28,18 @@
|
|||
//! use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
|
||||
//! use actix_web::{
|
||||
//! web::{post, resource},
|
||||
//! App, HttpResponse, HttpServer,
|
||||
//! App, HttpRequest, HttpResponse, HttpServer,
|
||||
//! };
|
||||
//! use futures_util::stream::StreamExt;
|
||||
//!
|
||||
//! struct UploadedContent(Value<()>);
|
||||
//!
|
||||
//! impl FormData for UploadedContent {
|
||||
//! type Config = ();
|
||||
//! type Item = ();
|
||||
//! type Error = Error;
|
||||
//!
|
||||
//! fn form(_: Option<&()>) -> Form<Self::Item, Self::Error> {
|
||||
//! Form::new()
|
||||
//! fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
|
||||
//! Ok(Form::new()
|
||||
//! .field("Hey", Field::text())
|
||||
//! .field(
|
||||
//! "Hi",
|
||||
|
@ -57,7 +56,7 @@
|
|||
//! }
|
||||
//! Ok(()) as Result<(), Error>
|
||||
//! })),
|
||||
//! )
|
||||
//! ))
|
||||
//! }
|
||||
//!
|
||||
//! fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
|
|
18
src/types.rs
18
src/types.rs
|
@ -19,7 +19,7 @@
|
|||
|
||||
use crate::Error;
|
||||
use actix_web::web::Bytes;
|
||||
use futures_util::Stream;
|
||||
use futures_core::Stream;
|
||||
use mime::Mime;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
|
@ -32,7 +32,7 @@ use tracing::trace;
|
|||
#[derive(Debug)]
|
||||
pub struct FileMeta<T> {
|
||||
pub filename: String,
|
||||
pub content_type: Mime,
|
||||
pub content_type: Option<Mime>,
|
||||
pub result: T,
|
||||
}
|
||||
|
||||
|
@ -154,10 +154,10 @@ impl<T> From<MultipartContent<T>> for Value<T> {
|
|||
|
||||
pub type FileFn<T, E> = Box<
|
||||
dyn Fn(
|
||||
String,
|
||||
Mime,
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>>>>
|
||||
String,
|
||||
Option<Mime>,
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
|
||||
>;
|
||||
|
||||
/// The field type represents a field in the form-data that is allowed to be parsed.
|
||||
|
@ -215,7 +215,7 @@ impl<T, E> Field<T, E> {
|
|||
/// ```
|
||||
pub fn file<F, Fut>(f: F) -> Self
|
||||
where
|
||||
F: Fn(String, Mime, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
|
||||
F: Fn(String, Option<Mime>, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
|
||||
+ Clone
|
||||
+ 'static,
|
||||
Fut: Future<Output = Result<T, E>> + 'static,
|
||||
|
@ -437,8 +437,8 @@ impl<T, E> Map<T, E> {
|
|||
Some(NamePart::Map(name_part)) => self
|
||||
.inner
|
||||
.iter()
|
||||
.find(|&&(ref item, _)| *item == *name_part)
|
||||
.and_then(|&(_, ref field)| field.valid_field(name)),
|
||||
.find(|(ref item, _)| *item == *name_part)
|
||||
.and_then(|(_, ref field)| field.valid_field(name)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
|
244
src/upload.rs
244
src/upload.rs
|
@ -25,19 +25,10 @@ use crate::{
|
|||
},
|
||||
};
|
||||
use actix_web::web::BytesMut;
|
||||
use futures_util::{
|
||||
select,
|
||||
stream::{FuturesUnordered, StreamExt},
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tracing::trace;
|
||||
use std::{collections::HashMap, path::Path, rc::Rc};
|
||||
use streem::IntoStreamer;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::Instrument;
|
||||
|
||||
fn consolidate<T>(mf: MultipartForm<T>) -> Value<T> {
|
||||
mf.into_iter().fold(
|
||||
|
@ -76,16 +67,13 @@ fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
|
|||
NamePart::Map(part.to_owned())
|
||||
}
|
||||
})
|
||||
.fold(Ok(vec![]), |acc, part| match acc {
|
||||
Ok(mut v) => {
|
||||
if v.is_empty() && !part.is_map() {
|
||||
return Err(Error::ContentDisposition);
|
||||
}
|
||||
|
||||
v.push(part);
|
||||
Ok(v)
|
||||
.try_fold(vec![], |mut v, part| {
|
||||
if v.is_empty() && !part.is_map() {
|
||||
return Err(Error::ContentDisposition);
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
|
||||
v.push(part);
|
||||
Ok(v)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -115,32 +103,42 @@ where
|
|||
|
||||
let filename = filename.ok_or(Error::Filename)?.to_owned();
|
||||
|
||||
let file_size = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let content_type = field.content_type().clone();
|
||||
let content_type = field.content_type().cloned();
|
||||
|
||||
let max_file_size = form.max_file_size;
|
||||
|
||||
let field_stream = streem::try_from_fn(move |yielder| async move {
|
||||
let mut file_size = 0;
|
||||
|
||||
let mut stream = field.into_streamer();
|
||||
|
||||
while let Some(bytes) = stream.try_next().await? {
|
||||
tracing::trace!("Bytes from field");
|
||||
|
||||
file_size += bytes.len();
|
||||
|
||||
if file_size > max_file_size {
|
||||
drop(bytes);
|
||||
|
||||
while stream.try_next().await?.is_some() {
|
||||
tracing::trace!("Dropping oversized bytes");
|
||||
}
|
||||
|
||||
return Err(Error::FileSize);
|
||||
}
|
||||
|
||||
yielder.yield_ok(bytes).await;
|
||||
}
|
||||
|
||||
tracing::debug!("Finished consuming field");
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let result = file_fn(
|
||||
filename.clone(),
|
||||
content_type.clone(),
|
||||
Box::pin(field.then(move |res| {
|
||||
let file_size = file_size.clone();
|
||||
async move {
|
||||
match res {
|
||||
Ok(bytes) => {
|
||||
let size = file_size.fetch_add(bytes.len(), Ordering::Relaxed);
|
||||
|
||||
if size + bytes.len() > max_file_size {
|
||||
return Err(Error::FileSize);
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
})),
|
||||
Box::pin(field_stream),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -155,7 +153,7 @@ where
|
|||
}
|
||||
|
||||
async fn handle_form_data<'a, T, E>(
|
||||
mut field: actix_multipart::Field,
|
||||
field: actix_multipart::Field,
|
||||
term: FieldTerminator<'a, T, E>,
|
||||
form: &Form<T, E>,
|
||||
) -> Result<MultipartContent<T>, Error>
|
||||
|
@ -163,27 +161,54 @@ where
|
|||
T: 'static,
|
||||
E: 'static,
|
||||
{
|
||||
trace!("In handle_form_data, term: {:?}", term);
|
||||
let mut bytes = BytesMut::new();
|
||||
tracing::trace!("In handle_form_data, term: {:?}", term);
|
||||
let mut buf = Vec::new();
|
||||
|
||||
let mut stream = field.into_streamer();
|
||||
|
||||
while let Some(bytes) = stream.try_next().await? {
|
||||
tracing::trace!("bytes from field");
|
||||
|
||||
if buf.len() + bytes.len() > form.max_field_size {
|
||||
drop(buf);
|
||||
|
||||
while stream.try_next().await?.is_some() {
|
||||
tracing::trace!("Dropping oversized bytes");
|
||||
}
|
||||
|
||||
while let Some(res) = field.next().await {
|
||||
let b = res?;
|
||||
if bytes.len() + b.len() > form.max_field_size {
|
||||
return Err(Error::FieldSize);
|
||||
}
|
||||
|
||||
bytes.extend(b);
|
||||
buf.push(bytes);
|
||||
}
|
||||
|
||||
let bytes = match buf.len() {
|
||||
0 => return Err(Error::FieldSize),
|
||||
1 => buf.pop().expect("contains an element"),
|
||||
_ => {
|
||||
let total_length = buf.iter().map(|b| b.len()).sum();
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(total_length);
|
||||
|
||||
for b in buf {
|
||||
bytes.extend(b);
|
||||
}
|
||||
|
||||
bytes.freeze()
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!("Finished consuming field");
|
||||
|
||||
if let FieldTerminator::Bytes = term {
|
||||
return Ok(MultipartContent::Bytes(bytes.freeze()));
|
||||
return Ok(MultipartContent::Bytes(bytes));
|
||||
}
|
||||
|
||||
let s = String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)?;
|
||||
let s = std::str::from_utf8(&bytes).map_err(Error::ParseField)?;
|
||||
|
||||
match term {
|
||||
FieldTerminator::Bytes | FieldTerminator::File(_) => Err(Error::FieldType),
|
||||
FieldTerminator::Text => Ok(MultipartContent::Text(s)),
|
||||
FieldTerminator::Text => Ok(MultipartContent::Text(String::from(s))),
|
||||
FieldTerminator::Float => s
|
||||
.parse()
|
||||
.map_err(Error::ParseFloat)
|
||||
|
@ -197,7 +222,7 @@ where
|
|||
|
||||
async fn handle_stream_field<T, E>(
|
||||
field: actix_multipart::Field,
|
||||
form: &Form<T, E>,
|
||||
form: Rc<Form<T, E>>,
|
||||
) -> Result<Result<MultipartHash<T>, E>, Error>
|
||||
where
|
||||
T: 'static,
|
||||
|
@ -214,21 +239,22 @@ where
|
|||
|
||||
let content = match term {
|
||||
FieldTerminator::File(file_fn) => {
|
||||
match handle_file_upload(field, content_disposition.filename, form, file_fn).await? {
|
||||
match handle_file_upload(field, content_disposition.filename, &form, file_fn).await? {
|
||||
Ok(content) => content,
|
||||
Err(e) => return Ok(Err(e)),
|
||||
}
|
||||
}
|
||||
term => handle_form_data(field, term, form).await?,
|
||||
term => handle_form_data(field, term, &form).await?,
|
||||
};
|
||||
|
||||
Ok(Ok((name, content)))
|
||||
}
|
||||
|
||||
/// Handle multipart streams from Actix Web
|
||||
#[tracing::instrument(level = "TRACE", skip_all)]
|
||||
pub async fn handle_multipart<T, E>(
|
||||
m: actix_multipart::Multipart,
|
||||
form: &Form<T, E>,
|
||||
form: Rc<Form<T, E>>,
|
||||
) -> Result<Result<Value<T>, E>, Error>
|
||||
where
|
||||
T: 'static,
|
||||
|
@ -238,35 +264,111 @@ where
|
|||
let mut file_count: u32 = 0;
|
||||
let mut field_count: u32 = 0;
|
||||
|
||||
let mut unordered = FuturesUnordered::new();
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
let mut m = m.fuse();
|
||||
let mut m = m.into_streamer();
|
||||
|
||||
loop {
|
||||
select! {
|
||||
opt = m.next() => {
|
||||
if let Some(res) = opt {
|
||||
unordered.push(handle_stream_field(res?, form));
|
||||
let mut error: Option<Error> = None;
|
||||
let mut provided_error: Option<E> = None;
|
||||
let mut is_closed = false;
|
||||
let mut stream_error = false;
|
||||
|
||||
'outer: loop {
|
||||
tracing::trace!("multipart loop");
|
||||
|
||||
if error.is_some() || provided_error.is_some() {
|
||||
set.abort_all();
|
||||
|
||||
if !stream_error {
|
||||
while let Some(res) = m.next().await {
|
||||
tracing::trace!("draining multipart field");
|
||||
|
||||
if let Ok(field) = res {
|
||||
let mut stream = field.into_streamer();
|
||||
while stream.next().await.is_some() {
|
||||
tracing::trace!("Throwing away uploaded bytes, we have an error");
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
opt = unordered.next() => {
|
||||
|
||||
while set.join_next().await.is_some() {
|
||||
tracing::trace!("Throwing away joined result");
|
||||
}
|
||||
|
||||
break 'outer;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
opt = m.next(), if !is_closed => {
|
||||
tracing::trace!("Selected stream");
|
||||
is_closed = opt.is_none();
|
||||
|
||||
if let Some(res) = opt {
|
||||
let (name_parts, content) = match res? {
|
||||
Ok(tup) => tup,
|
||||
Err(e) => return Ok(Err(e)),
|
||||
match res {
|
||||
Ok(field) => {
|
||||
set.spawn_local(handle_stream_field(field, Rc::clone(&form)).instrument(tracing::trace_span!("multipart-field")));
|
||||
},
|
||||
Err(e) => {
|
||||
is_closed = true;
|
||||
stream_error = true;
|
||||
error = Some(e.into());
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
opt = set.join_next(), if !set.is_empty() => {
|
||||
tracing::trace!("Selected set");
|
||||
if let Some(res) = opt {
|
||||
let (name_parts, content) = match res {
|
||||
Ok(Ok(Ok(tup))) => tup,
|
||||
Ok(Ok(Err(e))) => {
|
||||
provided_error = Some(e);
|
||||
continue 'outer;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error = Some(e);
|
||||
continue 'outer;
|
||||
},
|
||||
Err(e) => {
|
||||
error = Some(e.into());
|
||||
continue 'outer;
|
||||
},
|
||||
};
|
||||
|
||||
let (l, r) = match count(&content, file_count, field_count, &form) {
|
||||
Ok(tup) => tup,
|
||||
Err(e) => {
|
||||
error = Some(e);
|
||||
continue 'outer;
|
||||
}
|
||||
};
|
||||
|
||||
let (l, r) = count(&content, file_count, field_count, form)?;
|
||||
file_count = l;
|
||||
field_count = r;
|
||||
|
||||
multipart_form.push((name_parts, content));
|
||||
}
|
||||
}
|
||||
complete => break,
|
||||
else => {
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Finished consuming multipart");
|
||||
|
||||
if let Some(e) = provided_error {
|
||||
return Ok(Err(e));
|
||||
}
|
||||
|
||||
if let Some(e) = error {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(Ok(consolidate(multipart_form)))
|
||||
}
|
||||
|
||||
|
@ -279,13 +381,13 @@ fn count<T, E>(
|
|||
match content {
|
||||
MultipartContent::File(_) => {
|
||||
file_count += 1;
|
||||
if file_count >= form.max_files {
|
||||
if file_count > form.max_files {
|
||||
return Err(Error::FileCount);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
field_count += 1;
|
||||
if field_count >= form.max_fields {
|
||||
if field_count > form.max_fields {
|
||||
return Err(Error::FieldCount);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue