Update for latest Actix version

- Dump Futures CpuPool and Futures FS due to weird panic
 - Remove futures executor from Form, use Actix built-in threadpool and
   runtime
 - Update parsing for latest actix multipart impl
This commit is contained in:
asonix 2019-05-08 18:11:26 -05:00
parent 32fbd71d03
commit a1ac05648d
No known key found for this signature in database
GPG key ID: 6986797E36BFA1D4
8 changed files with 276 additions and 262 deletions

View file

@ -1,29 +1,32 @@
[package]
name = "actix-form-data"
description = "Multipart Form Data for Actix Web"
version = "0.3.2"
version = "0.4.0-beta.1"
license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/actix-form-data.git"
readme = "README.md"
keywords = ["actix", "form-data", "multipart", "async"]
edition = "2018"
[lib]
name = "form_data"
[dependencies]
actix-web = "0.7.15"
actix-multipart = "0.1.0-beta.1"
actix-rt = "0.2.2"
actix-threadpool = "0.1.0"
actix-web = "1.0.0-beta.3"
bytes = "0.4.7"
failure = "0.1"
futures = "0.1.21"
futures-cpupool = "0.1.8"
futures-fs = "0.0.5"
http = "0.1.5"
log = "0.4.1"
mime = "0.3.5"
[dev-dependencies]
actix = "0.7.0"
env_logger = "0.5.9"
actix = "0.8.1"
env_logger = "0.6.0"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"

View file

@ -1,11 +1,10 @@
extern crate actix_web;
extern crate form_data;
extern crate futures;
extern crate mime;
use std::path::PathBuf;
use actix_web::{http, server, App, AsyncResponder, HttpMessage, HttpRequest, HttpResponse, State};
use actix_multipart::Multipart;
use actix_web::{
web::{post, resource, Data},
App, HttpResponse, HttpServer,
};
use form_data::{handle_multipart, Error, Field, FilenameGenerator, Form};
use futures::Future;
@ -19,18 +18,16 @@ impl FilenameGenerator for Gen {
}
}
fn upload(
(req, state): (HttpRequest<Form>, State<Form>),
) -> Box<Future<Item = HttpResponse, Error = Error>> {
handle_multipart(req.multipart(), state.clone())
.map(|uploaded_content| {
fn upload((mp, state): (Multipart, Data<Form>)) -> Box<Future<Item = HttpResponse, Error = Error>> {
Box::new(
handle_multipart(mp, state.get_ref().clone()).map(|uploaded_content| {
println!("Uploaded Content: {:?}", uploaded_content);
HttpResponse::Created().finish()
})
.responder()
}),
)
}
fn main() {
fn main() -> Result<(), failure::Error> {
let form = Form::new()
.field("Hey", Field::text())
.field(
@ -44,11 +41,13 @@ fn main() {
println!("{:?}", form);
server::new(move || {
App::with_state(form.clone())
.resource("/upload", |r| r.method(http::Method::POST).with(upload))
HttpServer::new(move || {
App::new()
.data(form.clone())
.service(resource("/upload").route(post().to(upload)))
})
.bind("127.0.0.1:8080")
.unwrap()
.run();
.bind("127.0.0.1:8080")?
.run()?;
Ok(())
}

View file

@ -1,29 +1,20 @@
extern crate actix;
extern crate actix_web;
extern crate env_logger;
#[macro_use]
extern crate failure;
extern crate form_data;
extern crate futures;
#[macro_use]
extern crate log;
extern crate mime;
extern crate serde;
#[macro_use]
extern crate serde_derive;
use std::{
env,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
use actix_multipart::Multipart;
use actix_web::{
error::ResponseError, http, middleware::Logger, server, App, AsyncResponder, HttpMessage,
HttpRequest, HttpResponse, State,
middleware::Logger,
web::{post, resource, Data},
App, HttpResponse, HttpServer, ResponseError,
};
use failure::Fail;
use form_data::*;
use futures::Future;
use log::info;
use serde_derive::{Deserialize, Serialize};
struct Gen(AtomicUsize);
@ -89,19 +80,20 @@ impl ResponseError for Errors {
}
fn upload(
(req, state): (HttpRequest<AppState>, State<AppState>),
(mp, state): (Multipart, Data<AppState>),
) -> Box<Future<Item = HttpResponse, Error = Errors>> {
handle_multipart(req.multipart(), state.form.clone())
.map(|uploaded_content| {
info!("Uploaded Content: {:?}", uploaded_content);
HttpResponse::Created().finish()
})
.map_err(JsonError::from)
.map_err(Errors::from)
.responder()
Box::new(
handle_multipart(mp, state.form.clone())
.map(|uploaded_content| {
info!("Uploaded Content: {:?}", uploaded_content);
HttpResponse::Created().finish()
})
.map_err(JsonError::from)
.map_err(Errors::from),
)
}
fn main() {
fn main() -> Result<(), failure::Error> {
env::set_var("RUST_LOG", "upload=info");
env_logger::init();
@ -122,14 +114,16 @@ fn main() {
let state = AppState { form };
server::new(move || {
App::with_state(state.clone())
.middleware(Logger::default())
.resource("/upload", |r| r.method(http::Method::POST).with(upload))
HttpServer::new(move || {
App::new()
.data(state.clone())
.wrap(Logger::default())
.service(resource("/upload").route(post().to(upload)))
})
.bind("127.0.0.1:8080")
.unwrap()
.bind("127.0.0.1:8080")?
.start();
sys.run();
sys.run()?;
Ok(())
}

View file

@ -23,19 +23,23 @@ use std::{
string::FromUtf8Error,
};
use actix_multipart::MultipartError;
use actix_web::{
error::{MultipartError, PayloadError, ResponseError},
error::{PayloadError, ResponseError},
HttpResponse,
};
use bytes::Bytes;
use failure::Fail;
use futures::sync::mpsc::SendError;
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "Error saving file, {}", _0)]
FsPool(#[cause] io::Error),
#[fail(display = "Error parsing payload, {}", _0)]
Payload(#[cause] PayloadError),
Payload(PayloadError),
#[fail(display = "Error in multipart creation, {}", _0)]
Multipart(#[cause] MultipartError),
Multipart(MultipartError),
#[fail(display = "Failed to parse field, {}", _0)]
ParseField(#[cause] FromUtf8Error),
#[fail(display = "Failed to parse int, {}", _0)]
@ -52,6 +56,8 @@ pub enum Error {
MkDir,
#[fail(display = "Failed to parse field name")]
Field,
#[fail(display = "Could not write file")]
WriteFile,
#[fail(display = "Too many fields in request")]
FieldCount,
#[fail(display = "Field too large")]
@ -84,6 +90,12 @@ impl From<io::Error> for Error {
}
}
impl From<SendError<Bytes>> for Error {
fn from(_: SendError<Bytes>) -> Self {
Error::WriteFile
}
}
impl ResponseError for Error {
fn error_response(&self) -> HttpResponse {
match *self {
@ -98,6 +110,7 @@ impl ResponseError for Error {
| Error::ContentDisposition
| Error::Field
| Error::FieldCount
| Error::WriteFile
| Error::FieldSize
| Error::FieldType
| Error::Filename

113
src/file_future.rs Normal file
View file

@ -0,0 +1,113 @@
use bytes::{Bytes, BytesMut};
use failure::Fail;
use futures::{
sync::mpsc::{channel, SendError},
task, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
};
use std::{
fs::File,
io::{Error, Write},
path::Path,
};
#[derive(Clone, Debug, Fail)]
#[fail(display = "Error in Channel")]
struct ChannelError;
pub fn write(
filename: impl AsRef<Path> + Clone + Send + 'static,
) -> impl Sink<SinkItem = Bytes, SinkError = SendError<Bytes>> {
let (tx, rx) = channel(50);
actix_rt::spawn(
actix_threadpool::run(move || {
CreateFuture::new(filename.clone())
.from_err()
.and_then(|file| {
rx.map_err(|_| failure::Error::from(ChannelError))
.forward(WriteSink::new(file))
})
.wait()
})
.map_err(|_| ())
.map(|_| ()),
);
tx
}
struct CreateFuture<P>(P)
where
P: AsRef<Path> + Clone;
impl<P> CreateFuture<P>
where
P: AsRef<Path> + Clone,
{
fn new(path: P) -> Self {
CreateFuture(path)
}
}
impl<P> Future for CreateFuture<P>
where
P: AsRef<Path> + Clone,
{
type Item = File;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
File::create(self.0.clone()).map(Async::Ready)
}
}
struct WriteSink {
buffer: BytesMut,
file: File,
}
impl WriteSink {
fn new(file: File) -> Self {
WriteSink {
buffer: BytesMut::new(),
file,
}
}
}
impl Sink for WriteSink {
type SinkItem = Bytes;
type SinkError = Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if let Async::NotReady = self.poll_complete()? {
return Ok(AsyncSink::NotReady(item));
}
self.buffer = BytesMut::new();
self.buffer.extend_from_slice(&item);
self.poll_complete()?;
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
if self.buffer.is_empty() {
return Ok(Async::Ready(()));
}
let written = self.file.write(&self.buffer)?;
if written == 0 {
return Err(Error::last_os_error());
}
self.buffer.advance(written);
if self.buffer.is_empty() {
Ok(Async::Ready(()))
} else {
task::current().notify();
Ok(Async::NotReady)
}
}
}

View file

@ -25,19 +25,18 @@
//! # Example
//!
//!```rust
//! extern crate actix_web;
//! extern crate form_data;
//! extern crate futures;
//! extern crate mime;
//!
//! use std::path::PathBuf;
//!
//! use actix_web::{http, server, App, AsyncResponder, HttpMessage, HttpRequest, HttpResponse, State};
//!
//! use actix_multipart::Multipart;
//! use actix_web::{
//! web::{post, resource, Data},
//! App, HttpResponse, HttpServer,
//! };
//! use form_data::{handle_multipart, Error, Field, FilenameGenerator, Form};
//! use futures::Future;
//!
//!
//! struct Gen;
//!
//!
//! impl FilenameGenerator for Gen {
//! fn next_filename(&self, _: &mime::Mime) -> Option<PathBuf> {
//! let mut p = PathBuf::new();
@ -45,19 +44,17 @@
//! Some(p)
//! }
//! }
//!
//! fn upload(
//! (req, state): (HttpRequest<Form>, State<Form>),
//! ) -> Box<Future<Item = HttpResponse, Error = Error>> {
//! handle_multipart(req.multipart(), state.clone())
//! .map(|uploaded_content| {
//!
//! fn upload((mp, state): (Multipart, Data<Form>)) -> Box<Future<Item = HttpResponse, Error = Error>> {
//! Box::new(
//! handle_multipart(mp, state.get_ref().clone()).map(|uploaded_content| {
//! println!("Uploaded Content: {:?}", uploaded_content);
//! HttpResponse::Created().finish()
//! })
//! .responder()
//! }),
//! )
//! }
//!
//! fn main() {
//!
//! fn main() -> Result<(), failure::Error> {
//! let form = Form::new()
//! .field("Hey", Field::text())
//! .field(
@ -68,47 +65,34 @@
//! .finalize(),
//! )
//! .field("files", Field::array(Field::file(Gen)));
//!
//!
//! println!("{:?}", form);
//!
//! server::new(move || {
//! App::with_state(form.clone())
//! .resource("/upload", |r| r.method(http::Method::POST).with(upload))
//! }).bind("127.0.0.1:8080")
//! .unwrap();
//! // .run()
//!
//! HttpServer::new(move || {
//! App::new()
//! .data(form.clone())
//! .service(resource("/upload").route(post().to(upload)))
//! })
//! .bind("127.0.0.1:8080")?;
//! // .run()?;
//!
//! Ok(())
//! }
//!```
extern crate actix_web;
extern crate bytes;
#[macro_use]
extern crate failure;
extern crate futures;
extern crate futures_cpupool;
extern crate futures_fs;
extern crate http;
#[macro_use]
extern crate log;
extern crate mime;
#[cfg(feature = "with-serde")]
extern crate serde;
#[cfg(feature = "with-serde")]
#[macro_use]
extern crate serde_derive;
use std::path::PathBuf;
mod error;
mod file_future;
mod types;
mod upload;
pub use self::error::Error;
pub use self::types::*;
pub use self::upload::handle_multipart;
pub use self::{error::Error, types::*, upload::handle_multipart};
/// A trait for types that produce filenames for uploade files
///
/// Currently, the mime type provided to the `next_filename` method is guessed from the uploaded
/// file's original filename, so relying on this to be 100% accurate is probably a bad idea.
pub trait FilenameGenerator: Send + Sync {
fn next_filename(&self, &mime::Mime) -> Option<PathBuf>;
fn next_filename(&self, mime_type: &mime::Mime) -> Option<PathBuf>;
}

View file

@ -25,13 +25,9 @@ use std::{
};
use bytes::Bytes;
use futures::{
future::{ExecuteError, Executor},
Future,
};
use futures_cpupool::CpuPool;
use log::trace;
use super::FilenameGenerator;
use crate::FilenameGenerator;
/// The result of a succesfull parse through a given multipart stream.
///
@ -74,8 +70,8 @@ impl Value {
match (self, rhs) {
(&mut Value::Map(ref mut hm), Value::Map(ref other)) => {
other.into_iter().fold(hm, |hm, (key, value)| {
if hm.contains_key(key) {
hm.get_mut(key).unwrap().merge(value.clone())
if let Some(v) = hm.get_mut(key) {
v.merge(value.clone());
} else {
hm.insert(key.to_owned(), value.clone());
}
@ -480,16 +476,20 @@ pub struct Form {
pub max_files: u32,
pub max_file_size: usize,
inner: Map,
pub pool: ArcExecutor,
}
impl Form {
/// Create a new form
///
/// This also creates a new `CpuPool` to be used to stream files onto the filesystem. If you
/// wish to provide your own executor, use the `from_executor` method.
/// If you wish to provide your own executor, use the `with_executor` method.
pub fn new() -> Self {
Form::from_executor(CpuPool::new_num_cpus())
Form {
max_fields: 100,
max_field_size: 10_000,
max_files: 20,
max_file_size: 10_000_000,
inner: Map::new(),
}
}
/// Set the maximum number of fields allowed in the upload
@ -528,23 +528,6 @@ impl Form {
self
}
/// Create a new form with a given executor
///
/// This executor is used to stream files onto the filesystem.
pub fn from_executor<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + Clone + 'static,
{
Form {
max_fields: 100,
max_field_size: 10_000,
max_files: 20,
max_file_size: 10_000_000,
inner: Map::new(),
pool: ArcExecutor::new(executor),
}
}
pub fn field(mut self, name: &str, field: Field) -> Self {
self.inner = self.inner.field(name, field);
@ -562,37 +545,6 @@ impl fmt::Debug for Form {
}
}
/// The executor type stored inside a `Form`
///
/// Any executor capable of being shared and executing boxed futures can be stored here.
#[derive(Clone)]
pub struct ArcExecutor {
inner: Arc<Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static>,
}
impl ArcExecutor {
/// Create a new ArcExecutor from an Executor
///
/// This essentially wraps the given executor in an Arc
pub fn new<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + Clone + 'static,
{
ArcExecutor {
inner: Arc::new(executor),
}
}
}
impl Executor<Box<Future<Item = (), Error = ()> + Send>> for ArcExecutor where {
fn execute(
&self,
future: Box<Future<Item = (), Error = ()> + Send>,
) -> Result<(), ExecuteError<Box<Future<Item = (), Error = ()> + Send>>> {
self.inner.execute(future)
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct ContentDisposition {
pub name: Option<String>,

View file

@ -27,19 +27,19 @@ use std::{
},
};
use actix_web::{error::PayloadError, multipart};
use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use futures::{
future::{lazy, result, Either, Executor},
sync::oneshot,
future::{result, Either},
Future, Stream,
};
use futures_fs::FsPool;
use log::trace;
use super::FilenameGenerator;
use error::Error;
use types::{
self, ContentDisposition, MultipartContent, MultipartForm, MultipartHash, NamePart, Value,
use crate::{
error::Error,
types::{
self, ContentDisposition, MultipartContent, MultipartForm, MultipartHash, NamePart, Value,
},
FilenameGenerator,
};
fn consolidate(mf: MultipartForm) -> Value {
@ -74,7 +74,7 @@ fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
if part.len() == 1 && part.ends_with(']') {
NamePart::Array
} else if part.ends_with(']') {
NamePart::Map(part.trim_right_matches(']').to_owned())
NamePart::Map(part.trim_end_matches(']').to_owned())
} else {
NamePart::Map(part.to_owned())
}
@ -92,10 +92,7 @@ fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
})
}
fn parse_content_disposition<S>(field: &multipart::Field<S>) -> ContentDisposition
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
fn parse_content_disposition(field: &actix_multipart::Field) -> ContentDisposition {
match field.content_disposition() {
Some(x) => ContentDisposition {
name: x.get_name().map(|v| v.to_string()),
@ -124,15 +121,12 @@ fn build_dir(stored_dir: PathBuf) -> Result<(), Error> {
.map_err(|_| Error::MkDir)
}
fn handle_file_upload<S>(
field: multipart::Field<S>,
fn handle_file_upload(
field: actix_multipart::Field,
gen: Arc<FilenameGenerator>,
filename: Option<String>,
form: types::Form,
) -> Box<Future<Item = MultipartContent, Error = Error>>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
) -> Box<Future<Item = MultipartContent, Error = Error>> {
let filename = match filename {
Some(filename) => filename,
None => return Box::new(result(Err(Error::Filename))),
@ -155,55 +149,36 @@ where
let mut stored_dir = stored_as.clone();
stored_dir.pop();
let (tx, rx) = oneshot::channel();
match form.pool.execute(Box::new(lazy(move || {
let res = build_dir(stored_dir.clone());
tx.send(res).map_err(|_| ())
}))) {
Ok(_) => (),
Err(_) => return Box::new(result(Err(Error::MkDir))),
};
let mkdir_fut = actix_threadpool::run(move || build_dir(stored_dir.clone()));
let counter = Arc::new(AtomicUsize::new(0));
Box::new(
rx.then(|res| match res {
Ok(res) => res,
Err(_) => Err(Error::MkDir),
})
.and_then(move |_| {
let write = FsPool::with_executor(form.pool.clone())
.write(stored_as.clone(), Default::default());
field
.map_err(Error::Multipart)
.and_then(move |bytes| {
let size = counter.fetch_add(bytes.len(), Ordering::Relaxed) + bytes.len();
Box::new(mkdir_fut.map_err(|_| Error::MkDir).and_then(move |_| {
let write = crate::file_future::write(stored_as.clone());
field
.map_err(Error::Multipart)
.and_then(move |bytes| {
let size = counter.fetch_add(bytes.len(), Ordering::Relaxed) + bytes.len();
if size > form.max_file_size {
Err(Error::FileSize)
} else {
Ok(bytes)
}
})
.forward(write)
.map(move |_| MultipartContent::File {
filename,
stored_as,
})
}),
)
if size > form.max_file_size {
Err(Error::FileSize)
} else {
Ok(bytes)
}
})
.forward(write)
.map(move |_| MultipartContent::File {
filename,
stored_as,
})
}))
}
fn handle_form_data<S>(
field: multipart::Field<S>,
fn handle_form_data(
field: actix_multipart::Field,
term: types::FieldTerminator,
form: types::Form,
) -> Box<Future<Item = MultipartContent, Error = Error>>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
) -> Box<Future<Item = MultipartContent, Error = Error>> {
trace!("In handle_form_data, term: {:?}", term);
let term2 = term.clone();
@ -247,13 +222,10 @@ where
)
}
fn handle_stream_field<S>(
field: multipart::Field<S>,
fn handle_stream_field(
field: actix_multipart::Field,
form: types::Form,
) -> Box<Future<Item = MultipartHash, Error = Error>>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
) -> Box<Future<Item = MultipartHash, Error = Error>> {
let content_disposition = parse_content_disposition(&field);
let name = match content_disposition.name {
@ -284,42 +256,26 @@ where
Box::new(fut.map(|content| (name, content)))
}
fn handle_stream<S>(
m: multipart::Multipart<S>,
fn handle_stream(
m: actix_multipart::Multipart,
form: types::Form,
) -> Box<Stream<Item = MultipartHash, Error = Error>>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
) -> Box<Stream<Item = MultipartHash, Error = Error>> {
Box::new(
m.map_err(Error::from)
.map(move |item| match item {
multipart::MultipartItem::Field(field) => {
info!("Field: {:?}", field);
Box::new(
handle_stream_field(field, form.clone())
.map(From::from)
.into_stream(),
) as Box<Stream<Item = MultipartHash, Error = Error>>
}
multipart::MultipartItem::Nested(m) => {
info!("Nested");
Box::new(handle_stream(m, form.clone()))
as Box<Stream<Item = MultipartHash, Error = Error>>
}
.map(move |field| {
handle_stream_field(field, form.clone())
.map(From::from)
.into_stream()
})
.flatten(),
)
}
/// Handle multipart streams from Actix Web
pub fn handle_multipart<S>(
m: multipart::Multipart<S>,
pub fn handle_multipart(
m: actix_multipart::Multipart,
form: types::Form,
) -> Box<Future<Item = Value, Error = Error>>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
) -> Box<Future<Item = Value, Error = Error>> {
Box::new(
handle_stream(m, form.clone())
.fold(