Initial multipart formdata library

This handles servers, not clients
This commit is contained in:
asonix 2018-04-29 17:31:06 -05:00
commit a98ac8b2a8
10 changed files with 880 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/target
**/*.rs.bk
Cargo.lock
/examples/filename*.png

21
Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "actix-multipart"
version = "0.1.0"
authors = ["asonix <asonix.dev@gmail.com>"]
[dependencies]
actix = "0.5.6"
actix-web = "0.5.6"
bytes = "0.4.7"
failure = "0.1"
futures = "0.1.21"
futures-cpupool = "0.1.8"
futures-fs = "0.0.4"
http = "0.1.5"
log = "0.4.1"
mime = "0.3.5"
[dev-dependencies]
env_logger = "0.5.9"
serde = "1.0"
serde_derive = "1.0"

1
examples/.python-version Normal file
View File

@ -0,0 +1 @@
3.6.0

38
examples/client.py Executable file
View File

@ -0,0 +1,38 @@
#!/usr/bin/env python
# This script could be used for actix-web multipart example test
# just start server and run client.py
import asyncio
import aiofiles
import aiohttp
file_name = 'test.png'
url = 'http://localhost:8080/upload'
async def file_sender(file_name=None):
async with aiofiles.open(file_name, 'rb') as f:
chunk = await f.read(64*1024)
while chunk:
yield chunk
chunk = await f.read(64*1024)
async def req():
async with aiohttp.ClientSession() as session:
data = aiohttp.FormData(quote_fields=False)
data.add_field("files[]", file_sender(file_name=file_name), filename="image1.png")
data.add_field("files[]", file_sender(file_name=file_name), filename="image2.png")
data.add_field("files[]", file_sender(file_name=file_name), filename="image3.png")
data.add_field("Hey", "hi")
data.add_field("Hi[One]", "1")
data.add_field("Hi[Two]", "2.0")
async with session.post(url, data=data) as resp:
text = await resp.text()
print(text)
assert 201 == resp.status
loop = asyncio.get_event_loop()
loop.run_until_complete(req())

BIN
examples/test.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 299 KiB

129
examples/upload.rs Normal file
View File

@ -0,0 +1,129 @@
extern crate actix;
extern crate actix_multipart;
extern crate actix_web;
extern crate env_logger;
#[macro_use]
extern crate failure;
extern crate futures;
#[macro_use]
extern crate log;
extern crate mime;
extern crate serde;
#[macro_use]
extern crate serde_derive;
use std::{env, path::PathBuf, sync::atomic::{AtomicUsize, Ordering}};
use actix_web::{http, server, App, AsyncResponder, HttpMessage, HttpRequest, HttpResponse, State,
error::ResponseError, middleware::Logger};
use actix_multipart::*;
use futures::Future;
struct Gen(AtomicUsize);
impl Gen {
pub fn new() -> Self {
Gen(AtomicUsize::new(0))
}
}
impl FilenameGenerator for Gen {
fn next_filename(&self, _: &mime::Mime) -> Option<PathBuf> {
let mut p = PathBuf::new();
p.push("examples");
p.push(&format!(
"filename{}.png",
self.0.fetch_add(1, Ordering::Relaxed)
));
Some(p)
}
}
#[derive(Clone, Debug)]
struct AppState {
form: Form,
}
#[derive(Clone, Debug, Deserialize, Fail, Serialize)]
#[fail(display = "{}", msg)]
struct JsonError {
msg: String,
}
impl From<Error> for JsonError {
fn from(e: Error) -> Self {
JsonError {
msg: format!("{}", e),
}
}
}
impl ResponseError for JsonError {
fn error_response(&self) -> HttpResponse {
HttpResponse::BadRequest().json(Errors::from(self.clone()))
}
}
#[derive(Clone, Debug, Deserialize, Fail, Serialize)]
#[fail(display = "Errors occurred")]
struct Errors {
errors: Vec<JsonError>,
}
impl From<JsonError> for Errors {
fn from(e: JsonError) -> Self {
Errors { errors: vec![e] }
}
}
impl ResponseError for Errors {
fn error_response(&self) -> HttpResponse {
HttpResponse::BadRequest().json(self)
}
}
fn upload(
req: HttpRequest<AppState>,
state: State<AppState>,
) -> Box<Future<Item = HttpResponse, Error = Errors>> {
handle_upload(req.multipart(), state.form.clone())
.map(|uploaded_content| {
info!("Uploaded Content: {:?}", uploaded_content);
HttpResponse::Created().finish()
})
.map_err(JsonError::from)
.map_err(Errors::from)
.responder()
}
fn main() {
env::set_var("RUST_LOG", "upload=info");
env_logger::init();
let sys = actix::System::new("upload-test");
let form = Form::new()
.field("Hey", Field::text())
.field(
"Hi",
Field::map()
.field("One", Field::int())
.field("Two", Field::float())
.finalize(),
)
.field("files", Field::array(Field::file(Gen::new())));
info!("{:?}", form);
let state = AppState { form };
server::new(move || {
App::with_state(state.clone())
.middleware(Logger::default())
.resource("/upload", |r| r.method(http::Method::POST).with2(upload))
}).bind("127.0.0.1:8080")
.unwrap()
.start();
sys.run();
}

59
src/error.rs Normal file
View File

@ -0,0 +1,59 @@
use std::{io, num::{ParseFloatError, ParseIntError}, string::FromUtf8Error};
use actix_web::error::{MultipartError, PayloadError};
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "Error saving file, {}", _0)]
FsPool(#[cause] io::Error),
#[fail(display = "Error parsing payload, {}", _0)]
Payload(#[cause] PayloadError),
#[fail(display = "Error in multipart creation, {}", _0)]
Multipart(#[cause] MultipartError),
#[fail(display = "Failed to parse field, {}", _0)]
ParseField(#[cause] FromUtf8Error),
#[fail(display = "Failed to parse int, {}", _0)]
ParseInt(#[cause] ParseIntError),
#[fail(display = "Failed to parse float, {}", _0)]
ParseFloat(#[cause] ParseFloatError),
#[fail(display = "Failed to generate filename")]
GenFilename,
#[fail(display = "Bad Content-Type")]
ContentType,
#[fail(display = "Bad Content-Disposition")]
ContentDisposition,
#[fail(display = "Failed to make directory for upload")]
MkDir,
#[fail(display = "Failed to parse field name")]
Field,
#[fail(display = "Too many fields in request")]
FieldCount,
#[fail(display = "Field too large")]
FieldSize,
#[fail(display = "Found field with unexpected name or type")]
FieldType,
#[fail(display = "Failed to parse filename")]
Filename,
#[fail(display = "Too many files in request")]
FileCount,
#[fail(display = "File too large")]
FileSize,
}
impl From<MultipartError> for Error {
fn from(e: MultipartError) -> Self {
Error::Multipart(e)
}
}
impl From<PayloadError> for Error {
fn from(e: PayloadError) -> Self {
Error::Payload(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::FsPool(e)
}
}

24
src/lib.rs Normal file
View File

@ -0,0 +1,24 @@
extern crate actix_web;
extern crate bytes;
#[macro_use]
extern crate failure;
extern crate futures;
extern crate futures_cpupool;
extern crate futures_fs;
extern crate http;
#[macro_use]
extern crate log;
extern crate mime;
use std::path::PathBuf;
mod error;
mod types;
mod upload;
pub use self::error::Error;
pub use self::types::*;
pub use self::upload::handle_upload;
pub trait FilenameGenerator: Send + Sync {
fn next_filename(&self, &mime::Mime) -> Option<PathBuf>;
}

274
src/types.rs Normal file
View File

@ -0,0 +1,274 @@
use std::{fmt, collections::VecDeque, sync::Arc};
use futures::{Future, future::{ExecuteError, Executor}};
use futures_cpupool::CpuPool;
use super::FilenameGenerator;
#[derive(Clone, Debug, PartialEq)]
pub enum NamePart {
Map(String),
Array,
}
impl NamePart {
pub fn is_map(&self) -> bool {
match *self {
NamePart::Map(_) => true,
_ => false,
}
}
}
#[derive(Clone)]
pub enum FieldTerminator {
File(Arc<FilenameGenerator>),
Int,
Float,
Text,
}
impl fmt::Debug for FieldTerminator {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
FieldTerminator::File(_) => write!(f, "File(filename_generator)"),
FieldTerminator::Int => write!(f, "Int"),
FieldTerminator::Float => write!(f, "Float"),
FieldTerminator::Text => write!(f, "Text"),
}
}
}
#[derive(Clone)]
pub enum Field {
Array(Array),
File(Arc<FilenameGenerator>),
Map(Map),
Int,
Float,
Text,
}
impl fmt::Debug for Field {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Field::Array(ref arr) => write!(f, "Array({:?})", arr),
Field::File(_) => write!(f, "File(filename_generator)"),
Field::Map(ref map) => write!(f, "Map({:?})", map),
Field::Int => write!(f, "Int"),
Field::Float => write!(f, "Float"),
Field::Text => write!(f, "Text"),
}
}
}
impl Field {
pub fn file<T>(gen: T) -> Self
where
T: FilenameGenerator + 'static,
{
Field::File(Arc::new(gen))
}
pub fn text() -> Self {
Field::Text
}
pub fn int() -> Self {
Field::Int
}
pub fn float() -> Self {
Field::Float
}
pub fn array(field: Field) -> Self {
Field::Array(Array::new(field))
}
pub fn map() -> Map {
Map::new()
}
fn valid_field(&self, name: VecDeque<NamePart>) -> Option<FieldTerminator> {
trace!("Checking {:?} and {:?}", self, name);
match *self {
Field::Array(ref arr) => arr.valid_field(name),
Field::Map(ref map) => map.valid_field(name),
Field::File(ref gen) => if name.is_empty() {
Some(FieldTerminator::File(Arc::clone(gen)))
} else {
None
},
Field::Int => if name.is_empty() {
Some(FieldTerminator::Int)
} else {
None
},
Field::Float => if name.is_empty() {
Some(FieldTerminator::Float)
} else {
None
},
Field::Text => if name.is_empty() {
Some(FieldTerminator::Text)
} else {
None
},
}
}
}
#[derive(Debug, Clone)]
pub struct Array {
inner: Box<Field>,
}
impl Array {
fn new(field: Field) -> Self {
Array {
inner: Box::new(field),
}
}
fn valid_field(&self, mut name: VecDeque<NamePart>) -> Option<FieldTerminator> {
trace!("Checking {:?} and {:?}", self, name);
match name.pop_front() {
Some(name_part) => match name_part {
NamePart::Array => self.inner.valid_field(name),
_ => None,
},
None => None,
}
}
}
#[derive(Debug, Clone)]
pub struct Map {
inner: Vec<(String, Field)>,
}
impl Map {
fn new() -> Self {
Map { inner: Vec::new() }
}
pub fn field(mut self, key: &str, value: Field) -> Self {
self.inner.push((key.to_owned(), value));
self
}
pub fn finalize(self) -> Field {
Field::Map(self)
}
fn valid_field(&self, mut name: VecDeque<NamePart>) -> Option<FieldTerminator> {
trace!("Checking {:?} and {:?}", self, name);
match name.pop_front() {
Some(name_part) => match name_part {
NamePart::Map(part_name) => self.inner
.iter()
.find(|(item, _)| *item == part_name)
.and_then(|(_, field)| field.valid_field(name)),
_ => None,
},
None => None,
}
}
}
#[derive(Clone)]
pub struct Form {
pub max_fields: u32,
pub max_field_size: u32,
pub max_files: u32,
pub max_file_size: u32,
inner: Map,
pub pool: ArcExecutor,
}
impl Form {
pub fn new() -> Self {
Form::from_executor(CpuPool::new_num_cpus())
}
pub fn max_fields(mut self, max: u32) -> Self {
self.max_fields = max;
self
}
pub fn max_field_size(mut self, max: u32) -> Self {
self.max_field_size = max;
self
}
pub fn max_files(mut self, max: u32) -> Self {
self.max_files = max;
self
}
pub fn max_file_size(mut self, max: u32) -> Self {
self.max_file_size = max;
self
}
pub fn from_executor<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + Clone + 'static,
{
Form {
max_fields: 100,
max_field_size: 10_000,
max_files: 20,
max_file_size: 10_000_000,
inner: Map::new(),
pool: ArcExecutor::new(executor),
}
}
pub fn field(mut self, name: &str, field: Field) -> Self {
self.inner = self.inner.field(name, field);
self
}
pub fn valid_field(&self, name: VecDeque<NamePart>) -> Option<FieldTerminator> {
self.inner.valid_field(name.clone())
}
}
impl fmt::Debug for Form {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Form({:?})", self.inner)
}
}
#[derive(Clone)]
pub struct ArcExecutor {
inner: Arc<Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + 'static>,
}
impl ArcExecutor {
pub fn new<E>(executor: E) -> Self
where
E: Executor<Box<Future<Item = (), Error = ()> + Send>> + Send + Sync + Clone + 'static,
{
ArcExecutor {
inner: Arc::new(executor),
}
}
}
impl Executor<Box<Future<Item = (), Error = ()> + Send>> for ArcExecutor where {
fn execute(
&self,
future: Box<Future<Item = (), Error = ()> + Send>,
) -> Result<(), ExecuteError<Box<Future<Item = (), Error = ()> + Send>>> {
self.inner.execute(future)
}
}

330
src/upload.rs Normal file
View File

@ -0,0 +1,330 @@
use std::{fs::DirBuilder, os::unix::fs::DirBuilderExt, path::{Path, PathBuf}, sync::Arc};
use actix_web::{multipart, error::PayloadError};
use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, future::{lazy, result, Either, Executor}, sync::oneshot};
use futures_fs::FsPool;
use http::header::CONTENT_DISPOSITION;
use error::Error;
use super::FilenameGenerator;
use types::{self, NamePart};
type MultipartHash = (Vec<NamePart>, MultipartContent);
#[derive(Clone, Debug, PartialEq)]
pub enum MultipartContent {
File {
filename: String,
stored_as: PathBuf,
},
Text(String),
Int(i64),
Float(f64),
}
pub type MultipartForm = Vec<MultipartHash>;
fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
name.split('[')
.map(|part| {
if part.len() == 1 && part.ends_with(']') {
NamePart::Array
} else if part.ends_with(']') {
NamePart::Map(part.trim_right_matches(']').to_owned())
} else {
NamePart::Map(part.to_owned())
}
})
.fold(Ok(vec![]), |acc, part| match acc {
Ok(mut v) => {
if v.len() == 0 && !part.is_map() {
return Err(Error::ContentDisposition);
}
v.push(part);
Ok(v)
}
Err(e) => Err(e),
})
}
pub struct ContentDisposition {
name: Option<String>,
filename: Option<String>,
}
impl ContentDisposition {
fn empty() -> Self {
ContentDisposition {
name: None,
filename: None,
}
}
}
fn parse_content_disposition<S>(field: &multipart::Field<S>) -> Result<ContentDisposition, Error>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
let content_disposition = if let Some(cd) = field.headers().get(CONTENT_DISPOSITION) {
cd
} else {
return Err(Error::ContentDisposition);
};
let content_disposition = if let Ok(cd) = content_disposition.to_str() {
cd
} else {
return Err(Error::ContentDisposition);
};
Ok(content_disposition
.split(';')
.skip(1)
.filter_map(|section| {
let mut parts = section.splitn(2, '=');
let key = if let Some(key) = parts.next() {
key.trim()
} else {
return None;
};
let val = if let Some(val) = parts.next() {
val.trim()
} else {
return None;
};
Some((key, val.trim_matches('"')))
})
.fold(ContentDisposition::empty(), |mut acc, (key, val)| {
if key == "name" {
acc.name = Some(val.to_owned());
} else if key == "filename" {
acc.filename = Some(val.to_owned());
}
acc
}))
}
fn handle_file_upload<S>(
field: multipart::Field<S>,
gen: Arc<FilenameGenerator>,
filename: Option<String>,
form: types::Form,
) -> impl Future<Item = MultipartContent, Error = Error>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
let filename = match filename {
Some(filename) => filename,
None => return Either::B(result(Err(Error::Filename))),
};
let path: &Path = filename.as_ref();
let filename = path.file_name().and_then(|filename| filename.to_str());
let filename = if let Some(filename) = filename {
filename.to_owned()
} else {
return Either::B(result(Err(Error::Filename)));
};
let stored_as = match gen.next_filename(field.content_type()) {
Some(file_path) => file_path,
None => return Either::B(result(Err(Error::GenFilename))),
};
let mut stored_dir = stored_as.clone();
stored_dir.pop();
let (tx, rx) = oneshot::channel();
match form.pool.execute(Box::new(lazy(move || {
let res = DirBuilder::new()
.recursive(true)
.mode(0o755)
.create(stored_dir.clone())
.map_err(|_| Error::MkDir);
tx.send(res).map_err(|_| ())
}))) {
Ok(_) => (),
Err(_) => return Either::B(result(Err(Error::MkDir))),
};
Either::A(rx.then(|res| match res {
Ok(res) => res,
Err(_) => Err(Error::MkDir),
}).and_then(move |_| {
let write =
FsPool::from_executor(form.pool.clone()).write(stored_as.clone(), Default::default());
field
.map_err(Error::Multipart)
.forward(write)
.map(move |_| MultipartContent::File {
filename,
stored_as,
})
}))
}
fn handle_form_data<S>(
field: multipart::Field<S>,
term: types::FieldTerminator,
) -> impl Future<Item = MultipartContent, Error = Error>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
trace!("In handle_form_data, term: {:?}", term);
let max_body_size = 80000;
field
.from_err()
.fold(BytesMut::new(), move |mut acc, bytes| {
if acc.len() + bytes.len() < max_body_size {
acc.extend(bytes);
Ok(acc)
} else {
Err(Error::FieldSize)
}
})
.and_then(|bytes| String::from_utf8(bytes.to_vec()).map_err(Error::ParseField))
.and_then(move |string| {
trace!("Matching: {:?}", string);
match term {
types::FieldTerminator::File(_) => Err(Error::FieldType),
types::FieldTerminator::Float => string
.parse::<f64>()
.map(MultipartContent::Float)
.map_err(Error::ParseFloat),
types::FieldTerminator::Int => string
.parse::<i64>()
.map(MultipartContent::Int)
.map_err(Error::ParseInt),
types::FieldTerminator::Text => Ok(MultipartContent::Text(string)),
}
})
}
fn handle_multipart_field<S>(
field: multipart::Field<S>,
form: types::Form,
) -> impl Future<Item = MultipartHash, Error = Error>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
let content_disposition = match parse_content_disposition(&field) {
Ok(cd) => cd,
Err(e) => return Either::B(result(Err(e))),
};
let name = match content_disposition.name {
Some(name) => name,
None => return Either::B(result(Err(Error::Field))),
};
let name = match parse_multipart_name(name) {
Ok(name) => name,
Err(e) => return Either::B(result(Err(e))),
};
let term = match form.valid_field(name.iter().cloned().collect()) {
Some(term) => term,
None => return Either::B(result(Err(Error::FieldType))),
};
let fut = match term {
types::FieldTerminator::File(gen) => Either::A(handle_file_upload(
field,
gen,
content_disposition.filename,
form,
)),
term => Either::B(handle_form_data(field, term)),
};
Either::A(fut.map(|content| (name, content)))
}
pub fn handle_multipart<S>(
m: multipart::Multipart<S>,
form: types::Form,
) -> Box<Stream<Item = MultipartHash, Error = Error>>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
Box::new(
m.map_err(Error::from)
.map(move |item| match item {
multipart::MultipartItem::Field(field) => {
info!("Field: {:?}", field);
Box::new(
handle_multipart_field(field, form.clone())
.map(From::from)
.into_stream(),
) as Box<Stream<Item = MultipartHash, Error = Error>>
}
multipart::MultipartItem::Nested(m) => {
info!("Nested");
Box::new(handle_multipart(m, form.clone()))
as Box<Stream<Item = MultipartHash, Error = Error>>
}
})
.flatten(),
)
}
pub fn handle_upload<S>(
m: multipart::Multipart<S>,
form: types::Form,
) -> impl Future<Item = MultipartForm, Error = Error>
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
let max_files = 10;
let max_fields = 100;
handle_multipart(m, form)
.fold(
(Vec::new(), 0, 0),
move |(mut acc, file_count, field_count), (name, content)| match content {
MultipartContent::File {
filename,
stored_as,
} => {
let file_count = file_count + 1;
if file_count < max_files {
acc.push((
name,
MultipartContent::File {
filename,
stored_as,
},
));
Ok((acc, file_count, field_count))
} else {
Err(Error::FileCount)
}
}
b @ MultipartContent::Text(_)
| b @ MultipartContent::Float(_)
| b @ MultipartContent::Int(_) => {
let field_count = field_count + 1;
if field_count < max_fields {
acc.push((name, b));
Ok((acc, file_count, field_count))
} else {
Err(Error::FieldCount)
}
}
},
)
.map(|(multipart_form, _, _)| multipart_form)
}