commit a98ac8b2a83fd822183bbb16d6f57ae5e5c7a744 Author: asonix Date: Sun Apr 29 17:31:06 2018 -0500 Initial multipart formdata library This handles servers, not clients diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8c38c0e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/target +**/*.rs.bk +Cargo.lock +/examples/filename*.png diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..238f21d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "actix-multipart" +version = "0.1.0" +authors = ["asonix "] + +[dependencies] +actix = "0.5.6" +actix-web = "0.5.6" +bytes = "0.4.7" +failure = "0.1" +futures = "0.1.21" +futures-cpupool = "0.1.8" +futures-fs = "0.0.4" +http = "0.1.5" +log = "0.4.1" +mime = "0.3.5" + +[dev-dependencies] +env_logger = "0.5.9" +serde = "1.0" +serde_derive = "1.0" diff --git a/examples/.python-version b/examples/.python-version new file mode 100644 index 0000000..40c341b --- /dev/null +++ b/examples/.python-version @@ -0,0 +1 @@ +3.6.0 diff --git a/examples/client.py b/examples/client.py new file mode 100755 index 0000000..fba5f4e --- /dev/null +++ b/examples/client.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +# This script could be used for actix-web multipart example test +# just start server and run client.py + +import asyncio +import aiofiles +import aiohttp + +file_name = 'test.png' +url = 'http://localhost:8080/upload' + +async def file_sender(file_name=None): + async with aiofiles.open(file_name, 'rb') as f: + chunk = await f.read(64*1024) + while chunk: + yield chunk + chunk = await f.read(64*1024) + + +async def req(): + async with aiohttp.ClientSession() as session: + data = aiohttp.FormData(quote_fields=False) + data.add_field("files[]", file_sender(file_name=file_name), filename="image1.png") + data.add_field("files[]", file_sender(file_name=file_name), filename="image2.png") + data.add_field("files[]", file_sender(file_name=file_name), filename="image3.png") + data.add_field("Hey", "hi") + data.add_field("Hi[One]", "1") + data.add_field("Hi[Two]", "2.0") + + async with session.post(url, data=data) as resp: + text = await resp.text() + print(text) + assert 201 == resp.status + + +loop = asyncio.get_event_loop() +loop.run_until_complete(req()) diff --git a/examples/test.png b/examples/test.png new file mode 100644 index 0000000..a58edc8 Binary files /dev/null and b/examples/test.png differ diff --git a/examples/upload.rs b/examples/upload.rs new file mode 100644 index 0000000..3d12232 --- /dev/null +++ b/examples/upload.rs @@ -0,0 +1,129 @@ +extern crate actix; +extern crate actix_multipart; +extern crate actix_web; +extern crate env_logger; +#[macro_use] +extern crate failure; +extern crate futures; +#[macro_use] +extern crate log; +extern crate mime; +extern crate serde; +#[macro_use] +extern crate serde_derive; + +use std::{env, path::PathBuf, sync::atomic::{AtomicUsize, Ordering}}; + +use actix_web::{http, server, App, AsyncResponder, HttpMessage, HttpRequest, HttpResponse, State, + error::ResponseError, middleware::Logger}; +use actix_multipart::*; +use futures::Future; + +struct Gen(AtomicUsize); + +impl Gen { + pub fn new() -> Self { + Gen(AtomicUsize::new(0)) + } +} + +impl FilenameGenerator for Gen { + fn next_filename(&self, _: &mime::Mime) -> Option { + let mut p = PathBuf::new(); + p.push("examples"); + p.push(&format!( + "filename{}.png", + self.0.fetch_add(1, Ordering::Relaxed) + )); + Some(p) + } +} + +#[derive(Clone, Debug)] +struct AppState { + form: Form, +} + +#[derive(Clone, Debug, Deserialize, Fail, Serialize)] +#[fail(display = "{}", msg)] +struct JsonError { + msg: String, +} + +impl From for JsonError { + fn from(e: Error) -> Self { + JsonError { + msg: format!("{}", e), + } + } +} + +impl ResponseError for JsonError { + fn error_response(&self) -> HttpResponse { + HttpResponse::BadRequest().json(Errors::from(self.clone())) + } +} + +#[derive(Clone, Debug, Deserialize, Fail, Serialize)] +#[fail(display = "Errors occurred")] +struct Errors { + errors: Vec, +} + +impl From for Errors { + fn from(e: JsonError) -> Self { + Errors { errors: vec![e] } + } +} + +impl ResponseError for Errors { + fn error_response(&self) -> HttpResponse { + HttpResponse::BadRequest().json(self) + } +} + +fn upload( + req: HttpRequest, + state: State, +) -> Box> { + handle_upload(req.multipart(), state.form.clone()) + .map(|uploaded_content| { + info!("Uploaded Content: {:?}", uploaded_content); + HttpResponse::Created().finish() + }) + .map_err(JsonError::from) + .map_err(Errors::from) + .responder() +} + +fn main() { + env::set_var("RUST_LOG", "upload=info"); + env_logger::init(); + + let sys = actix::System::new("upload-test"); + + let form = Form::new() + .field("Hey", Field::text()) + .field( + "Hi", + Field::map() + .field("One", Field::int()) + .field("Two", Field::float()) + .finalize(), + ) + .field("files", Field::array(Field::file(Gen::new()))); + + info!("{:?}", form); + + let state = AppState { form }; + + server::new(move || { + App::with_state(state.clone()) + .middleware(Logger::default()) + .resource("/upload", |r| r.method(http::Method::POST).with2(upload)) + }).bind("127.0.0.1:8080") + .unwrap() + .start(); + + sys.run(); +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..b95dca8 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,59 @@ +use std::{io, num::{ParseFloatError, ParseIntError}, string::FromUtf8Error}; + +use actix_web::error::{MultipartError, PayloadError}; + +#[derive(Debug, Fail)] +pub enum Error { + #[fail(display = "Error saving file, {}", _0)] + FsPool(#[cause] io::Error), + #[fail(display = "Error parsing payload, {}", _0)] + Payload(#[cause] PayloadError), + #[fail(display = "Error in multipart creation, {}", _0)] + Multipart(#[cause] MultipartError), + #[fail(display = "Failed to parse field, {}", _0)] + ParseField(#[cause] FromUtf8Error), + #[fail(display = "Failed to parse int, {}", _0)] + ParseInt(#[cause] ParseIntError), + #[fail(display = "Failed to parse float, {}", _0)] + ParseFloat(#[cause] ParseFloatError), + #[fail(display = "Failed to generate filename")] + GenFilename, + #[fail(display = "Bad Content-Type")] + ContentType, + #[fail(display = "Bad Content-Disposition")] + ContentDisposition, + #[fail(display = "Failed to make directory for upload")] + MkDir, + #[fail(display = "Failed to parse field name")] + Field, + #[fail(display = "Too many fields in request")] + FieldCount, + #[fail(display = "Field too large")] + FieldSize, + #[fail(display = "Found field with unexpected name or type")] + FieldType, + #[fail(display = "Failed to parse filename")] + Filename, + #[fail(display = "Too many files in request")] + FileCount, + #[fail(display = "File too large")] + FileSize, +} + +impl From for Error { + fn from(e: MultipartError) -> Self { + Error::Multipart(e) + } +} + +impl From for Error { + fn from(e: PayloadError) -> Self { + Error::Payload(e) + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Error::FsPool(e) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..666ea94 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,24 @@ +extern crate actix_web; +extern crate bytes; +#[macro_use] +extern crate failure; +extern crate futures; +extern crate futures_cpupool; +extern crate futures_fs; +extern crate http; +#[macro_use] +extern crate log; +extern crate mime; + +use std::path::PathBuf; + +mod error; +mod types; +mod upload; +pub use self::error::Error; +pub use self::types::*; +pub use self::upload::handle_upload; + +pub trait FilenameGenerator: Send + Sync { + fn next_filename(&self, &mime::Mime) -> Option; +} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..9a83d5c --- /dev/null +++ b/src/types.rs @@ -0,0 +1,274 @@ +use std::{fmt, collections::VecDeque, sync::Arc}; + +use futures::{Future, future::{ExecuteError, Executor}}; +use futures_cpupool::CpuPool; + +use super::FilenameGenerator; + +#[derive(Clone, Debug, PartialEq)] +pub enum NamePart { + Map(String), + Array, +} + +impl NamePart { + pub fn is_map(&self) -> bool { + match *self { + NamePart::Map(_) => true, + _ => false, + } + } +} + +#[derive(Clone)] +pub enum FieldTerminator { + File(Arc), + Int, + Float, + Text, +} + +impl fmt::Debug for FieldTerminator { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + FieldTerminator::File(_) => write!(f, "File(filename_generator)"), + FieldTerminator::Int => write!(f, "Int"), + FieldTerminator::Float => write!(f, "Float"), + FieldTerminator::Text => write!(f, "Text"), + } + } +} + +#[derive(Clone)] +pub enum Field { + Array(Array), + File(Arc), + Map(Map), + Int, + Float, + Text, +} + +impl fmt::Debug for Field { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Field::Array(ref arr) => write!(f, "Array({:?})", arr), + Field::File(_) => write!(f, "File(filename_generator)"), + Field::Map(ref map) => write!(f, "Map({:?})", map), + Field::Int => write!(f, "Int"), + Field::Float => write!(f, "Float"), + Field::Text => write!(f, "Text"), + } + } +} + +impl Field { + pub fn file(gen: T) -> Self + where + T: FilenameGenerator + 'static, + { + Field::File(Arc::new(gen)) + } + + pub fn text() -> Self { + Field::Text + } + + pub fn int() -> Self { + Field::Int + } + + pub fn float() -> Self { + Field::Float + } + + pub fn array(field: Field) -> Self { + Field::Array(Array::new(field)) + } + + pub fn map() -> Map { + Map::new() + } + + fn valid_field(&self, name: VecDeque) -> Option { + trace!("Checking {:?} and {:?}", self, name); + match *self { + Field::Array(ref arr) => arr.valid_field(name), + Field::Map(ref map) => map.valid_field(name), + Field::File(ref gen) => if name.is_empty() { + Some(FieldTerminator::File(Arc::clone(gen))) + } else { + None + }, + Field::Int => if name.is_empty() { + Some(FieldTerminator::Int) + } else { + None + }, + Field::Float => if name.is_empty() { + Some(FieldTerminator::Float) + } else { + None + }, + Field::Text => if name.is_empty() { + Some(FieldTerminator::Text) + } else { + None + }, + } + } +} + +#[derive(Debug, Clone)] +pub struct Array { + inner: Box, +} + +impl Array { + fn new(field: Field) -> Self { + Array { + inner: Box::new(field), + } + } + + fn valid_field(&self, mut name: VecDeque) -> Option { + trace!("Checking {:?} and {:?}", self, name); + match name.pop_front() { + Some(name_part) => match name_part { + NamePart::Array => self.inner.valid_field(name), + _ => None, + }, + None => None, + } + } +} + +#[derive(Debug, Clone)] +pub struct Map { + inner: Vec<(String, Field)>, +} + +impl Map { + fn new() -> Self { + Map { inner: Vec::new() } + } + + pub fn field(mut self, key: &str, value: Field) -> Self { + self.inner.push((key.to_owned(), value)); + + self + } + + pub fn finalize(self) -> Field { + Field::Map(self) + } + + fn valid_field(&self, mut name: VecDeque) -> Option { + trace!("Checking {:?} and {:?}", self, name); + match name.pop_front() { + Some(name_part) => match name_part { + NamePart::Map(part_name) => self.inner + .iter() + .find(|(item, _)| *item == part_name) + .and_then(|(_, field)| field.valid_field(name)), + _ => None, + }, + None => None, + } + } +} + +#[derive(Clone)] +pub struct Form { + pub max_fields: u32, + pub max_field_size: u32, + pub max_files: u32, + pub max_file_size: u32, + inner: Map, + pub pool: ArcExecutor, +} + +impl Form { + pub fn new() -> Self { + Form::from_executor(CpuPool::new_num_cpus()) + } + + pub fn max_fields(mut self, max: u32) -> Self { + self.max_fields = max; + + self + } + + pub fn max_field_size(mut self, max: u32) -> Self { + self.max_field_size = max; + + self + } + + pub fn max_files(mut self, max: u32) -> Self { + self.max_files = max; + + self + } + + pub fn max_file_size(mut self, max: u32) -> Self { + self.max_file_size = max; + + self + } + + pub fn from_executor(executor: E) -> Self + where + E: Executor + Send>> + Send + Sync + Clone + 'static, + { + Form { + max_fields: 100, + max_field_size: 10_000, + max_files: 20, + max_file_size: 10_000_000, + inner: Map::new(), + pool: ArcExecutor::new(executor), + } + } + + pub fn field(mut self, name: &str, field: Field) -> Self { + self.inner = self.inner.field(name, field); + + self + } + + pub fn valid_field(&self, name: VecDeque) -> Option { + self.inner.valid_field(name.clone()) + } +} + +impl fmt::Debug for Form { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Form({:?})", self.inner) + } +} + +#[derive(Clone)] +pub struct ArcExecutor { + inner: Arc + Send>> + Send + Sync + 'static>, +} + +impl ArcExecutor { + pub fn new(executor: E) -> Self + where + E: Executor + Send>> + Send + Sync + Clone + 'static, + { + ArcExecutor { + inner: Arc::new(executor), + } + } +} + +impl Executor + Send>> for ArcExecutor where { + fn execute( + &self, + future: Box + Send>, + ) -> Result<(), ExecuteError + Send>>> { + self.inner.execute(future) + } +} diff --git a/src/upload.rs b/src/upload.rs new file mode 100644 index 0000000..5ca7eef --- /dev/null +++ b/src/upload.rs @@ -0,0 +1,330 @@ +use std::{fs::DirBuilder, os::unix::fs::DirBuilderExt, path::{Path, PathBuf}, sync::Arc}; + +use actix_web::{multipart, error::PayloadError}; +use bytes::{Bytes, BytesMut}; +use futures::{Future, Stream, future::{lazy, result, Either, Executor}, sync::oneshot}; +use futures_fs::FsPool; +use http::header::CONTENT_DISPOSITION; + +use error::Error; +use super::FilenameGenerator; +use types::{self, NamePart}; + +type MultipartHash = (Vec, MultipartContent); + +#[derive(Clone, Debug, PartialEq)] +pub enum MultipartContent { + File { + filename: String, + stored_as: PathBuf, + }, + Text(String), + Int(i64), + Float(f64), +} + +pub type MultipartForm = Vec; + +fn parse_multipart_name(name: String) -> Result, Error> { + name.split('[') + .map(|part| { + if part.len() == 1 && part.ends_with(']') { + NamePart::Array + } else if part.ends_with(']') { + NamePart::Map(part.trim_right_matches(']').to_owned()) + } else { + NamePart::Map(part.to_owned()) + } + }) + .fold(Ok(vec![]), |acc, part| match acc { + Ok(mut v) => { + if v.len() == 0 && !part.is_map() { + return Err(Error::ContentDisposition); + } + + v.push(part); + Ok(v) + } + Err(e) => Err(e), + }) +} + +pub struct ContentDisposition { + name: Option, + filename: Option, +} + +impl ContentDisposition { + fn empty() -> Self { + ContentDisposition { + name: None, + filename: None, + } + } +} + +fn parse_content_disposition(field: &multipart::Field) -> Result +where + S: Stream, +{ + let content_disposition = if let Some(cd) = field.headers().get(CONTENT_DISPOSITION) { + cd + } else { + return Err(Error::ContentDisposition); + }; + + let content_disposition = if let Ok(cd) = content_disposition.to_str() { + cd + } else { + return Err(Error::ContentDisposition); + }; + + Ok(content_disposition + .split(';') + .skip(1) + .filter_map(|section| { + let mut parts = section.splitn(2, '='); + + let key = if let Some(key) = parts.next() { + key.trim() + } else { + return None; + }; + + let val = if let Some(val) = parts.next() { + val.trim() + } else { + return None; + }; + + Some((key, val.trim_matches('"'))) + }) + .fold(ContentDisposition::empty(), |mut acc, (key, val)| { + if key == "name" { + acc.name = Some(val.to_owned()); + } else if key == "filename" { + acc.filename = Some(val.to_owned()); + } + acc + })) +} + +fn handle_file_upload( + field: multipart::Field, + gen: Arc, + filename: Option, + form: types::Form, +) -> impl Future +where + S: Stream, +{ + let filename = match filename { + Some(filename) => filename, + None => return Either::B(result(Err(Error::Filename))), + }; + + let path: &Path = filename.as_ref(); + let filename = path.file_name().and_then(|filename| filename.to_str()); + + let filename = if let Some(filename) = filename { + filename.to_owned() + } else { + return Either::B(result(Err(Error::Filename))); + }; + + let stored_as = match gen.next_filename(field.content_type()) { + Some(file_path) => file_path, + None => return Either::B(result(Err(Error::GenFilename))), + }; + + let mut stored_dir = stored_as.clone(); + stored_dir.pop(); + + let (tx, rx) = oneshot::channel(); + + match form.pool.execute(Box::new(lazy(move || { + let res = DirBuilder::new() + .recursive(true) + .mode(0o755) + .create(stored_dir.clone()) + .map_err(|_| Error::MkDir); + + tx.send(res).map_err(|_| ()) + }))) { + Ok(_) => (), + Err(_) => return Either::B(result(Err(Error::MkDir))), + }; + + Either::A(rx.then(|res| match res { + Ok(res) => res, + Err(_) => Err(Error::MkDir), + }).and_then(move |_| { + let write = + FsPool::from_executor(form.pool.clone()).write(stored_as.clone(), Default::default()); + field + .map_err(Error::Multipart) + .forward(write) + .map(move |_| MultipartContent::File { + filename, + stored_as, + }) + })) +} + +fn handle_form_data( + field: multipart::Field, + term: types::FieldTerminator, +) -> impl Future +where + S: Stream, +{ + trace!("In handle_form_data, term: {:?}", term); + let max_body_size = 80000; + + field + .from_err() + .fold(BytesMut::new(), move |mut acc, bytes| { + if acc.len() + bytes.len() < max_body_size { + acc.extend(bytes); + Ok(acc) + } else { + Err(Error::FieldSize) + } + }) + .and_then(|bytes| String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)) + .and_then(move |string| { + trace!("Matching: {:?}", string); + match term { + types::FieldTerminator::File(_) => Err(Error::FieldType), + types::FieldTerminator::Float => string + .parse::() + .map(MultipartContent::Float) + .map_err(Error::ParseFloat), + types::FieldTerminator::Int => string + .parse::() + .map(MultipartContent::Int) + .map_err(Error::ParseInt), + types::FieldTerminator::Text => Ok(MultipartContent::Text(string)), + } + }) +} + +fn handle_multipart_field( + field: multipart::Field, + form: types::Form, +) -> impl Future +where + S: Stream, +{ + let content_disposition = match parse_content_disposition(&field) { + Ok(cd) => cd, + Err(e) => return Either::B(result(Err(e))), + }; + + let name = match content_disposition.name { + Some(name) => name, + None => return Either::B(result(Err(Error::Field))), + }; + + let name = match parse_multipart_name(name) { + Ok(name) => name, + Err(e) => return Either::B(result(Err(e))), + }; + + let term = match form.valid_field(name.iter().cloned().collect()) { + Some(term) => term, + None => return Either::B(result(Err(Error::FieldType))), + }; + + let fut = match term { + types::FieldTerminator::File(gen) => Either::A(handle_file_upload( + field, + gen, + content_disposition.filename, + form, + )), + term => Either::B(handle_form_data(field, term)), + }; + + Either::A(fut.map(|content| (name, content))) +} + +pub fn handle_multipart( + m: multipart::Multipart, + form: types::Form, +) -> Box> +where + S: Stream + 'static, +{ + Box::new( + m.map_err(Error::from) + .map(move |item| match item { + multipart::MultipartItem::Field(field) => { + info!("Field: {:?}", field); + Box::new( + handle_multipart_field(field, form.clone()) + .map(From::from) + .into_stream(), + ) as Box> + } + multipart::MultipartItem::Nested(m) => { + info!("Nested"); + Box::new(handle_multipart(m, form.clone())) + as Box> + } + }) + .flatten(), + ) +} + +pub fn handle_upload( + m: multipart::Multipart, + form: types::Form, +) -> impl Future +where + S: Stream + 'static, +{ + let max_files = 10; + let max_fields = 100; + + handle_multipart(m, form) + .fold( + (Vec::new(), 0, 0), + move |(mut acc, file_count, field_count), (name, content)| match content { + MultipartContent::File { + filename, + stored_as, + } => { + let file_count = file_count + 1; + + if file_count < max_files { + acc.push(( + name, + MultipartContent::File { + filename, + stored_as, + }, + )); + + Ok((acc, file_count, field_count)) + } else { + Err(Error::FileCount) + } + } + b @ MultipartContent::Text(_) + | b @ MultipartContent::Float(_) + | b @ MultipartContent::Int(_) => { + let field_count = field_count + 1; + + if field_count < max_fields { + acc.push((name, b)); + + Ok((acc, file_count, field_count)) + } else { + Err(Error::FieldCount) + } + } + }, + ) + .map(|(multipart_form, _, _)| multipart_form) +}