From a1ac05648db51ec75fd4a29292365cd4a0e002be Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 8 May 2019 18:11:26 -0500 Subject: [PATCH] Update for latest Actix version - Dump Futures CpuPool and Futures FS due to weird panic - Remove futures executor from Form, use Actix built-in threadpool and runtime - Update parsing for latest actix multipart impl --- Cargo.toml | 15 +++-- examples/simple.rs | 39 ++++++------- examples/upload.rs | 60 +++++++++---------- src/error.rs | 19 +++++- src/file_future.rs | 113 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 78 ++++++++++--------------- src/types.rs | 72 ++++------------------- src/upload.rs | 142 ++++++++++++++++----------------------------- 8 files changed, 276 insertions(+), 262 deletions(-) create mode 100644 src/file_future.rs diff --git a/Cargo.toml b/Cargo.toml index c0d1def..acfcadb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,29 +1,32 @@ [package] name = "actix-form-data" description = "Multipart Form Data for Actix Web" -version = "0.3.2" +version = "0.4.0-beta.1" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/actix-form-data.git" readme = "README.md" keywords = ["actix", "form-data", "multipart", "async"] +edition = "2018" [lib] name = "form_data" [dependencies] -actix-web = "0.7.15" +actix-multipart = "0.1.0-beta.1" +actix-rt = "0.2.2" +actix-threadpool = "0.1.0" +actix-web = "1.0.0-beta.3" bytes = "0.4.7" failure = "0.1" futures = "0.1.21" -futures-cpupool = "0.1.8" -futures-fs = "0.0.5" http = "0.1.5" log = "0.4.1" mime = "0.3.5" [dev-dependencies] -actix = "0.7.0" -env_logger = "0.5.9" +actix = "0.8.1" +env_logger = "0.6.0" serde = "1.0" +serde_json = "1.0" serde_derive = "1.0" diff --git a/examples/simple.rs b/examples/simple.rs index 017fa9e..94d51d2 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,11 +1,10 @@ -extern crate actix_web; -extern crate form_data; -extern crate futures; -extern crate mime; - use std::path::PathBuf; -use actix_web::{http, server, App, AsyncResponder, HttpMessage, HttpRequest, HttpResponse, State}; +use actix_multipart::Multipart; +use actix_web::{ + web::{post, resource, Data}, + App, HttpResponse, HttpServer, +}; use form_data::{handle_multipart, Error, Field, FilenameGenerator, Form}; use futures::Future; @@ -19,18 +18,16 @@ impl FilenameGenerator for Gen { } } -fn upload( - (req, state): (HttpRequest
, State), -) -> Box> { - handle_multipart(req.multipart(), state.clone()) - .map(|uploaded_content| { +fn upload((mp, state): (Multipart, Data)) -> Box> { + Box::new( + handle_multipart(mp, state.get_ref().clone()).map(|uploaded_content| { println!("Uploaded Content: {:?}", uploaded_content); HttpResponse::Created().finish() - }) - .responder() + }), + ) } -fn main() { +fn main() -> Result<(), failure::Error> { let form = Form::new() .field("Hey", Field::text()) .field( @@ -44,11 +41,13 @@ fn main() { println!("{:?}", form); - server::new(move || { - App::with_state(form.clone()) - .resource("/upload", |r| r.method(http::Method::POST).with(upload)) + HttpServer::new(move || { + App::new() + .data(form.clone()) + .service(resource("/upload").route(post().to(upload))) }) - .bind("127.0.0.1:8080") - .unwrap() - .run(); + .bind("127.0.0.1:8080")? + .run()?; + + Ok(()) } diff --git a/examples/upload.rs b/examples/upload.rs index 3e952b8..9f979a0 100644 --- a/examples/upload.rs +++ b/examples/upload.rs @@ -1,29 +1,20 @@ -extern crate actix; -extern crate actix_web; -extern crate env_logger; -#[macro_use] -extern crate failure; -extern crate form_data; -extern crate futures; -#[macro_use] -extern crate log; -extern crate mime; -extern crate serde; -#[macro_use] -extern crate serde_derive; - use std::{ env, path::PathBuf, sync::atomic::{AtomicUsize, Ordering}, }; +use actix_multipart::Multipart; use actix_web::{ - error::ResponseError, http, middleware::Logger, server, App, AsyncResponder, HttpMessage, - HttpRequest, HttpResponse, State, + middleware::Logger, + web::{post, resource, Data}, + App, HttpResponse, HttpServer, ResponseError, }; +use failure::Fail; use form_data::*; use futures::Future; +use log::info; +use serde_derive::{Deserialize, Serialize}; struct Gen(AtomicUsize); @@ -89,19 +80,20 @@ impl ResponseError for Errors { } fn upload( - (req, state): (HttpRequest, State), + (mp, state): (Multipart, Data), ) -> Box> { - handle_multipart(req.multipart(), state.form.clone()) - .map(|uploaded_content| { - info!("Uploaded Content: {:?}", uploaded_content); - HttpResponse::Created().finish() - }) - .map_err(JsonError::from) - .map_err(Errors::from) - .responder() + Box::new( + handle_multipart(mp, state.form.clone()) + .map(|uploaded_content| { + info!("Uploaded Content: {:?}", uploaded_content); + HttpResponse::Created().finish() + }) + .map_err(JsonError::from) + .map_err(Errors::from), + ) } -fn main() { +fn main() -> Result<(), failure::Error> { env::set_var("RUST_LOG", "upload=info"); env_logger::init(); @@ -122,14 +114,16 @@ fn main() { let state = AppState { form }; - server::new(move || { - App::with_state(state.clone()) - .middleware(Logger::default()) - .resource("/upload", |r| r.method(http::Method::POST).with(upload)) + HttpServer::new(move || { + App::new() + .data(state.clone()) + .wrap(Logger::default()) + .service(resource("/upload").route(post().to(upload))) }) - .bind("127.0.0.1:8080") - .unwrap() + .bind("127.0.0.1:8080")? .start(); - sys.run(); + sys.run()?; + + Ok(()) } diff --git a/src/error.rs b/src/error.rs index 35f1c29..db435bf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,19 +23,23 @@ use std::{ string::FromUtf8Error, }; +use actix_multipart::MultipartError; use actix_web::{ - error::{MultipartError, PayloadError, ResponseError}, + error::{PayloadError, ResponseError}, HttpResponse, }; +use bytes::Bytes; +use failure::Fail; +use futures::sync::mpsc::SendError; #[derive(Debug, Fail)] pub enum Error { #[fail(display = "Error saving file, {}", _0)] FsPool(#[cause] io::Error), #[fail(display = "Error parsing payload, {}", _0)] - Payload(#[cause] PayloadError), + Payload(PayloadError), #[fail(display = "Error in multipart creation, {}", _0)] - Multipart(#[cause] MultipartError), + Multipart(MultipartError), #[fail(display = "Failed to parse field, {}", _0)] ParseField(#[cause] FromUtf8Error), #[fail(display = "Failed to parse int, {}", _0)] @@ -52,6 +56,8 @@ pub enum Error { MkDir, #[fail(display = "Failed to parse field name")] Field, + #[fail(display = "Could not write file")] + WriteFile, #[fail(display = "Too many fields in request")] FieldCount, #[fail(display = "Field too large")] @@ -84,6 +90,12 @@ impl From for Error { } } +impl From> for Error { + fn from(_: SendError) -> Self { + Error::WriteFile + } +} + impl ResponseError for Error { fn error_response(&self) -> HttpResponse { match *self { @@ -98,6 +110,7 @@ impl ResponseError for Error { | Error::ContentDisposition | Error::Field | Error::FieldCount + | Error::WriteFile | Error::FieldSize | Error::FieldType | Error::Filename diff --git a/src/file_future.rs b/src/file_future.rs new file mode 100644 index 0000000..c283695 --- /dev/null +++ b/src/file_future.rs @@ -0,0 +1,113 @@ +use bytes::{Bytes, BytesMut}; +use failure::Fail; +use futures::{ + sync::mpsc::{channel, SendError}, + task, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream, +}; +use std::{ + fs::File, + io::{Error, Write}, + path::Path, +}; + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error in Channel")] +struct ChannelError; + +pub fn write( + filename: impl AsRef + Clone + Send + 'static, +) -> impl Sink> { + let (tx, rx) = channel(50); + + actix_rt::spawn( + actix_threadpool::run(move || { + CreateFuture::new(filename.clone()) + .from_err() + .and_then(|file| { + rx.map_err(|_| failure::Error::from(ChannelError)) + .forward(WriteSink::new(file)) + }) + .wait() + }) + .map_err(|_| ()) + .map(|_| ()), + ); + + tx +} + +struct CreateFuture

(P) +where + P: AsRef + Clone; + +impl

CreateFuture

+where + P: AsRef + Clone, +{ + fn new(path: P) -> Self { + CreateFuture(path) + } +} + +impl

Future for CreateFuture

+where + P: AsRef + Clone, +{ + type Item = File; + type Error = Error; + + fn poll(&mut self) -> Poll { + File::create(self.0.clone()).map(Async::Ready) + } +} + +struct WriteSink { + buffer: BytesMut, + file: File, +} + +impl WriteSink { + fn new(file: File) -> Self { + WriteSink { + buffer: BytesMut::new(), + file, + } + } +} + +impl Sink for WriteSink { + type SinkItem = Bytes; + type SinkError = Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + if let Async::NotReady = self.poll_complete()? { + return Ok(AsyncSink::NotReady(item)); + } + + self.buffer = BytesMut::new(); + self.buffer.extend_from_slice(&item); + + self.poll_complete()?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + if self.buffer.is_empty() { + return Ok(Async::Ready(())); + } + + let written = self.file.write(&self.buffer)?; + if written == 0 { + return Err(Error::last_os_error()); + } + self.buffer.advance(written); + + if self.buffer.is_empty() { + Ok(Async::Ready(())) + } else { + task::current().notify(); + Ok(Async::NotReady) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index f5b9504..c76bc62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,19 +25,18 @@ //! # Example //! //!```rust -//! extern crate actix_web; -//! extern crate form_data; -//! extern crate futures; -//! extern crate mime; -//! //! use std::path::PathBuf; -//! -//! use actix_web::{http, server, App, AsyncResponder, HttpMessage, HttpRequest, HttpResponse, State}; +//! +//! use actix_multipart::Multipart; +//! use actix_web::{ +//! web::{post, resource, Data}, +//! App, HttpResponse, HttpServer, +//! }; //! use form_data::{handle_multipart, Error, Field, FilenameGenerator, Form}; //! use futures::Future; -//! +//! //! struct Gen; -//! +//! //! impl FilenameGenerator for Gen { //! fn next_filename(&self, _: &mime::Mime) -> Option { //! let mut p = PathBuf::new(); @@ -45,19 +44,17 @@ //! Some(p) //! } //! } -//! -//! fn upload( -//! (req, state): (HttpRequest, State), -//! ) -> Box> { -//! handle_multipart(req.multipart(), state.clone()) -//! .map(|uploaded_content| { +//! +//! fn upload((mp, state): (Multipart, Data)) -> Box> { +//! Box::new( +//! handle_multipart(mp, state.get_ref().clone()).map(|uploaded_content| { //! println!("Uploaded Content: {:?}", uploaded_content); //! HttpResponse::Created().finish() -//! }) -//! .responder() +//! }), +//! ) //! } -//! -//! fn main() { +//! +//! fn main() -> Result<(), failure::Error> { //! let form = Form::new() //! .field("Hey", Field::text()) //! .field( @@ -68,47 +65,34 @@ //! .finalize(), //! ) //! .field("files", Field::array(Field::file(Gen))); -//! +//! //! println!("{:?}", form); -//! -//! server::new(move || { -//! App::with_state(form.clone()) -//! .resource("/upload", |r| r.method(http::Method::POST).with(upload)) -//! }).bind("127.0.0.1:8080") -//! .unwrap(); -//! // .run() +//! +//! HttpServer::new(move || { +//! App::new() +//! .data(form.clone()) +//! .service(resource("/upload").route(post().to(upload))) +//! }) +//! .bind("127.0.0.1:8080")?; +//! // .run()?; +//! +//! Ok(()) //! } //!``` -extern crate actix_web; -extern crate bytes; -#[macro_use] -extern crate failure; -extern crate futures; -extern crate futures_cpupool; -extern crate futures_fs; -extern crate http; -#[macro_use] -extern crate log; -extern crate mime; -#[cfg(feature = "with-serde")] -extern crate serde; -#[cfg(feature = "with-serde")] -#[macro_use] -extern crate serde_derive; use std::path::PathBuf; mod error; +mod file_future; mod types; mod upload; -pub use self::error::Error; -pub use self::types::*; -pub use self::upload::handle_multipart; + +pub use self::{error::Error, types::*, upload::handle_multipart}; /// A trait for types that produce filenames for uploade files /// /// Currently, the mime type provided to the `next_filename` method is guessed from the uploaded /// file's original filename, so relying on this to be 100% accurate is probably a bad idea. pub trait FilenameGenerator: Send + Sync { - fn next_filename(&self, &mime::Mime) -> Option; + fn next_filename(&self, mime_type: &mime::Mime) -> Option; } diff --git a/src/types.rs b/src/types.rs index a646781..8cb7e01 100644 --- a/src/types.rs +++ b/src/types.rs @@ -25,13 +25,9 @@ use std::{ }; use bytes::Bytes; -use futures::{ - future::{ExecuteError, Executor}, - Future, -}; -use futures_cpupool::CpuPool; +use log::trace; -use super::FilenameGenerator; +use crate::FilenameGenerator; /// The result of a succesfull parse through a given multipart stream. /// @@ -74,8 +70,8 @@ impl Value { match (self, rhs) { (&mut Value::Map(ref mut hm), Value::Map(ref other)) => { other.into_iter().fold(hm, |hm, (key, value)| { - if hm.contains_key(key) { - hm.get_mut(key).unwrap().merge(value.clone()) + if let Some(v) = hm.get_mut(key) { + v.merge(value.clone()); } else { hm.insert(key.to_owned(), value.clone()); } @@ -480,16 +476,20 @@ pub struct Form { pub max_files: u32, pub max_file_size: usize, inner: Map, - pub pool: ArcExecutor, } impl Form { /// Create a new form /// - /// This also creates a new `CpuPool` to be used to stream files onto the filesystem. If you - /// wish to provide your own executor, use the `from_executor` method. + /// If you wish to provide your own executor, use the `with_executor` method. pub fn new() -> Self { - Form::from_executor(CpuPool::new_num_cpus()) + Form { + max_fields: 100, + max_field_size: 10_000, + max_files: 20, + max_file_size: 10_000_000, + inner: Map::new(), + } } /// Set the maximum number of fields allowed in the upload @@ -528,23 +528,6 @@ impl Form { self } - /// Create a new form with a given executor - /// - /// This executor is used to stream files onto the filesystem. - pub fn from_executor(executor: E) -> Self - where - E: Executor + Send>> + Send + Sync + Clone + 'static, - { - Form { - max_fields: 100, - max_field_size: 10_000, - max_files: 20, - max_file_size: 10_000_000, - inner: Map::new(), - pool: ArcExecutor::new(executor), - } - } - pub fn field(mut self, name: &str, field: Field) -> Self { self.inner = self.inner.field(name, field); @@ -562,37 +545,6 @@ impl fmt::Debug for Form { } } -/// The executor type stored inside a `Form` -/// -/// Any executor capable of being shared and executing boxed futures can be stored here. -#[derive(Clone)] -pub struct ArcExecutor { - inner: Arc + Send>> + Send + Sync + 'static>, -} - -impl ArcExecutor { - /// Create a new ArcExecutor from an Executor - /// - /// This essentially wraps the given executor in an Arc - pub fn new(executor: E) -> Self - where - E: Executor + Send>> + Send + Sync + Clone + 'static, - { - ArcExecutor { - inner: Arc::new(executor), - } - } -} - -impl Executor + Send>> for ArcExecutor where { - fn execute( - &self, - future: Box + Send>, - ) -> Result<(), ExecuteError + Send>>> { - self.inner.execute(future) - } -} - #[derive(Clone, Debug, PartialEq)] pub(crate) struct ContentDisposition { pub name: Option, diff --git a/src/upload.rs b/src/upload.rs index e350fa7..7e6ec92 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -27,19 +27,19 @@ use std::{ }, }; -use actix_web::{error::PayloadError, multipart}; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use futures::{ - future::{lazy, result, Either, Executor}, - sync::oneshot, + future::{result, Either}, Future, Stream, }; -use futures_fs::FsPool; +use log::trace; -use super::FilenameGenerator; -use error::Error; -use types::{ - self, ContentDisposition, MultipartContent, MultipartForm, MultipartHash, NamePart, Value, +use crate::{ + error::Error, + types::{ + self, ContentDisposition, MultipartContent, MultipartForm, MultipartHash, NamePart, Value, + }, + FilenameGenerator, }; fn consolidate(mf: MultipartForm) -> Value { @@ -74,7 +74,7 @@ fn parse_multipart_name(name: String) -> Result, Error> { if part.len() == 1 && part.ends_with(']') { NamePart::Array } else if part.ends_with(']') { - NamePart::Map(part.trim_right_matches(']').to_owned()) + NamePart::Map(part.trim_end_matches(']').to_owned()) } else { NamePart::Map(part.to_owned()) } @@ -92,10 +92,7 @@ fn parse_multipart_name(name: String) -> Result, Error> { }) } -fn parse_content_disposition(field: &multipart::Field) -> ContentDisposition -where - S: Stream, -{ +fn parse_content_disposition(field: &actix_multipart::Field) -> ContentDisposition { match field.content_disposition() { Some(x) => ContentDisposition { name: x.get_name().map(|v| v.to_string()), @@ -124,15 +121,12 @@ fn build_dir(stored_dir: PathBuf) -> Result<(), Error> { .map_err(|_| Error::MkDir) } -fn handle_file_upload( - field: multipart::Field, +fn handle_file_upload( + field: actix_multipart::Field, gen: Arc, filename: Option, form: types::Form, -) -> Box> -where - S: Stream + 'static, -{ +) -> Box> { let filename = match filename { Some(filename) => filename, None => return Box::new(result(Err(Error::Filename))), @@ -155,55 +149,36 @@ where let mut stored_dir = stored_as.clone(); stored_dir.pop(); - let (tx, rx) = oneshot::channel(); - - match form.pool.execute(Box::new(lazy(move || { - let res = build_dir(stored_dir.clone()); - - tx.send(res).map_err(|_| ()) - }))) { - Ok(_) => (), - Err(_) => return Box::new(result(Err(Error::MkDir))), - }; + let mkdir_fut = actix_threadpool::run(move || build_dir(stored_dir.clone())); let counter = Arc::new(AtomicUsize::new(0)); - Box::new( - rx.then(|res| match res { - Ok(res) => res, - Err(_) => Err(Error::MkDir), - }) - .and_then(move |_| { - let write = FsPool::with_executor(form.pool.clone()) - .write(stored_as.clone(), Default::default()); - field - .map_err(Error::Multipart) - .and_then(move |bytes| { - let size = counter.fetch_add(bytes.len(), Ordering::Relaxed) + bytes.len(); + Box::new(mkdir_fut.map_err(|_| Error::MkDir).and_then(move |_| { + let write = crate::file_future::write(stored_as.clone()); + field + .map_err(Error::Multipart) + .and_then(move |bytes| { + let size = counter.fetch_add(bytes.len(), Ordering::Relaxed) + bytes.len(); - if size > form.max_file_size { - Err(Error::FileSize) - } else { - Ok(bytes) - } - }) - .forward(write) - .map(move |_| MultipartContent::File { - filename, - stored_as, - }) - }), - ) + if size > form.max_file_size { + Err(Error::FileSize) + } else { + Ok(bytes) + } + }) + .forward(write) + .map(move |_| MultipartContent::File { + filename, + stored_as, + }) + })) } -fn handle_form_data( - field: multipart::Field, +fn handle_form_data( + field: actix_multipart::Field, term: types::FieldTerminator, form: types::Form, -) -> Box> -where - S: Stream + 'static, -{ +) -> Box> { trace!("In handle_form_data, term: {:?}", term); let term2 = term.clone(); @@ -247,13 +222,10 @@ where ) } -fn handle_stream_field( - field: multipart::Field, +fn handle_stream_field( + field: actix_multipart::Field, form: types::Form, -) -> Box> -where - S: Stream + 'static, -{ +) -> Box> { let content_disposition = parse_content_disposition(&field); let name = match content_disposition.name { @@ -284,42 +256,26 @@ where Box::new(fut.map(|content| (name, content))) } -fn handle_stream( - m: multipart::Multipart, +fn handle_stream( + m: actix_multipart::Multipart, form: types::Form, -) -> Box> -where - S: Stream + 'static, -{ +) -> Box> { Box::new( m.map_err(Error::from) - .map(move |item| match item { - multipart::MultipartItem::Field(field) => { - info!("Field: {:?}", field); - Box::new( - handle_stream_field(field, form.clone()) - .map(From::from) - .into_stream(), - ) as Box> - } - multipart::MultipartItem::Nested(m) => { - info!("Nested"); - Box::new(handle_stream(m, form.clone())) - as Box> - } + .map(move |field| { + handle_stream_field(field, form.clone()) + .map(From::from) + .into_stream() }) .flatten(), ) } /// Handle multipart streams from Actix Web -pub fn handle_multipart( - m: multipart::Multipart, +pub fn handle_multipart( + m: actix_multipart::Multipart, form: types::Form, -) -> Box> -where - S: Stream + 'static, -{ +) -> Box> { Box::new( handle_stream(m, form.clone()) .fold(