Middleware-based approach

This commit is contained in:
asonix 2020-06-05 21:40:44 -05:00
parent 95ef512a93
commit 0deb486d99
10 changed files with 347 additions and 180 deletions

View file

@ -9,12 +9,9 @@ readme = "README.md"
keywords = ["actix", "form-data", "multipart", "async"] keywords = ["actix", "form-data", "multipart", "async"]
edition = "2018" edition = "2018"
[lib]
name = "form_data"
[dependencies] [dependencies]
actix-http = "2.0.0-alpha.3" actix-http = "2.0.0-alpha.3"
actix-multipart = "0.2.0" actix-multipart = "0.3.0-alpha.1"
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-web = "3.0.0-alpha.2" actix-web = "3.0.0-alpha.2"
bytes = "0.5.0" bytes = "0.5.0"
@ -22,10 +19,12 @@ futures = "0.3.4"
log = "0.4.8" log = "0.4.8"
mime = "0.3.16" mime = "0.3.16"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "0.2.21", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs" }
anyhow = "1.0" anyhow = "1.0"
env_logger = "0.7.1" env_logger = "0.7.1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "0.2.21", features = ["fs"] } thiserror = "1.0"

View file

@ -14,8 +14,7 @@ Add it to your dependencies.
[dependencies] [dependencies]
actix-web = "1.0.0" actix-web = "1.0.0"
actix-multipart = "0.1.0" actix-form-data = "0.5.0-alpha.0"
actix-form-data = "0.4.0"
``` ```
Require it in your project. Require it in your project.
@ -32,61 +31,51 @@ let form = Form::new().field("field-name", Field::text());
``` ```
This creates a form with one required field named "field-name" that will be parsed as text. This creates a form with one required field named "field-name" that will be parsed as text.
Then, pass it to `handle_multipart` in your request handler. Then, pass it as a middleware.
```rust ```rust
fn request_handler(mp: Multipart, state: Data<State>) -> ... { App::new()
let future = form_data::handle_multipart(mp, state.form); .wrap(form.clone())
.service(resource("/upload").route(post().to(upload)))
```
In your handler, get the value
```rust
async fn upload(value: Value) -> {
... ...
} }
``` ```
This returns a `Future<Item = Value, Error = form_data::Error>`, which can be used to And interact with it
fetch your data.
```rust ```rust
let field_value = match value { let field_value = match value {
Value::Map(mut hashmap) => { Value::Map(mut hashmap) => {
hashmap.remove("field-name")? hashmap.remove("field-name")?;
} }
_ => return None, _ => return None,
}; };
...
``` ```
#### Example #### Example
```rust ```rust
/// examples/simple.rs /// examples/simple.rs
use std::path::PathBuf; use actix_form_data::{Error, Field, Form, Value};
use actix_multipart::Multipart;
use actix_web::{ use actix_web::{
web::{post, resource, Data}, web::{post, resource},
App, HttpResponse, HttpServer, App, HttpResponse, HttpServer,
}; };
use form_data::{handle_multipart, Error, Field, FilenameGenerator, Form}; use futures::stream::StreamExt;
use futures::Future;
struct Gen; async fn upload(uploaded_content: Value) -> HttpResponse {
println!("Uploaded Content: {:#?}", uploaded_content);
impl FilenameGenerator for Gen { HttpResponse::Created().finish()
fn next_filename(&self, _: &mime::Mime) -> Option<PathBuf> {
let mut p = PathBuf::new();
p.push("examples/filename.png");
Some(p)
}
} }
fn upload((mp, state): (Multipart, Data<Form>)) -> Box<Future<Item = HttpResponse, Error = Error>> { #[actix_rt::main]
Box::new( async fn main() -> Result<(), anyhow::Error> {
handle_multipart(mp, state.get_ref().clone()).map(|uploaded_content| {
println!("Uploaded Content: {:?}", uploaded_content);
HttpResponse::Created().finish()
}),
)
}
fn main() -> Result<(), failure::Error> {
let form = Form::new() let form = Form::new()
.field("Hey", Field::text()) .field("Hey", Field::text())
.field( .field(
@ -96,21 +85,29 @@ fn main() -> Result<(), failure::Error> {
.field("Two", Field::float()) .field("Two", Field::float())
.finalize(), .finalize(),
) )
.field("files", Field::array(Field::file(Gen))); .field(
"files",
Field::array(Field::file(|_, _, mut stream| async move {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as Result<(), Error>
})),
);
println!("{:?}", form); println!("{:?}", form);
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(form.clone()) .wrap(form.clone())
.service(resource("/upload").route(post().to(upload))) .service(resource("/upload").route(post().to(upload)))
}) })
.bind("127.0.0.1:8080")? .bind("127.0.0.1:8080")?
.run()?; .run()
.await?;
Ok(()) Ok(())
} }
}
``` ```
### Contributing ### Contributing
@ -118,7 +115,7 @@ Feel free to open issues for anything you find an issue with. Please note that a
### License ### License
Copyright © 2018 Riley Trautman Copyright © 2020 Riley Trautman
Actix Form Data is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Actix Form Data is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

View file

@ -1 +0,0 @@
3.7.0

View file

@ -1,17 +1,13 @@
use std::path::PathBuf; use actix_form_data::{Error, Field, Form, Value};
use actix_multipart::Multipart;
use actix_web::{ use actix_web::{
web::{post, resource, Data}, web::{post, resource},
App, HttpResponse, HttpServer, App, HttpResponse, HttpServer,
}; };
use form_data::{handle_multipart, Error, Field, Form}; use futures::stream::StreamExt;
async fn upload(mp: Multipart, state: Data<Form>) -> Result<HttpResponse, Error> { async fn upload(uploaded_content: Value) -> HttpResponse {
let uploaded_content = handle_multipart(mp, state.get_ref().clone()).await?; println!("Uploaded Content: {:#?}", uploaded_content);
HttpResponse::Created().finish()
println!("Uploaded Content: {:?}", uploaded_content);
Ok(HttpResponse::Created().finish())
} }
#[actix_rt::main] #[actix_rt::main]
@ -25,13 +21,21 @@ async fn main() -> Result<(), anyhow::Error> {
.field("Two", Field::float()) .field("Two", Field::float())
.finalize(), .finalize(),
) )
.field("files", Field::array(Field::file())); .field(
"files",
Field::array(Field::file(|_, _, mut stream| async move {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as Result<(), Error>
})),
);
println!("{:?}", form); println!("{:?}", form);
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(form.clone()) .wrap(form.clone())
.service(resource("/upload").route(post().to(upload))) .service(resource("/upload").route(post().to(upload)))
}) })
.bind("127.0.0.1:8080")? .bind("127.0.0.1:8080")?

View file

@ -1,68 +1,45 @@
use std::{ use actix_form_data::{Error, Field, Form, Value};
env,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
use actix_multipart::Multipart;
use actix_web::{ use actix_web::{
http::StatusCode,
middleware::Logger, middleware::Logger,
web::{post, resource, Data}, web::{post, resource},
App, HttpResponse, HttpServer, ResponseError, App, HttpResponse, HttpServer, ResponseError,
}; };
use failure::Fail; use bytes::Bytes;
use form_data::*; use futures::stream::{Stream, TryStreamExt};
use futures::Future;
use log::info; use log::info;
use serde_derive::{Deserialize, Serialize}; use std::{
env,
struct Gen(AtomicUsize); pin::Pin,
sync::{
impl Gen { atomic::{AtomicUsize, Ordering},
pub fn new() -> Self { Arc,
Gen(AtomicUsize::new(0)) },
} };
}
impl FilenameGenerator for Gen {
fn next_filename(&self, _: &mime::Mime) -> Option<PathBuf> {
let mut p = PathBuf::new();
p.push("examples");
p.push(&format!(
"filename{}.png",
self.0.fetch_add(1, Ordering::Relaxed)
));
Some(p)
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct AppState { struct AppState {
form: Form, form: Form,
} }
#[derive(Clone, Debug, Deserialize, Fail, Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[fail(display = "{}", msg)]
struct JsonError { struct JsonError {
msg: String, msg: String,
} }
impl From<Error> for JsonError { impl<T> From<T> for JsonError
fn from(e: Error) -> Self { where
T: std::error::Error,
{
fn from(e: T) -> Self {
JsonError { JsonError {
msg: format!("{}", e), msg: format!("{}", e),
} }
} }
} }
impl ResponseError for JsonError { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, thiserror::Error)]
fn error_response(&self) -> HttpResponse { #[error("Errors occurred")]
HttpResponse::BadRequest().json(Errors::from(self.clone()))
}
}
#[derive(Clone, Debug, Deserialize, Fail, Serialize)]
#[fail(display = "Errors occurred")]
struct Errors { struct Errors {
errors: Vec<JsonError>, errors: Vec<JsonError>,
} }
@ -74,30 +51,48 @@ impl From<JsonError> for Errors {
} }
impl ResponseError for Errors { impl ResponseError for Errors {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
fn error_response(&self) -> HttpResponse { fn error_response(&self) -> HttpResponse {
HttpResponse::BadRequest().json(self) HttpResponse::BadRequest().json(self)
} }
} }
fn upload( async fn upload(uploaded_content: Value) -> HttpResponse {
(mp, state): (Multipart, Data<AppState>), info!("Uploaded Content: {:#?}", uploaded_content);
) -> Box<Future<Item = HttpResponse, Error = Errors>> {
Box::new( HttpResponse::Created().finish()
handle_multipart(mp, state.form.clone())
.map(|uploaded_content| {
info!("Uploaded Content: {:?}", uploaded_content);
HttpResponse::Created().finish()
})
.map_err(JsonError::from)
.map_err(Errors::from),
)
} }
fn main() -> Result<(), failure::Error> { async fn save_file(
stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
count: usize,
) -> Result<(), JsonError> {
let stream = stream.err_into::<JsonError>();
let filename = format!("examples/filename{}.png", count);
info!("Creating {}", filename);
let file = actix_fs::create(filename.clone()).await?;
info!("Writing to file");
if let Err(e) = actix_fs::write_stream(file, stream).await {
info!("Error writing, deleting file");
actix_fs::remove(filename.clone()).await?;
return Err(e);
}
info!("Written!");
Ok(())
}
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
env::set_var("RUST_LOG", "upload=info"); env::set_var("RUST_LOG", "upload=info");
env_logger::init(); env_logger::init();
let sys = actix::System::new("upload-test"); let file_count = Arc::new(AtomicUsize::new(0));
let form = Form::new() let form = Form::new()
.field("Hey", Field::text()) .field("Hey", Field::text())
@ -108,22 +103,25 @@ fn main() -> Result<(), failure::Error> {
.field("Two", Field::float()) .field("Two", Field::float())
.finalize(), .finalize(),
) )
.field("files", Field::array(Field::file(Gen::new()))); .field(
"files",
Field::array(Field::file(move |_filename, _content_type, stream| {
let count = file_count.clone().fetch_add(1, Ordering::Relaxed);
async move { save_file(stream, count).await.map_err(Errors::from) }
})),
);
info!("{:?}", form); info!("{:#?}", form);
let state = AppState { form };
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(state.clone()) .wrap(form.clone())
.wrap(Logger::default()) .wrap(Logger::default())
.service(resource("/upload").route(post().to(upload))) .service(resource("/upload").route(post().to(upload)))
}) })
.bind("127.0.0.1:8080")? .bind("127.0.0.1:8080")?
.start(); .run()
.await?;
sys.run()?;
Ok(()) Ok(())
} }

View file

@ -1,7 +1,7 @@
/* /*
* This file is part of Actix Form Data. * This file is part of Actix Form Data.
* *
* Copyright © 2018 Riley Trautman * Copyright © 2020 Riley Trautman
* *
* Actix Form Data is free software: you can redistribute it and/or modify * Actix Form Data is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -25,11 +25,14 @@ use std::{
use actix_multipart::MultipartError; use actix_multipart::MultipartError;
use actix_web::{ use actix_web::{
error::{PayloadError, ResponseError}, error::{PayloadError, ResponseError},
http::StatusCode,
HttpResponse, HttpResponse,
}; };
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
#[error("Error in file function, {0}")]
FileFn(#[from] actix_web::Error),
#[error("Error parsing payload, {0}")] #[error("Error parsing payload, {0}")]
Payload(#[from] PayloadError), Payload(#[from] PayloadError),
#[error("Error in multipart creation, {0}")] #[error("Error in multipart creation, {0}")]
@ -58,6 +61,10 @@ pub enum Error {
FileCount, FileCount,
#[error("File too large")] #[error("File too large")]
FileSize, FileSize,
#[error("Uploaded guard used without Multipart middleware")]
MissingMiddleware,
#[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")]
TxDropped,
} }
impl From<MultipartError> for Error { impl From<MultipartError> for Error {
@ -67,8 +74,18 @@ impl From<MultipartError> for Error {
} }
impl ResponseError for Error { impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match *self {
Error::FileFn(ref e) => ResponseError::status_code(e.as_response_error()),
Error::Payload(ref e) => ResponseError::status_code(e),
Error::MissingMiddleware | Error::TxDropped => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::BAD_REQUEST,
}
}
fn error_response(&self) -> HttpResponse { fn error_response(&self) -> HttpResponse {
match *self { match *self {
Error::FileFn(ref e) => ResponseError::error_response(e.as_response_error()),
Error::Payload(ref e) => ResponseError::error_response(e), Error::Payload(ref e) => ResponseError::error_response(e),
Error::Multipart(_) Error::Multipart(_)
| Error::ParseField(_) | Error::ParseField(_)
@ -83,6 +100,9 @@ impl ResponseError for Error {
| Error::Filename | Error::Filename
| Error::FileCount | Error::FileCount
| Error::FileSize => HttpResponse::BadRequest().finish(), | Error::FileSize => HttpResponse::BadRequest().finish(),
Error::MissingMiddleware | Error::TxDropped => {
HttpResponse::InternalServerError().finish()
}
} }
} }
} }

View file

@ -1,7 +1,7 @@
/* /*
* This file is part of Actix Form Data. * This file is part of Actix Form Data.
* *
* Copyright © 2018 Riley Trautman * Copyright © 2020 Riley Trautman
* *
* Actix Form Data is free software: you can redistribute it and/or modify * Actix Form Data is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -25,25 +25,20 @@
//! # Example //! # Example
//! //!
//!```rust //!```rust
//! use actix_multipart::Multipart; //! use actix_form_data::{Error, Field, Form, Value};
//! use actix_web::{ //! use actix_web::{
//! web::{post, resource, Data}, //! web::{post, resource},
//! App, HttpResponse, HttpServer, //! App, HttpResponse, HttpServer,
//! }; //! };
//! use form_data::{handle_multipart, Error, Field, FilenameGenerator, Form}; //! use futures::stream::StreamExt;
//! use futures::Future;
//! //!
//! //! async fn upload(uploaded_content: Value) -> HttpResponse {
//! fn upload((mp, state): (Multipart, Data<Form>)) -> Box<Future<Item = HttpResponse, Error = Error>> { //! println!("Uploaded Content: {:#?}", uploaded_content);
//! Box::new( //! HttpResponse::Created().finish()
//! handle_multipart(mp, state.get_ref().clone()).map(|uploaded_content| {
//! println!("Uploaded Content: {:?}", uploaded_content);
//! HttpResponse::Created().finish()
//! }),
//! )
//! } //! }
//! //!
//! fn main() -> Result<(), failure::Error> { //! #[actix_rt::main]
//! async fn main() -> Result<(), anyhow::Error> {
//! let form = Form::new() //! let form = Form::new()
//! .field("Hey", Field::text()) //! .field("Hey", Field::text())
//! .field( //! .field(
@ -53,24 +48,39 @@
//! .field("Two", Field::float()) //! .field("Two", Field::float())
//! .finalize(), //! .finalize(),
//! ) //! )
//! .field("files", Field::array(Field::file(Gen))); //! .field(
//! "files",
//! Field::array(Field::file(|_, _, mut stream| async move {
//! while let Some(res) = stream.next().await {
//! res?;
//! }
//! Ok(()) as Result<(), Error>
//! })),
//! );
//! //!
//! println!("{:?}", form); //! println!("{:?}", form);
//! //!
//! HttpServer::new(move || { //! HttpServer::new(move || {
//! App::new() //! App::new()
//! .data(form.clone()) //! .wrap(form.clone())
//! .service(resource("/upload").route(post().to(upload))) //! .service(resource("/upload").route(post().to(upload)))
//! }) //! })
//! .bind("127.0.0.1:8080")?; //! .bind("127.0.0.1:8080")?;
//! // .run()?; //! // commented out to prevent infinite doctest
//! // .run()
//! // .await?;
//! //!
//! Ok(()) //! Ok(())
//! } //! }
//!``` //!```
mod error; mod error;
mod middleware;
mod types; mod types;
mod upload; mod upload;
pub use self::{error::Error, types::*, upload::handle_multipart}; pub use self::{
error::Error,
types::{Field, FileMeta, Form, Value},
upload::handle_multipart,
};

109
src/middleware.rs Normal file
View file

@ -0,0 +1,109 @@
/*
* This file is part of Actix Form Data.
*
* Copyright © 2020 Riley Trautman
*
* Actix Form Data is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Actix Form Data is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Actix Form Data. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::{
error::Error,
types::{Form, Value},
upload::handle_multipart,
};
use actix_web::{
dev::{Payload, Service, ServiceRequest, Transform},
FromRequest, HttpMessage, HttpRequest,
};
use futures::future::{ok, Ready};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot::{channel, Receiver};
struct Uploaded {
rx: Receiver<Value>,
}
pub struct MultipartMiddleware<S> {
form: Form,
service: S,
}
impl FromRequest for Value {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
type Config = ();
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let opt = req.extensions_mut().remove::<Uploaded>();
Box::pin(async move {
let fut = opt.ok_or(Error::MissingMiddleware)?;
fut.rx.await.map_err(|_| Error::TxDropped)
})
}
}
impl<S> Transform<S> for Form
where
S: Service<Request = ServiceRequest, Error = actix_web::Error>,
S::Future: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = ();
type Transform = MultipartMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(MultipartMiddleware {
form: self.clone(),
service,
})
}
}
impl<S> Service for MultipartMiddleware<S>
where
S: Service<Request = ServiceRequest, Error = actix_web::Error>,
S::Future: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, mut req: S::Request) -> Self::Future {
let (tx, rx) = channel();
req.extensions_mut().insert(Uploaded { rx });
let payload = req.take_payload();
let multipart = actix_multipart::Multipart::new(req.headers(), payload);
let form = self.form.clone();
let fut = self.service.call(req);
Box::pin(async move {
let uploaded = handle_multipart(multipart, form).await?;
let _ = tx.send(uploaded);
fut.await
})
}
}

View file

@ -1,7 +1,7 @@
/* /*
* This file is part of Actix Form Data. * This file is part of Actix Form Data.
* *
* Copyright © 2018 Riley Trautman * Copyright © 2020 Riley Trautman
* *
* Actix Form Data is free software: you can redistribute it and/or modify * Actix Form Data is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -25,22 +25,15 @@ use mime::Mime;
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
fmt, fmt,
future::Future,
pin::Pin, pin::Pin,
sync::Arc,
}; };
pub struct FileStream { #[derive(Debug)]
pub struct FileMeta {
pub filename: String, pub filename: String,
pub content_type: Mime, pub content_type: Mime,
pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
}
impl fmt::Debug for FileStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FileStream")
.field("filename", &self.filename)
.field("content_type", &self.content_type)
.finish()
}
} }
/// The result of a succesfull parse through a given multipart stream. /// The result of a succesfull parse through a given multipart stream.
@ -72,7 +65,7 @@ impl fmt::Debug for FileStream {
pub enum Value { pub enum Value {
Map(HashMap<String, Value>), Map(HashMap<String, Value>),
Array(Vec<Value>), Array(Vec<Value>),
File(FileStream), File(FileMeta),
Bytes(Bytes), Bytes(Bytes),
Text(String), Text(String),
Int(i64), Int(i64),
@ -118,9 +111,9 @@ impl Value {
} }
} }
pub fn file(self) -> Option<FileStream> { pub fn file(self) -> Option<FileMeta> {
match self { match self {
Value::File(file_stream) => Some(file_stream), Value::File(file_meta) => Some(file_meta),
_ => None, _ => None,
} }
} }
@ -157,7 +150,7 @@ impl Value {
impl From<MultipartContent> for Value { impl From<MultipartContent> for Value {
fn from(mc: MultipartContent) -> Self { fn from(mc: MultipartContent) -> Self {
match mc { match mc {
MultipartContent::File(file_stream) => Value::File(file_stream), MultipartContent::File(file_meta) => Value::File(file_meta),
MultipartContent::Bytes(bytes) => Value::Bytes(bytes), MultipartContent::Bytes(bytes) => Value::Bytes(bytes),
MultipartContent::Text(string) => Value::Text(string), MultipartContent::Text(string) => Value::Text(string),
MultipartContent::Int(i) => Value::Int(i), MultipartContent::Int(i) => Value::Int(i),
@ -166,12 +159,22 @@ impl From<MultipartContent> for Value {
} }
} }
pub type FileFn = Arc<
dyn Fn(
String,
Mime,
Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
) -> Pin<Box<dyn Future<Output = Result<(), actix_web::Error>>>>
+ Send
+ Sync,
>;
/// The field type represents a field in the form-data that is allowed to be parsed. /// The field type represents a field in the form-data that is allowed to be parsed.
#[derive(Clone)] #[derive(Clone)]
pub enum Field { pub enum Field {
Array(Array), Array(Array),
Map(Map), Map(Map),
File, File(FileFn),
Bytes, Bytes,
Int, Int,
Float, Float,
@ -181,9 +184,9 @@ pub enum Field {
impl fmt::Debug for Field { impl fmt::Debug for Field {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Field::Array(ref arr) => write!(f, "Array({:?})", arr), Field::Array(ref arr) => f.debug_tuple("Array").field(arr).finish(),
Field::Map(ref map) => write!(f, "Map({:?})", map), Field::Map(ref map) => f.debug_tuple("Map").field(map).finish(),
Field::File => write!(f, "File"), Field::File(_) => write!(f, "File"),
Field::Bytes => write!(f, "Bytes"), Field::Bytes => write!(f, "Bytes"),
Field::Int => write!(f, "Int"), Field::Int => write!(f, "Int"),
Field::Float => write!(f, "Float"), Field::Float => write!(f, "Float"),
@ -202,11 +205,32 @@ impl Field {
/// # Example /// # Example
/// ```rust /// ```rust
/// # use form_data::{Form, Field}; /// # use form_data::{Form, Field};
/// # use tokio::sync::mpsc::channel;
/// # /// #
/// let form = Form::new().field("file-field", Field::file()); /// let (tx, rx) = channel(1);
/// let form = Form::new().field("file-field", Field::file(|_, _, stream| async move {
/// while let Some(res) = stream.next().await {
/// if let Err(_) = tx.send(res).await {
/// break;
/// }
/// }
/// Ok(()) as Result<(), std::convert::Infallible>
/// }));
/// ``` /// ```
pub fn file() -> Self { pub fn file<F, Fut, E>(f: F) -> Self
Field::File where
F: Fn(String, Mime, Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>) -> Fut
+ Send
+ Sync
+ Clone
+ 'static,
Fut: Future<Output = Result<(), E>> + 'static,
E: actix_web::ResponseError + 'static,
{
Field::File(Arc::new(move |filename, mime, stream| {
let f = f.clone();
Box::pin(async move { (f)(filename, mime, stream).await.map_err(|e| e.into()) })
}))
} }
/// Add a Bytes field to a form /// Add a Bytes field to a form
@ -295,9 +319,9 @@ impl Field {
match *self { match *self {
Field::Array(ref arr) => arr.valid_field(name), Field::Array(ref arr) => arr.valid_field(name),
Field::Map(ref map) => map.valid_field(name), Field::Map(ref map) => map.valid_field(name),
Field::File => { Field::File(ref file_fn) => {
if name.is_empty() { if name.is_empty() {
Some(FieldTerminator::File) Some(FieldTerminator::File(file_fn.clone()))
} else { } else {
None None
} }
@ -512,7 +536,7 @@ impl Form {
impl fmt::Debug for Form { impl fmt::Debug for Form {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Form({:?})", self.inner) f.debug_struct("Form").field("inner", &self.inner).finish()
} }
} }
@ -548,7 +572,7 @@ impl NamePart {
#[derive(Clone)] #[derive(Clone)]
pub(crate) enum FieldTerminator { pub(crate) enum FieldTerminator {
File, File(FileFn),
Bytes, Bytes,
Int, Int,
Float, Float,
@ -558,7 +582,7 @@ pub(crate) enum FieldTerminator {
impl fmt::Debug for FieldTerminator { impl fmt::Debug for FieldTerminator {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
FieldTerminator::File => write!(f, "File"), FieldTerminator::File(_) => write!(f, "File"),
FieldTerminator::Bytes => write!(f, "Bytes"), FieldTerminator::Bytes => write!(f, "Bytes"),
FieldTerminator::Int => write!(f, "Int"), FieldTerminator::Int => write!(f, "Int"),
FieldTerminator::Float => write!(f, "Float"), FieldTerminator::Float => write!(f, "Float"),
@ -572,7 +596,7 @@ pub(crate) type MultipartForm = Vec<MultipartHash>;
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum MultipartContent { pub(crate) enum MultipartContent {
File(FileStream), File(FileMeta),
Bytes(Bytes), Bytes(Bytes),
Text(String), Text(String),
Int(i64), Int(i64),

View file

@ -1,7 +1,7 @@
/* /*
* This file is part of Actix Form Data. * This file is part of Actix Form Data.
* *
* Copyright © 2018 Riley Trautman * Copyright © 2020 Riley Trautman
* *
* Actix Form Data is free software: you can redistribute it and/or modify * Actix Form Data is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -20,8 +20,8 @@
use crate::{ use crate::{
error::Error, error::Error,
types::{ types::{
ContentDisposition, FieldTerminator, FileStream, Form, MultipartContent, MultipartForm, ContentDisposition, FieldTerminator, FileFn, FileMeta, Form, MultipartContent,
MultipartHash, NamePart, Value, MultipartForm, MultipartHash, NamePart, Value,
}, },
}; };
use bytes::BytesMut; use bytes::BytesMut;
@ -100,6 +100,7 @@ async fn handle_file_upload(
field: actix_multipart::Field, field: actix_multipart::Field,
filename: Option<String>, filename: Option<String>,
form: Form, form: Form,
file_fn: FileFn,
) -> Result<MultipartContent, Error> { ) -> Result<MultipartContent, Error> {
let filename = filename.ok_or(Error::Filename)?; let filename = filename.ok_or(Error::Filename)?;
let path: &Path = filename.as_ref(); let path: &Path = filename.as_ref();
@ -112,10 +113,10 @@ async fn handle_file_upload(
let content_type = field.content_type().clone(); let content_type = field.content_type().clone();
Ok(MultipartContent::File(FileStream { file_fn(
filename, filename.clone(),
content_type, content_type.clone(),
stream: Box::pin(field.then(move |res| { Box::pin(field.then(move |res| {
let form = form.clone(); let form = form.clone();
let file_size = file_size.clone(); let file_size = file_size.clone();
async move { async move {
@ -133,6 +134,12 @@ async fn handle_file_upload(
} }
} }
})), })),
)
.await?;
Ok(MultipartContent::File(FileMeta {
filename,
content_type,
})) }))
} }
@ -160,7 +167,7 @@ async fn handle_form_data(
let s = String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)?; let s = String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)?;
match term { match term {
FieldTerminator::Bytes | FieldTerminator::File => { FieldTerminator::Bytes | FieldTerminator::File(_) => {
return Err(Error::FieldType); return Err(Error::FieldType);
} }
FieldTerminator::Text => Ok(MultipartContent::Text(s)), FieldTerminator::Text => Ok(MultipartContent::Text(s)),
@ -189,8 +196,8 @@ async fn handle_stream_field(
.ok_or(Error::FieldType)?; .ok_or(Error::FieldType)?;
let content = match term { let content = match term {
FieldTerminator::File => { FieldTerminator::File(file_fn) => {
handle_file_upload(field, content_disposition.filename, form).await? handle_file_upload(field, content_disposition.filename, form, file_fn).await?
} }
term => handle_form_data(field, term, form.clone()).await?, term => handle_form_data(field, term, form.clone()).await?,
}; };