Compare commits
28 commits
v0.6.0-bet
...
main
Author | SHA1 | Date | |
---|---|---|---|
asonix | 9ab99f162a | ||
asonix | 298843c31c | ||
asonix | 1c88a70021 | ||
asonix | 5f38d493c6 | ||
asonix | 0d3fd3b19e | ||
asonix | 05161821b6 | ||
asonix | 0f7614ec3b | ||
asonix | 8b422644fb | ||
asonix | a91c503378 | ||
asonix | 9e0e425e14 | ||
asonix | fb46235530 | ||
asonix | d3b16438c9 | ||
asonix | f3096ac76a | ||
asonix | e9456945d9 | ||
asonix | e701ea0e56 | ||
asonix | 4ec8205934 | ||
asonix | 20f80df9fd | ||
asonix | 46e5834b60 | ||
asonix | 1be6074cf4 | ||
asonix | 9195d5623c | ||
asonix | a2e1ffa091 | ||
asonix | 3525bcd09c | ||
asonix | c5265d286e | ||
Aode (Lion) | 4cbc7cb78e | ||
Aode (Lion) | a36f9a9411 | ||
Aode (Lion) | ec85a80f5d | ||
Aode (lion) | 493d99f0ed | ||
Aode (Lion) | 4dd4b8db41 |
55
.forgejo/workflows/check.yaml
Normal file
55
.forgejo/workflows/check.yaml
Normal file
|
@ -0,0 +1,55 @@
|
|||
on:
|
||||
push:
|
||||
branches:
|
||||
- '*'
|
||||
pull_request:
|
||||
branches:
|
||||
- main
|
||||
|
||||
jobs:
|
||||
clippy:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Clippy
|
||||
run: |
|
||||
cargo clippy --no-default-features -- -D warnings
|
||||
|
||||
tests:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Test
|
||||
run: cargo test
|
||||
|
||||
check:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target:
|
||||
- x86_64-unknown-linux-musl
|
||||
- armv7-unknown-linux-musleabihf
|
||||
- aarch64-unknown-linux-musl
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Debug builds
|
||||
run: cargo zigbuild --target ${{ matrix.target }}
|
78
.forgejo/workflows/publish.yaml
Normal file
78
.forgejo/workflows/publish.yaml
Normal file
|
@ -0,0 +1,78 @@
|
|||
on:
|
||||
push:
|
||||
tags:
|
||||
- 'v*.*.*'
|
||||
|
||||
jobs:
|
||||
clippy:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Clippy
|
||||
run: |
|
||||
cargo clippy --no-default-features -- -D warnings
|
||||
|
||||
tests:
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Test
|
||||
run: cargo test
|
||||
|
||||
build:
|
||||
needs:
|
||||
- clippy
|
||||
- tests
|
||||
runs-on: base-image
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target:
|
||||
- x86_64-unknown-linux-musl
|
||||
- armv7-unknown-linux-musleabihf
|
||||
- aarch64-unknown-linux-musl
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Compile actix-form-data
|
||||
run: cargo zigbuild --target ${{ matrix.target }}
|
||||
|
||||
publish-forgejo:
|
||||
needs: [build]
|
||||
runs-on: base-image
|
||||
steps:
|
||||
- uses: actions/forgejo-release@v1
|
||||
with:
|
||||
direction: upload
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
publish-crate:
|
||||
needs: [build]
|
||||
runs-on: base-image
|
||||
steps:
|
||||
-
|
||||
name: Checkout actix-form-data
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
-
|
||||
name: Cargo Cache
|
||||
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
|
||||
-
|
||||
name: Publish Crate
|
||||
run: cargo publish --token ${{ secrets.CRATES_IO_TOKEN }}
|
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -2,3 +2,5 @@
|
|||
**/*.rs.bk
|
||||
Cargo.lock
|
||||
/examples/filename*.png
|
||||
/.envrc
|
||||
/.direnv
|
||||
|
|
28
Cargo.toml
28
Cargo.toml
|
@ -1,29 +1,35 @@
|
|||
[package]
|
||||
name = "actix-form-data"
|
||||
description = "Multipart Form Data for Actix Web"
|
||||
version = "0.6.0-beta.10"
|
||||
version = "0.7.0-beta.7"
|
||||
license = "GPL-3.0"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/actix-form-data.git"
|
||||
repository = "https://git.asonix.dog/asonix/actix-form-data.git"
|
||||
readme = "README.md"
|
||||
keywords = ["actix", "form-data", "multipart", "async"]
|
||||
edition = "2018"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
actix-multipart = "0.4.0-beta.7"
|
||||
actix-rt = "2.3.0"
|
||||
actix-web = { version = "4.0.0-beta.10", default-features = false }
|
||||
futures-util = "0.3.17"
|
||||
actix-multipart = { version = "0.6.0", default-features = false }
|
||||
actix-web = { version = "4.0.0", default-features = false }
|
||||
futures-core = "0.3.28"
|
||||
mime = "0.3.16"
|
||||
streem = "0.2.0"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
|
||||
tracing = "0.1.15"
|
||||
|
||||
[dev-dependencies]
|
||||
async-fs = "1.2.1"
|
||||
actix-rt = "2.5.0"
|
||||
async-fs = "2.1.0"
|
||||
anyhow = "1.0"
|
||||
futures-lite = "1.4.0"
|
||||
futures-lite = "2.1.0"
|
||||
futures-util = "0.3.17"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0"
|
||||
tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] }
|
||||
tracing-subscriber = { version = "0.3.9", features = [
|
||||
"env-filter",
|
||||
"fmt",
|
||||
"tracing-log",
|
||||
] }
|
||||
|
|
|
@ -13,8 +13,8 @@ Add it to your dependencies.
|
|||
# Cargo.toml
|
||||
|
||||
[dependencies]
|
||||
actix-web = "4.0.0-beta.3"
|
||||
actix-form-data = "0.6.0-beta.1"
|
||||
actix-web = "4.0.0"
|
||||
actix-form-data = "0.6.0"
|
||||
```
|
||||
|
||||
Require it in your project.
|
||||
|
@ -41,7 +41,7 @@ App::new()
|
|||
In your handler, get the value
|
||||
|
||||
```rust
|
||||
async fn upload(value: Value) -> {
|
||||
async fn upload(uploaded_content: Value<()>) -> HttpResponse {
|
||||
...
|
||||
}
|
||||
```
|
||||
|
@ -67,7 +67,7 @@ use actix_web::{
|
|||
web::{post, resource},
|
||||
App, HttpResponse, HttpServer,
|
||||
};
|
||||
use futures::stream::StreamExt;
|
||||
use futures_util::stream::StreamExt;
|
||||
|
||||
async fn upload(uploaded_content: Value<()>) -> HttpResponse {
|
||||
println!("Uploaded Content: {:#?}", uploaded_content);
|
||||
|
|
|
@ -1,46 +1,56 @@
|
|||
use actix_form_data::{Error, Field, Form, Value};
|
||||
use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
|
||||
use actix_web::{
|
||||
web::{post, resource},
|
||||
App, HttpResponse, HttpServer,
|
||||
App, HttpRequest, HttpResponse, HttpServer,
|
||||
};
|
||||
use futures_util::stream::StreamExt;
|
||||
|
||||
async fn upload(uploaded_content: Value<()>) -> HttpResponse {
|
||||
println!("Uploaded Content: {:#?}", uploaded_content);
|
||||
struct UploadedContent(Value<()>);
|
||||
|
||||
impl FormData for UploadedContent {
|
||||
type Item = ();
|
||||
type Error = Error;
|
||||
|
||||
fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
|
||||
Ok(Form::new()
|
||||
.field("Hey", Field::text())
|
||||
.field(
|
||||
"Hi",
|
||||
Field::map()
|
||||
.field("One", Field::int())
|
||||
.field("Two", Field::float())
|
||||
.finalize(),
|
||||
)
|
||||
.field(
|
||||
"files",
|
||||
Field::array(Field::file(|_, _, mut stream| async move {
|
||||
while let Some(res) = stream.next().await {
|
||||
res?;
|
||||
}
|
||||
Ok(()) as Result<(), Error>
|
||||
})),
|
||||
))
|
||||
}
|
||||
|
||||
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Ok(UploadedContent(value))
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload(Multipart(UploadedContent(value)): Multipart<UploadedContent>) -> HttpResponse {
|
||||
println!("Uploaded Content: {:#?}", value);
|
||||
HttpResponse::Created().finish()
|
||||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), anyhow::Error> {
|
||||
let form = Form::new()
|
||||
.field("Hey", Field::text())
|
||||
.field(
|
||||
"Hi",
|
||||
Field::map()
|
||||
.field("One", Field::int())
|
||||
.field("Two", Field::float())
|
||||
.finalize(),
|
||||
)
|
||||
.field(
|
||||
"files",
|
||||
Field::array(Field::file(|_, _, mut stream| async move {
|
||||
while let Some(res) = stream.next().await {
|
||||
res?;
|
||||
}
|
||||
Ok(()) as Result<(), Error>
|
||||
})),
|
||||
);
|
||||
|
||||
println!("{:?}", form);
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(form.clone())
|
||||
.service(resource("/upload").route(post().to(upload)))
|
||||
})
|
||||
.bind("127.0.0.1:8080")?
|
||||
.run()
|
||||
.await?;
|
||||
HttpServer::new(move || App::new().service(resource("/upload").route(post().to(upload))))
|
||||
.bind("127.0.0.1:8080")?
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use actix_form_data::{Error, Field, Form, Value};
|
||||
use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
|
||||
use actix_web::{
|
||||
http::StatusCode,
|
||||
middleware::Logger,
|
||||
web::{post, resource, Bytes},
|
||||
App, HttpResponse, HttpServer, ResponseError,
|
||||
App, HttpRequest, HttpResponse, HttpServer, ResponseError,
|
||||
};
|
||||
use futures_util::stream::{Stream, StreamExt, TryStreamExt};
|
||||
use std::{
|
||||
|
@ -17,11 +17,6 @@ use std::{
|
|||
};
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct AppState {
|
||||
form: Form<PathBuf, Errors>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
struct JsonError {
|
||||
msg: String,
|
||||
|
@ -60,8 +55,49 @@ impl ResponseError for Errors {
|
|||
}
|
||||
}
|
||||
|
||||
async fn upload(uploaded_content: Value<PathBuf>) -> HttpResponse {
|
||||
info!("Uploaded Content: {:#?}", uploaded_content);
|
||||
struct UploadedContent(Value<PathBuf>);
|
||||
|
||||
impl FormData for UploadedContent {
|
||||
type Item = PathBuf;
|
||||
type Error = Errors;
|
||||
|
||||
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
|
||||
let file_count = req.app_data::<Arc<AtomicUsize>>().expect("Set config");
|
||||
let file_count = Arc::clone(file_count);
|
||||
|
||||
Ok(Form::new()
|
||||
.field("Hey", Field::text())
|
||||
.field(
|
||||
"Hi",
|
||||
Field::map()
|
||||
.field("One", Field::int())
|
||||
.field("Two", Field::float())
|
||||
.finalize(),
|
||||
)
|
||||
.field(
|
||||
"files",
|
||||
Field::array(Field::file(move |_filename, _content_type, stream| {
|
||||
let count = file_count.fetch_add(1, Ordering::Relaxed);
|
||||
async move {
|
||||
save_file(stream, count)
|
||||
.await
|
||||
.map(PathBuf::from)
|
||||
.map_err(Errors::from)
|
||||
}
|
||||
})),
|
||||
))
|
||||
}
|
||||
|
||||
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Ok(UploadedContent(value))
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload(Multipart(UploadedContent(value)): Multipart<UploadedContent>) -> HttpResponse {
|
||||
info!("Uploaded Content: {:#?}", value);
|
||||
|
||||
HttpResponse::Created().finish()
|
||||
}
|
||||
|
@ -96,34 +132,10 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
|
||||
let file_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let form = Form::new()
|
||||
.field("Hey", Field::text())
|
||||
.field(
|
||||
"Hi",
|
||||
Field::map()
|
||||
.field("One", Field::int())
|
||||
.field("Two", Field::float())
|
||||
.finalize(),
|
||||
)
|
||||
.field(
|
||||
"files",
|
||||
Field::array(Field::file(move |_filename, _content_type, stream| {
|
||||
let count = file_count.clone().fetch_add(1, Ordering::Relaxed);
|
||||
async move {
|
||||
save_file(stream, count)
|
||||
.await
|
||||
.map(PathBuf::from)
|
||||
.map_err(Errors::from)
|
||||
}
|
||||
})),
|
||||
);
|
||||
|
||||
info!("{:#?}", form);
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(form.clone())
|
||||
.wrap(Logger::default())
|
||||
.app_data(file_count.clone())
|
||||
.service(resource("/upload").route(post().to(upload)))
|
||||
})
|
||||
.bind("127.0.0.1:8080")?
|
||||
|
|
61
flake.lock
Normal file
61
flake.lock
Normal file
|
@ -0,0 +1,61 @@
|
|||
{
|
||||
"nodes": {
|
||||
"flake-utils": {
|
||||
"inputs": {
|
||||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1701680307,
|
||||
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1702151865,
|
||||
"narHash": "sha256-9VAt19t6yQa7pHZLDbil/QctAgVsA66DLnzdRGqDisg=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "666fc80e7b2afb570462423cb0e1cf1a3a34fedd",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"systems": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"type": "github"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
27
flake.nix
Normal file
27
flake.nix
Normal file
|
@ -0,0 +1,27 @@
|
|||
{
|
||||
description = "A very basic flake";
|
||||
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
|
||||
flake-utils.url = "github:numtide/flake-utils";
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, flake-utils }:
|
||||
flake-utils.lib.eachDefaultSystem (system:
|
||||
let
|
||||
pkgs = import nixpkgs {
|
||||
inherit system;
|
||||
};
|
||||
in
|
||||
{
|
||||
packages.default = pkgs.hello;
|
||||
|
||||
devShell = with pkgs; mkShell {
|
||||
nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
|
||||
|
||||
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
|
||||
};
|
||||
|
||||
formatter = pkgs.nixpkgs-fmt;
|
||||
});
|
||||
}
|
111
src/error.rs
111
src/error.rs
|
@ -19,27 +19,26 @@
|
|||
|
||||
use std::{
|
||||
num::{ParseFloatError, ParseIntError},
|
||||
string::FromUtf8Error,
|
||||
str::Utf8Error,
|
||||
};
|
||||
|
||||
use actix_multipart::MultipartError;
|
||||
use actix_web::{
|
||||
error::{PayloadError, ResponseError},
|
||||
error::{ParseError, PayloadError, ResponseError},
|
||||
http::StatusCode,
|
||||
HttpResponse,
|
||||
};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("Error parsing payload, {0}")]
|
||||
#[error("Error parsing payload")]
|
||||
Payload(#[from] PayloadError),
|
||||
#[error("Error in multipart creation, {0}")]
|
||||
Multipart(MultipartError),
|
||||
#[error("Failed to parse field, {0}")]
|
||||
ParseField(#[from] FromUtf8Error),
|
||||
#[error("Failed to parse int, {0}")]
|
||||
#[error("Error in multipart creation")]
|
||||
Multipart(#[from] MultipartError),
|
||||
#[error("Failed to parse field")]
|
||||
ParseField(#[from] Utf8Error),
|
||||
#[error("Failed to parse int")]
|
||||
ParseInt(#[from] ParseIntError),
|
||||
#[error("Failed to parse float, {0}")]
|
||||
#[error("Failed to parse float")]
|
||||
ParseFloat(#[from] ParseFloatError),
|
||||
#[error("Bad Content-Type")]
|
||||
ContentType,
|
||||
|
@ -59,11 +58,85 @@ pub enum Error {
|
|||
FileCount,
|
||||
#[error("File too large")]
|
||||
FileSize,
|
||||
#[error("Task panicked")]
|
||||
Panicked,
|
||||
}
|
||||
|
||||
impl From<MultipartError> for Error {
|
||||
fn from(m: MultipartError) -> Self {
|
||||
Error::Multipart(m)
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum MultipartError {
|
||||
#[error("No Content-Disposition `form-data` header")]
|
||||
NoContentDisposition,
|
||||
#[error("No Content-Type header found")]
|
||||
NoContentType,
|
||||
#[error("Cannot parse Content-Type header")]
|
||||
ParseContentType,
|
||||
#[error("Multipart boundary is not found")]
|
||||
Boundary,
|
||||
#[error("Nested multipart is not supported")]
|
||||
Nested,
|
||||
#[error("Multipart stream is incomplete")]
|
||||
Incomplete,
|
||||
#[error("Failed parsing")]
|
||||
Parse(#[source] ParseError),
|
||||
#[error("Multipart stream is not consumed")]
|
||||
NotConsumed,
|
||||
#[error("An error occured processing field `{field_name}`: `{zource}`")]
|
||||
Field { field_name: String, zource: String },
|
||||
#[error("Duplicate field found for: `{0}")]
|
||||
DuplicateField(String),
|
||||
#[error("Field with name `{0}` is required")]
|
||||
MissingField(String),
|
||||
#[error("Unsupported field `{0}`")]
|
||||
UnsupportedField(String),
|
||||
#[error("Unknown error occured: {0}")]
|
||||
Unknown(String),
|
||||
}
|
||||
|
||||
impl From<actix_multipart::MultipartError> for Error {
|
||||
fn from(value: actix_multipart::MultipartError) -> Self {
|
||||
match value {
|
||||
actix_multipart::MultipartError::NoContentDisposition => {
|
||||
Error::Multipart(MultipartError::NoContentDisposition)
|
||||
}
|
||||
actix_multipart::MultipartError::NoContentType => {
|
||||
Error::Multipart(MultipartError::NoContentType)
|
||||
}
|
||||
actix_multipart::MultipartError::ParseContentType => {
|
||||
Error::Multipart(MultipartError::ParseContentType)
|
||||
}
|
||||
actix_multipart::MultipartError::Boundary => Error::Multipart(MultipartError::Boundary),
|
||||
actix_multipart::MultipartError::Nested => Error::Multipart(MultipartError::Nested),
|
||||
actix_multipart::MultipartError::Incomplete => {
|
||||
Error::Multipart(MultipartError::Incomplete)
|
||||
}
|
||||
actix_multipart::MultipartError::Parse(e) => Error::Multipart(MultipartError::Parse(e)),
|
||||
actix_multipart::MultipartError::Payload(e) => Error::Payload(e),
|
||||
actix_multipart::MultipartError::NotConsumed => {
|
||||
Error::Multipart(MultipartError::NotConsumed)
|
||||
}
|
||||
actix_multipart::MultipartError::Field { field_name, source } => {
|
||||
Error::Multipart(MultipartError::Field {
|
||||
field_name,
|
||||
zource: source.to_string(),
|
||||
})
|
||||
}
|
||||
actix_multipart::MultipartError::DuplicateField(s) => {
|
||||
Error::Multipart(MultipartError::DuplicateField(s))
|
||||
}
|
||||
actix_multipart::MultipartError::MissingField(s) => {
|
||||
Error::Multipart(MultipartError::MissingField(s))
|
||||
}
|
||||
actix_multipart::MultipartError::UnsupportedField(s) => {
|
||||
Error::Multipart(MultipartError::UnsupportedField(s))
|
||||
}
|
||||
e => Error::Multipart(MultipartError::Unknown(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::task::JoinError> for Error {
|
||||
fn from(_: tokio::task::JoinError) -> Self {
|
||||
Self::Panicked
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,6 +151,7 @@ impl ResponseError for Error {
|
|||
fn error_response(&self) -> HttpResponse {
|
||||
match *self {
|
||||
Error::Payload(ref e) => e.error_response(),
|
||||
Error::Panicked => HttpResponse::InternalServerError().finish(),
|
||||
Error::Multipart(_)
|
||||
| Error::ParseField(_)
|
||||
| Error::ParseInt(_)
|
||||
|
@ -94,3 +168,14 @@ impl ResponseError for Error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Error;
|
||||
|
||||
#[test]
|
||||
fn assert_send() {
|
||||
fn is_send<E: Send>() {}
|
||||
is_send::<Error>();
|
||||
}
|
||||
}
|
||||
|
|
50
src/extractor.rs
Normal file
50
src/extractor.rs
Normal file
|
@ -0,0 +1,50 @@
|
|||
use crate::{
|
||||
types::{Form, Value},
|
||||
upload::handle_multipart,
|
||||
};
|
||||
use actix_web::{dev::Payload, FromRequest, HttpRequest, ResponseError};
|
||||
use std::{future::Future, pin::Pin, rc::Rc};
|
||||
|
||||
pub trait FormData {
|
||||
type Item: 'static;
|
||||
type Error: ResponseError + 'static;
|
||||
|
||||
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error>;
|
||||
|
||||
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
pub struct Multipart<T>(pub T);
|
||||
|
||||
impl<T> FromRequest for Multipart<T>
|
||||
where
|
||||
T: FormData,
|
||||
{
|
||||
type Error = actix_web::Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>> + 'static>>;
|
||||
|
||||
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
|
||||
let multipart = actix_multipart::Multipart::new(req.headers(), payload.take());
|
||||
let res = T::form(req);
|
||||
|
||||
Box::pin(async move {
|
||||
let form = Rc::new(res?);
|
||||
|
||||
let uploaded = match handle_multipart(multipart, Rc::clone(&form)).await {
|
||||
Ok(Ok(uploaded)) => uploaded,
|
||||
Ok(Err(e)) => return Err(e.into()),
|
||||
Err(e) => {
|
||||
if let Some(f) = &form.transform_error {
|
||||
return Err((f)(e));
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Multipart(T::extract(uploaded)?))
|
||||
})
|
||||
}
|
||||
}
|
80
src/lib.rs
80
src/lib.rs
|
@ -25,62 +25,72 @@
|
|||
//! # Example
|
||||
//!
|
||||
//!```rust
|
||||
//! use actix_form_data::{Error, Field, Form, Value};
|
||||
//! use actix_form_data::{Error, Field, Form, FormData, Multipart, Value};
|
||||
//! use actix_web::{
|
||||
//! web::{post, resource},
|
||||
//! App, HttpResponse, HttpServer,
|
||||
//! App, HttpRequest, HttpResponse, HttpServer,
|
||||
//! };
|
||||
//! use futures_util::stream::StreamExt;
|
||||
//!
|
||||
//! async fn upload(uploaded_content: Value<()>) -> HttpResponse {
|
||||
//! println!("Uploaded Content: {:#?}", uploaded_content);
|
||||
//! struct UploadedContent(Value<()>);
|
||||
//!
|
||||
//! impl FormData for UploadedContent {
|
||||
//! type Item = ();
|
||||
//! type Error = Error;
|
||||
//!
|
||||
//! fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
|
||||
//! Ok(Form::new()
|
||||
//! .field("Hey", Field::text())
|
||||
//! .field(
|
||||
//! "Hi",
|
||||
//! Field::map()
|
||||
//! .field("One", Field::int())
|
||||
//! .field("Two", Field::float())
|
||||
//! .finalize(),
|
||||
//! )
|
||||
//! .field(
|
||||
//! "files",
|
||||
//! Field::array(Field::file(|_, _, mut stream| async move {
|
||||
//! while let Some(res) = stream.next().await {
|
||||
//! res?;
|
||||
//! }
|
||||
//! Ok(()) as Result<(), Error>
|
||||
//! })),
|
||||
//! ))
|
||||
//! }
|
||||
//!
|
||||
//! fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
|
||||
//! where
|
||||
//! Self: Sized,
|
||||
//! {
|
||||
//! Ok(UploadedContent(value))
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! async fn upload(Multipart(UploadedContent(value)): Multipart<UploadedContent>) -> HttpResponse {
|
||||
//! println!("Uploaded Content: {:#?}", value);
|
||||
//! HttpResponse::Created().finish()
|
||||
//! }
|
||||
//!
|
||||
//! #[actix_rt::main]
|
||||
//! async fn main() -> Result<(), anyhow::Error> {
|
||||
//! let form = Form::new()
|
||||
//! .field("Hey", Field::text())
|
||||
//! .field(
|
||||
//! "Hi",
|
||||
//! Field::map()
|
||||
//! .field("One", Field::int())
|
||||
//! .field("Two", Field::float())
|
||||
//! .finalize(),
|
||||
//! )
|
||||
//! .field(
|
||||
//! "files",
|
||||
//! Field::array(Field::file(|_, _, mut stream| async move {
|
||||
//! while let Some(_) = stream.next().await {
|
||||
//! // do something
|
||||
//! }
|
||||
//! Ok(()) as Result<(), Error>
|
||||
//! })),
|
||||
//! );
|
||||
//!
|
||||
//! println!("{:?}", form);
|
||||
//!
|
||||
//! HttpServer::new(move || {
|
||||
//! App::new()
|
||||
//! .wrap(form.clone())
|
||||
//! .service(resource("/upload").route(post().to(upload)))
|
||||
//! })
|
||||
//! .bind("127.0.0.1:8082")?;
|
||||
//! // commented out to prevent infinite doctest
|
||||
//! // .run()
|
||||
//! // .await?;
|
||||
//! HttpServer::new(move || App::new().service(resource("/upload").route(post().to(upload))))
|
||||
//! .bind("127.0.0.1:8080")?
|
||||
//! .run();
|
||||
//! // .await?;
|
||||
//!
|
||||
//! Ok(())
|
||||
//! }
|
||||
//!```
|
||||
|
||||
mod error;
|
||||
mod middleware;
|
||||
mod extractor;
|
||||
mod types;
|
||||
mod upload;
|
||||
|
||||
pub use self::{
|
||||
error::Error,
|
||||
extractor::{FormData, Multipart},
|
||||
types::{Field, FileMeta, Form, Value},
|
||||
upload::handle_multipart,
|
||||
};
|
||||
|
|
|
@ -1,146 +0,0 @@
|
|||
/*
|
||||
* This file is part of Actix Form Data.
|
||||
*
|
||||
* Copyright © 2020 Riley Trautman
|
||||
*
|
||||
* Actix Form Data is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Actix Form Data is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Actix Form Data. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use crate::{
|
||||
types::{Form, Value},
|
||||
upload::handle_multipart,
|
||||
};
|
||||
use actix_web::{
|
||||
dev::{Payload, Service, ServiceRequest, Transform},
|
||||
http::StatusCode,
|
||||
FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
|
||||
};
|
||||
use futures_util::future::LocalBoxFuture;
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::oneshot::{channel, Receiver};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FromRequestError {
|
||||
#[error("Uploaded guard used without Multipart middleware")]
|
||||
MissingMiddleware,
|
||||
#[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")]
|
||||
TxDropped,
|
||||
}
|
||||
|
||||
impl ResponseError for FromRequestError {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Self::MissingMiddleware | Self::TxDropped => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
}
|
||||
}
|
||||
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
match self {
|
||||
Self::MissingMiddleware | Self::TxDropped => {
|
||||
HttpResponse::InternalServerError().finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Uploaded<T> {
|
||||
rx: Receiver<Value<T>>,
|
||||
}
|
||||
|
||||
pub struct MultipartMiddleware<S, T, E> {
|
||||
form: Form<T, E>,
|
||||
service: S,
|
||||
}
|
||||
|
||||
impl<T> FromRequest for Value<T>
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
type Error = FromRequestError;
|
||||
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
|
||||
|
||||
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
|
||||
let opt = req.extensions_mut().remove::<Uploaded<T>>();
|
||||
Box::pin(async move {
|
||||
let fut = opt.ok_or(FromRequestError::MissingMiddleware)?;
|
||||
|
||||
fut.rx.await.map_err(|_| FromRequestError::TxDropped)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, E> Transform<S, ServiceRequest> for Form<T, E>
|
||||
where
|
||||
S: Service<ServiceRequest, Error = actix_web::Error>,
|
||||
S::Future: 'static,
|
||||
T: 'static,
|
||||
E: ResponseError + 'static,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type InitError = ();
|
||||
type Transform = MultipartMiddleware<S, T, E>;
|
||||
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ready(Ok(MultipartMiddleware {
|
||||
form: self.clone(),
|
||||
service,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, E> Service<ServiceRequest> for MultipartMiddleware<S, T, E>
|
||||
where
|
||||
S: Service<ServiceRequest, Error = actix_web::Error>,
|
||||
S::Future: 'static,
|
||||
T: 'static,
|
||||
E: ResponseError + 'static,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.service.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&self, mut req: ServiceRequest) -> Self::Future {
|
||||
let (tx, rx) = channel();
|
||||
req.extensions_mut().insert(Uploaded { rx });
|
||||
let payload = req.take_payload();
|
||||
let multipart = actix_multipart::Multipart::new(req.headers(), payload);
|
||||
let form = self.form.clone();
|
||||
let fut = self.service.call(req);
|
||||
|
||||
Box::pin(async move {
|
||||
let uploaded = match handle_multipart(multipart, form.clone()).await {
|
||||
Ok(Ok(uploaded)) => uploaded,
|
||||
Ok(Err(e)) => return Err(e.into()),
|
||||
Err(e) => {
|
||||
if let Some(f) = form.transform_error.clone() {
|
||||
return Err((f)(e));
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
let _ = tx.send(uploaded);
|
||||
fut.await
|
||||
})
|
||||
}
|
||||
}
|
140
src/types.rs
140
src/types.rs
|
@ -19,21 +19,20 @@
|
|||
|
||||
use crate::Error;
|
||||
use actix_web::web::Bytes;
|
||||
use futures_util::Stream;
|
||||
use futures_core::Stream;
|
||||
use mime::Mime;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
fmt,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::trace;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FileMeta<T> {
|
||||
pub filename: String,
|
||||
pub content_type: Mime,
|
||||
pub content_type: Option<Mime>,
|
||||
pub result: T,
|
||||
}
|
||||
|
||||
|
@ -153,14 +152,12 @@ impl<T> From<MultipartContent<T>> for Value<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub type FileFn<T, E> = Arc<
|
||||
pub type FileFn<T, E> = Box<
|
||||
dyn Fn(
|
||||
String,
|
||||
Mime,
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>>>>
|
||||
+ Send
|
||||
+ Sync,
|
||||
String,
|
||||
Option<Mime>,
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
|
||||
>;
|
||||
|
||||
/// The field type represents a field in the form-data that is allowed to be parsed.
|
||||
|
@ -174,20 +171,6 @@ pub enum Field<T, E> {
|
|||
Text,
|
||||
}
|
||||
|
||||
impl<T, E> Clone for Field<T, E> {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Self::Array(a) => Self::Array(a.clone()),
|
||||
Self::Map(m) => Self::Map(m.clone()),
|
||||
Self::File(file_fn) => Self::File(Arc::clone(file_fn)),
|
||||
Self::Bytes => Self::Bytes,
|
||||
Self::Int => Self::Int,
|
||||
Self::Float => Self::Float,
|
||||
Self::Text => Self::Text,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E> fmt::Debug for Field<T, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
|
@ -232,17 +215,15 @@ impl<T, E> Field<T, E> {
|
|||
/// ```
|
||||
pub fn file<F, Fut>(f: F) -> Self
|
||||
where
|
||||
F: Fn(String, Mime, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
|
||||
+ Send
|
||||
+ Sync
|
||||
F: Fn(String, Option<Mime>, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
|
||||
+ Clone
|
||||
+ 'static,
|
||||
Fut: Future<Output = Result<T, E>> + 'static,
|
||||
E: 'static,
|
||||
{
|
||||
Field::File(Arc::new(move |filename, mime, stream| {
|
||||
Field::File(Box::new(move |filename, mime, stream| {
|
||||
let f = f.clone();
|
||||
Box::pin(async move { (f)(filename, mime, stream).await })
|
||||
Box::pin((f)(filename, mime, stream))
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -252,7 +233,7 @@ impl<T, E> Field<T, E> {
|
|||
/// ```rust
|
||||
/// # use actix_form_data::{Error, Form, Field};
|
||||
/// let form = Form::<(), Error>::new().field("text-field", Field::bytes());
|
||||
pub fn bytes() -> Self {
|
||||
pub const fn bytes() -> Self {
|
||||
Field::Bytes
|
||||
}
|
||||
|
||||
|
@ -262,7 +243,7 @@ impl<T, E> Field<T, E> {
|
|||
/// ```rust
|
||||
/// # use actix_form_data::{Error, Form, Field};
|
||||
/// let form = Form::<(), Error>::new().field("text-field", Field::text());
|
||||
pub fn text() -> Self {
|
||||
pub const fn text() -> Self {
|
||||
Field::Text
|
||||
}
|
||||
|
||||
|
@ -273,7 +254,7 @@ impl<T, E> Field<T, E> {
|
|||
/// # use actix_form_data::{Error, Form, Field};
|
||||
/// let form = Form::<(), Error>::new().field("int-field", Field::int());
|
||||
/// ```
|
||||
pub fn int() -> Self {
|
||||
pub const fn int() -> Self {
|
||||
Field::Int
|
||||
}
|
||||
|
||||
|
@ -284,7 +265,7 @@ impl<T, E> Field<T, E> {
|
|||
/// # use actix_form_data::{Error, Form, Field};
|
||||
/// let form = Form::<(), Error>::new().field("float-field", Field::float());
|
||||
/// ```
|
||||
pub fn float() -> Self {
|
||||
pub const fn float() -> Self {
|
||||
Field::Float
|
||||
}
|
||||
|
||||
|
@ -321,18 +302,18 @@ impl<T, E> Field<T, E> {
|
|||
/// );
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn map() -> Map<T, E> {
|
||||
pub const fn map() -> Map<T, E> {
|
||||
Map::new()
|
||||
}
|
||||
|
||||
fn valid_field(&self, name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
|
||||
fn valid_field<'a>(&'a self, name: VecDeque<&NamePart>) -> Option<FieldTerminator<'a, T, E>> {
|
||||
trace!("Checking {:?} and {:?}", self, name);
|
||||
match *self {
|
||||
Field::Array(ref arr) => arr.valid_field(name),
|
||||
Field::Map(ref map) => map.valid_field(name),
|
||||
Field::File(ref file_fn) => {
|
||||
if name.is_empty() {
|
||||
Some(FieldTerminator::File(file_fn.clone()))
|
||||
Some(FieldTerminator::File(file_fn))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
@ -377,14 +358,6 @@ pub struct Array<T, E> {
|
|||
inner: Box<Field<T, E>>,
|
||||
}
|
||||
|
||||
impl<T, E> Clone for Array<T, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Array {
|
||||
inner: Box::new((*self.inner).clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E> fmt::Debug for Array<T, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Array").field("inner", &self.inner).finish()
|
||||
|
@ -398,7 +371,10 @@ impl<T, E> Array<T, E> {
|
|||
}
|
||||
}
|
||||
|
||||
fn valid_field(&self, mut name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
|
||||
fn valid_field<'a>(
|
||||
&'a self,
|
||||
mut name: VecDeque<&NamePart>,
|
||||
) -> Option<FieldTerminator<'a, T, E>> {
|
||||
trace!("Checking {:?} and {:?}", self, name);
|
||||
match name.pop_front() {
|
||||
Some(NamePart::Array) => self.inner.valid_field(name),
|
||||
|
@ -412,14 +388,6 @@ pub struct Map<T, E> {
|
|||
inner: Vec<(String, Field<T, E>)>,
|
||||
}
|
||||
|
||||
impl<T, E> Clone for Map<T, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Map {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E> fmt::Debug for Map<T, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Map").field("inner", &self.inner).finish()
|
||||
|
@ -427,7 +395,7 @@ impl<T, E> fmt::Debug for Map<T, E> {
|
|||
}
|
||||
|
||||
impl<T, E> Map<T, E> {
|
||||
fn new() -> Self {
|
||||
const fn new() -> Self {
|
||||
Map { inner: Vec::new() }
|
||||
}
|
||||
|
||||
|
@ -456,18 +424,21 @@ impl<T, E> Map<T, E> {
|
|||
/// .field("sub-field-two", Field::text())
|
||||
/// .finalize();
|
||||
/// ```
|
||||
pub fn finalize(self) -> Field<T, E> {
|
||||
pub const fn finalize(self) -> Field<T, E> {
|
||||
Field::Map(self)
|
||||
}
|
||||
|
||||
fn valid_field(&self, mut name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
|
||||
fn valid_field<'a>(
|
||||
&'a self,
|
||||
mut name: VecDeque<&NamePart>,
|
||||
) -> Option<FieldTerminator<'a, T, E>> {
|
||||
trace!("Checking {:?} and {:?}", self, name);
|
||||
match name.pop_front() {
|
||||
Some(NamePart::Map(name_part)) => self
|
||||
.inner
|
||||
.iter()
|
||||
.find(|&&(ref item, _)| *item == *name_part)
|
||||
.and_then(|&(_, ref field)| field.valid_field(name)),
|
||||
.find(|(ref item, _)| *item == *name_part)
|
||||
.and_then(|(_, ref field)| field.valid_field(name)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
@ -502,23 +473,10 @@ pub struct Form<T, E> {
|
|||
pub(crate) max_field_size: usize,
|
||||
pub(crate) max_files: u32,
|
||||
pub(crate) max_file_size: usize,
|
||||
pub(crate) transform_error: Option<Arc<dyn Fn(Error) -> actix_web::Error + Send + Sync>>,
|
||||
pub(crate) transform_error: Option<Box<dyn Fn(Error) -> actix_web::Error + Sync>>,
|
||||
inner: Map<T, E>,
|
||||
}
|
||||
|
||||
impl<T, E> Clone for Form<T, E> {
|
||||
fn clone(&self) -> Self {
|
||||
Form {
|
||||
max_fields: self.max_fields,
|
||||
max_field_size: self.max_field_size,
|
||||
max_files: self.max_files,
|
||||
max_file_size: self.max_file_size,
|
||||
transform_error: None,
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E> Default for Form<T, E> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
|
@ -535,7 +493,7 @@ impl<T, E> Form<T, E> {
|
|||
/// - max_field_size: 10_000 bytes
|
||||
/// - max_files: 20
|
||||
/// - max_files_size: 10_000_000 bytes
|
||||
pub fn new() -> Self {
|
||||
pub const fn new() -> Self {
|
||||
Form {
|
||||
max_fields: 100,
|
||||
max_field_size: 10_000,
|
||||
|
@ -549,16 +507,16 @@ impl<T, E> Form<T, E> {
|
|||
/// Set the Transform Error method to convert Error types into actix_web::Error by hand
|
||||
pub fn transform_error(
|
||||
mut self,
|
||||
f: impl Fn(Error) -> actix_web::Error + Send + Sync + 'static,
|
||||
f: impl Fn(Error) -> actix_web::Error + Sync + 'static,
|
||||
) -> Self {
|
||||
self.transform_error = Some(Arc::new(f));
|
||||
self.transform_error = Some(Box::new(f));
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum number of fields allowed in the upload
|
||||
///
|
||||
/// The upload will error if too many fields are provided.
|
||||
pub fn max_fields(mut self, max: u32) -> Self {
|
||||
pub const fn max_fields(mut self, max: u32) -> Self {
|
||||
self.max_fields = max;
|
||||
|
||||
self
|
||||
|
@ -567,7 +525,7 @@ impl<T, E> Form<T, E> {
|
|||
/// Set the maximum size of a field (in bytes)
|
||||
///
|
||||
/// The upload will error if a provided field is too large.
|
||||
pub fn max_field_size(mut self, max: usize) -> Self {
|
||||
pub const fn max_field_size(mut self, max: usize) -> Self {
|
||||
self.max_field_size = max;
|
||||
|
||||
self
|
||||
|
@ -576,7 +534,7 @@ impl<T, E> Form<T, E> {
|
|||
/// Set the maximum number of files allowed in the upload
|
||||
///
|
||||
/// THe upload will error if too many files are provided.
|
||||
pub fn max_files(mut self, max: u32) -> Self {
|
||||
pub const fn max_files(mut self, max: u32) -> Self {
|
||||
self.max_files = max;
|
||||
|
||||
self
|
||||
|
@ -585,7 +543,7 @@ impl<T, E> Form<T, E> {
|
|||
/// Set the maximum size for files (in bytes)
|
||||
///
|
||||
/// The upload will error if a provided file is too large.
|
||||
pub fn max_file_size(mut self, max: usize) -> Self {
|
||||
pub const fn max_file_size(mut self, max: usize) -> Self {
|
||||
self.max_file_size = max;
|
||||
|
||||
self
|
||||
|
@ -597,8 +555,11 @@ impl<T, E> Form<T, E> {
|
|||
self
|
||||
}
|
||||
|
||||
pub(crate) fn valid_field(&self, name: VecDeque<&NamePart>) -> Option<FieldTerminator<T, E>> {
|
||||
self.inner.valid_field(name.clone())
|
||||
pub(crate) fn valid_field<'a>(
|
||||
&'a self,
|
||||
name: VecDeque<&NamePart>,
|
||||
) -> Option<FieldTerminator<'a, T, E>> {
|
||||
self.inner.valid_field(name)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -614,15 +575,6 @@ pub(crate) struct ContentDisposition {
|
|||
pub filename: Option<String>,
|
||||
}
|
||||
|
||||
impl ContentDisposition {
|
||||
pub(crate) fn empty() -> Self {
|
||||
ContentDisposition {
|
||||
name: None,
|
||||
filename: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) enum NamePart {
|
||||
Map(String),
|
||||
|
@ -630,21 +582,21 @@ pub(crate) enum NamePart {
|
|||
}
|
||||
|
||||
impl NamePart {
|
||||
pub fn is_map(&self) -> bool {
|
||||
pub const fn is_map(&self) -> bool {
|
||||
matches!(self, NamePart::Map(_))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum FieldTerminator<T, E> {
|
||||
File(FileFn<T, E>),
|
||||
pub(crate) enum FieldTerminator<'a, T, E> {
|
||||
File(&'a FileFn<T, E>),
|
||||
Bytes,
|
||||
Int,
|
||||
Float,
|
||||
Text,
|
||||
}
|
||||
|
||||
impl<T, E> fmt::Debug for FieldTerminator<T, E> {
|
||||
impl<'a, T, E> fmt::Debug for FieldTerminator<'a, T, E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
FieldTerminator::File(_) => write!(f, "File"),
|
||||
|
|
266
src/upload.rs
266
src/upload.rs
|
@ -25,19 +25,10 @@ use crate::{
|
|||
},
|
||||
};
|
||||
use actix_web::web::BytesMut;
|
||||
use futures_util::{
|
||||
select,
|
||||
stream::{FuturesUnordered, StreamExt},
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tracing::trace;
|
||||
use std::{collections::HashMap, path::Path, rc::Rc};
|
||||
use streem::IntoStreamer;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::Instrument;
|
||||
|
||||
fn consolidate<T>(mf: MultipartForm<T>) -> Value<T> {
|
||||
mf.into_iter().fold(
|
||||
|
@ -76,34 +67,30 @@ fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
|
|||
NamePart::Map(part.to_owned())
|
||||
}
|
||||
})
|
||||
.fold(Ok(vec![]), |acc, part| match acc {
|
||||
Ok(mut v) => {
|
||||
if v.is_empty() && !part.is_map() {
|
||||
return Err(Error::ContentDisposition);
|
||||
}
|
||||
|
||||
v.push(part);
|
||||
Ok(v)
|
||||
.try_fold(vec![], |mut v, part| {
|
||||
if v.is_empty() && !part.is_map() {
|
||||
return Err(Error::ContentDisposition);
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
|
||||
v.push(part);
|
||||
Ok(v)
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_content_disposition(field: &actix_multipart::Field) -> ContentDisposition {
|
||||
match field.content_disposition() {
|
||||
Some(x) => ContentDisposition {
|
||||
name: x.get_name().map(|v| v.to_string()),
|
||||
filename: x.get_filename().map(|v| v.to_string()),
|
||||
},
|
||||
None => ContentDisposition::empty(),
|
||||
let content_disposition = field.content_disposition();
|
||||
|
||||
ContentDisposition {
|
||||
name: content_disposition.get_name().map(|v| v.to_string()),
|
||||
filename: content_disposition.get_filename().map(|v| v.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_file_upload<T, E>(
|
||||
field: actix_multipart::Field,
|
||||
filename: Option<String>,
|
||||
form: Form<T, E>,
|
||||
file_fn: FileFn<T, E>,
|
||||
form: &Form<T, E>,
|
||||
file_fn: &FileFn<T, E>,
|
||||
) -> Result<Result<MultipartContent<T>, E>, Error>
|
||||
where
|
||||
T: 'static,
|
||||
|
@ -116,31 +103,42 @@ where
|
|||
|
||||
let filename = filename.ok_or(Error::Filename)?.to_owned();
|
||||
|
||||
let file_size = Arc::new(AtomicUsize::new(0));
|
||||
let content_type = field.content_type().cloned();
|
||||
|
||||
let content_type = field.content_type().clone();
|
||||
let max_file_size = form.max_file_size;
|
||||
|
||||
let field_stream = streem::try_from_fn(move |yielder| async move {
|
||||
let mut file_size = 0;
|
||||
|
||||
let mut stream = field.into_streamer();
|
||||
|
||||
while let Some(bytes) = stream.try_next().await? {
|
||||
tracing::trace!("Bytes from field");
|
||||
|
||||
file_size += bytes.len();
|
||||
|
||||
if file_size > max_file_size {
|
||||
drop(bytes);
|
||||
|
||||
while stream.try_next().await?.is_some() {
|
||||
tracing::trace!("Dropping oversized bytes");
|
||||
}
|
||||
|
||||
return Err(Error::FileSize);
|
||||
}
|
||||
|
||||
yielder.yield_ok(bytes).await;
|
||||
}
|
||||
|
||||
tracing::debug!("Finished consuming field");
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let result = file_fn(
|
||||
filename.clone(),
|
||||
content_type.clone(),
|
||||
Box::pin(field.then(move |res| {
|
||||
let form = form.clone();
|
||||
let file_size = file_size.clone();
|
||||
async move {
|
||||
match res {
|
||||
Ok(bytes) => {
|
||||
let size = file_size.fetch_add(bytes.len(), Ordering::Relaxed);
|
||||
|
||||
if size + bytes.len() > form.max_file_size {
|
||||
return Err(Error::FileSize);
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
Err(e) => Err(Error::from(e)),
|
||||
}
|
||||
}
|
||||
})),
|
||||
Box::pin(field_stream),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -154,36 +152,63 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_form_data<T, E>(
|
||||
mut field: actix_multipart::Field,
|
||||
term: FieldTerminator<T, E>,
|
||||
form: Form<T, E>,
|
||||
async fn handle_form_data<'a, T, E>(
|
||||
field: actix_multipart::Field,
|
||||
term: FieldTerminator<'a, T, E>,
|
||||
form: &Form<T, E>,
|
||||
) -> Result<MultipartContent<T>, Error>
|
||||
where
|
||||
T: 'static,
|
||||
E: 'static,
|
||||
{
|
||||
trace!("In handle_form_data, term: {:?}", term);
|
||||
let mut bytes = BytesMut::new();
|
||||
tracing::trace!("In handle_form_data, term: {:?}", term);
|
||||
let mut buf = Vec::new();
|
||||
|
||||
let mut stream = field.into_streamer();
|
||||
|
||||
while let Some(bytes) = stream.try_next().await? {
|
||||
tracing::trace!("bytes from field");
|
||||
|
||||
if buf.len() + bytes.len() > form.max_field_size {
|
||||
drop(buf);
|
||||
|
||||
while stream.try_next().await?.is_some() {
|
||||
tracing::trace!("Dropping oversized bytes");
|
||||
}
|
||||
|
||||
while let Some(res) = field.next().await {
|
||||
let b = res?;
|
||||
if bytes.len() + b.len() > form.max_field_size {
|
||||
return Err(Error::FieldSize);
|
||||
}
|
||||
|
||||
bytes.extend(b);
|
||||
buf.push(bytes);
|
||||
}
|
||||
|
||||
let bytes = match buf.len() {
|
||||
0 => return Err(Error::FieldSize),
|
||||
1 => buf.pop().expect("contains an element"),
|
||||
_ => {
|
||||
let total_length = buf.iter().map(|b| b.len()).sum();
|
||||
|
||||
let mut bytes = BytesMut::with_capacity(total_length);
|
||||
|
||||
for b in buf {
|
||||
bytes.extend(b);
|
||||
}
|
||||
|
||||
bytes.freeze()
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!("Finished consuming field");
|
||||
|
||||
if let FieldTerminator::Bytes = term {
|
||||
return Ok(MultipartContent::Bytes(bytes.freeze()));
|
||||
return Ok(MultipartContent::Bytes(bytes));
|
||||
}
|
||||
|
||||
let s = String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)?;
|
||||
let s = std::str::from_utf8(&bytes).map_err(Error::ParseField)?;
|
||||
|
||||
match term {
|
||||
FieldTerminator::Bytes | FieldTerminator::File(_) => Err(Error::FieldType),
|
||||
FieldTerminator::Text => Ok(MultipartContent::Text(s)),
|
||||
FieldTerminator::Text => Ok(MultipartContent::Text(String::from(s))),
|
||||
FieldTerminator::Float => s
|
||||
.parse()
|
||||
.map_err(Error::ParseFloat)
|
||||
|
@ -197,7 +222,7 @@ where
|
|||
|
||||
async fn handle_stream_field<T, E>(
|
||||
field: actix_multipart::Field,
|
||||
form: Form<T, E>,
|
||||
form: Rc<Form<T, E>>,
|
||||
) -> Result<Result<MultipartHash<T>, E>, Error>
|
||||
where
|
||||
T: 'static,
|
||||
|
@ -214,21 +239,22 @@ where
|
|||
|
||||
let content = match term {
|
||||
FieldTerminator::File(file_fn) => {
|
||||
match handle_file_upload(field, content_disposition.filename, form, file_fn).await? {
|
||||
match handle_file_upload(field, content_disposition.filename, &form, file_fn).await? {
|
||||
Ok(content) => content,
|
||||
Err(e) => return Ok(Err(e)),
|
||||
}
|
||||
}
|
||||
term => handle_form_data(field, term, form.clone()).await?,
|
||||
term => handle_form_data(field, term, &form).await?,
|
||||
};
|
||||
|
||||
Ok(Ok((name, content)))
|
||||
}
|
||||
|
||||
/// Handle multipart streams from Actix Web
|
||||
#[tracing::instrument(level = "TRACE", skip_all)]
|
||||
pub async fn handle_multipart<T, E>(
|
||||
m: actix_multipart::Multipart,
|
||||
form: Form<T, E>,
|
||||
form: Rc<Form<T, E>>,
|
||||
) -> Result<Result<Value<T>, E>, Error>
|
||||
where
|
||||
T: 'static,
|
||||
|
@ -238,35 +264,111 @@ where
|
|||
let mut file_count: u32 = 0;
|
||||
let mut field_count: u32 = 0;
|
||||
|
||||
let mut unordered = FuturesUnordered::new();
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
let mut m = m.fuse();
|
||||
let mut m = m.into_streamer();
|
||||
|
||||
loop {
|
||||
select! {
|
||||
opt = m.next() => {
|
||||
if let Some(res) = opt {
|
||||
unordered.push(handle_stream_field(res?, form.clone()));
|
||||
let mut error: Option<Error> = None;
|
||||
let mut provided_error: Option<E> = None;
|
||||
let mut is_closed = false;
|
||||
let mut stream_error = false;
|
||||
|
||||
'outer: loop {
|
||||
tracing::trace!("multipart loop");
|
||||
|
||||
if error.is_some() || provided_error.is_some() {
|
||||
set.abort_all();
|
||||
|
||||
if !stream_error {
|
||||
while let Some(res) = m.next().await {
|
||||
tracing::trace!("draining multipart field");
|
||||
|
||||
if let Ok(field) = res {
|
||||
let mut stream = field.into_streamer();
|
||||
while stream.next().await.is_some() {
|
||||
tracing::trace!("Throwing away uploaded bytes, we have an error");
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
opt = unordered.next() => {
|
||||
|
||||
while set.join_next().await.is_some() {
|
||||
tracing::trace!("Throwing away joined result");
|
||||
}
|
||||
|
||||
break 'outer;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
opt = m.next(), if !is_closed => {
|
||||
tracing::trace!("Selected stream");
|
||||
is_closed = opt.is_none();
|
||||
|
||||
if let Some(res) = opt {
|
||||
let (name_parts, content) = match res? {
|
||||
Ok(tup) => tup,
|
||||
Err(e) => return Ok(Err(e)),
|
||||
match res {
|
||||
Ok(field) => {
|
||||
set.spawn_local(handle_stream_field(field, Rc::clone(&form)).instrument(tracing::trace_span!("multipart-field")));
|
||||
},
|
||||
Err(e) => {
|
||||
is_closed = true;
|
||||
stream_error = true;
|
||||
error = Some(e.into());
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
opt = set.join_next(), if !set.is_empty() => {
|
||||
tracing::trace!("Selected set");
|
||||
if let Some(res) = opt {
|
||||
let (name_parts, content) = match res {
|
||||
Ok(Ok(Ok(tup))) => tup,
|
||||
Ok(Ok(Err(e))) => {
|
||||
provided_error = Some(e);
|
||||
continue 'outer;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error = Some(e);
|
||||
continue 'outer;
|
||||
},
|
||||
Err(e) => {
|
||||
error = Some(e.into());
|
||||
continue 'outer;
|
||||
},
|
||||
};
|
||||
|
||||
let (l, r) = match count(&content, file_count, field_count, &form) {
|
||||
Ok(tup) => tup,
|
||||
Err(e) => {
|
||||
error = Some(e);
|
||||
continue 'outer;
|
||||
}
|
||||
};
|
||||
|
||||
let (l, r) = count(&content, file_count, field_count, &form)?;
|
||||
file_count = l;
|
||||
field_count = r;
|
||||
|
||||
multipart_form.push((name_parts, content));
|
||||
}
|
||||
}
|
||||
complete => break,
|
||||
else => {
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Finished consuming multipart");
|
||||
|
||||
if let Some(e) = provided_error {
|
||||
return Ok(Err(e));
|
||||
}
|
||||
|
||||
if let Some(e) = error {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(Ok(consolidate(multipart_form)))
|
||||
}
|
||||
|
||||
|
@ -279,13 +381,13 @@ fn count<T, E>(
|
|||
match content {
|
||||
MultipartContent::File(_) => {
|
||||
file_count += 1;
|
||||
if file_count >= form.max_files {
|
||||
if file_count > form.max_files {
|
||||
return Err(Error::FileCount);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
field_count += 1;
|
||||
if field_count >= form.max_fields {
|
||||
if field_count > form.max_fields {
|
||||
return Err(Error::FieldCount);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue