Compare commits

...

11 commits

Author SHA1 Message Date
asonix 9ab99f162a Add forgejo actions
All checks were successful
/ check (aarch64-unknown-linux-musl) (push) Successful in 34s
/ check (armv7-unknown-linux-musleabihf) (push) Successful in 32s
/ check (x86_64-unknown-linux-musl) (push) Successful in 25s
/ clippy (push) Successful in 29s
/ tests (push) Successful in 36s
/ build (aarch64-unknown-linux-musl) (push) Successful in 37s
/ build (armv7-unknown-linux-musleabihf) (push) Successful in 37s
/ build (x86_64-unknown-linux-musl) (push) Successful in 24s
/ publish-forgejo (push) Successful in 8s
/ publish-crate (push) Successful in 27s
2024-03-27 19:04:02 -05:00
asonix 298843c31c Bump version 2024-03-27 19:03:45 -05:00
asonix 1c88a70021 Make form construction fallible 2024-03-27 16:16:06 -05:00
asonix 5f38d493c6 Bump version 2023-12-22 13:55:21 -06:00
asonix 0d3fd3b19e Propagate span tree into field handler 2023-12-22 13:40:57 -06:00
asonix 05161821b6 Update version, remove dependency on actix-rt 2023-12-10 18:20:06 -06:00
asonix 0f7614ec3b Simplify stream implementations with streem 2023-12-10 18:16:25 -06:00
asonix 8b422644fb Add streem, update dependencies 2023-12-10 18:16:10 -06:00
asonix a91c503378 Update flake 2023-12-10 18:15:42 -06:00
asonix 9e0e425e14 Bump version 2023-07-18 18:04:41 -05:00
asonix fb46235530 Fix off-by-one in file & field count 2023-07-18 18:03:51 -05:00
10 changed files with 246 additions and 138 deletions

View file

@ -0,0 +1,55 @@
on:
push:
branches:
- '*'
pull_request:
branches:
- main
jobs:
clippy:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Clippy
run: |
cargo clippy --no-default-features -- -D warnings
tests:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Test
run: cargo test
check:
strategy:
fail-fast: false
matrix:
target:
- x86_64-unknown-linux-musl
- armv7-unknown-linux-musleabihf
- aarch64-unknown-linux-musl
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Debug builds
run: cargo zigbuild --target ${{ matrix.target }}

View file

@ -0,0 +1,78 @@
on:
push:
tags:
- 'v*.*.*'
jobs:
clippy:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Clippy
run: |
cargo clippy --no-default-features -- -D warnings
tests:
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Test
run: cargo test
build:
needs:
- clippy
- tests
runs-on: base-image
strategy:
fail-fast: false
matrix:
target:
- x86_64-unknown-linux-musl
- armv7-unknown-linux-musleabihf
- aarch64-unknown-linux-musl
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Compile actix-form-data
run: cargo zigbuild --target ${{ matrix.target }}
publish-forgejo:
needs: [build]
runs-on: base-image
steps:
- uses: actions/forgejo-release@v1
with:
direction: upload
token: ${{ secrets.GITHUB_TOKEN }}
publish-crate:
needs: [build]
runs-on: base-image
steps:
-
name: Checkout actix-form-data
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Publish Crate
run: cargo publish --token ${{ secrets.CRATES_IO_TOKEN }}

View file

@ -1,7 +1,7 @@
[package]
name = "actix-form-data"
description = "Multipart Form Data for Actix Web"
version = "0.7.0-beta.3"
version = "0.7.0-beta.7"
license = "GPL-3.0"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/actix-form-data.git"
@ -11,18 +11,19 @@ edition = "2021"
[dependencies]
actix-multipart = { version = "0.6.0", default-features = false }
actix-rt = "2.5.0"
actix-web = { version = "4.0.0", default-features = false }
futures-core = "0.3.28"
mime = "0.3.16"
streem = "0.2.0"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
tracing = "0.1.15"
[dev-dependencies]
async-fs = "1.2.1"
actix-rt = "2.5.0"
async-fs = "2.1.0"
anyhow = "1.0"
futures-lite = "1.4.0"
futures-lite = "2.1.0"
futures-util = "0.3.17"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View file

@ -11,8 +11,8 @@ impl FormData for UploadedContent {
type Item = ();
type Error = Error;
fn form(_: &HttpRequest) -> Form<Self::Item, Self::Error> {
Form::new()
fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
Ok(Form::new()
.field("Hey", Field::text())
.field(
"Hi",
@ -29,7 +29,7 @@ impl FormData for UploadedContent {
}
Ok(()) as Result<(), Error>
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>

View file

@ -61,11 +61,11 @@ impl FormData for UploadedContent {
type Item = PathBuf;
type Error = Errors;
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> {
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let file_count = req.app_data::<Arc<AtomicUsize>>().expect("Set config");
let file_count = Arc::clone(file_count);
Form::new()
Ok(Form::new()
.field("Hey", Field::text())
.field(
"Hi",
@ -85,7 +85,7 @@ impl FormData for UploadedContent {
.map_err(Errors::from)
}
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>

View file

@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1685518550,
"narHash": "sha256-o2d0KcvaXzTrPRIo0kOLV0/QXHhDQ5DTi+OxcjO8xqY=",
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "a1720a10a6cfe8234c0e93907ffe81be440f4cef",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"type": "github"
},
"original": {
@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1685564631,
"narHash": "sha256-8ywr3AkblY4++3lIVxmrWZFzac7+f32ZEhH/A8pNscI=",
"lastModified": 1702151865,
"narHash": "sha256-9VAt19t6yQa7pHZLDbil/QctAgVsA66DLnzdRGqDisg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4f53efe34b3a8877ac923b9350c874e3dcd5dc0a",
"rev": "666fc80e7b2afb570462423cb0e1cf1a3a34fedd",
"type": "github"
},
"original": {

View file

@ -19,7 +19,7 @@
use std::{
num::{ParseFloatError, ParseIntError},
string::FromUtf8Error,
str::Utf8Error,
};
use actix_web::{
@ -35,7 +35,7 @@ pub enum Error {
#[error("Error in multipart creation")]
Multipart(#[from] MultipartError),
#[error("Failed to parse field")]
ParseField(#[from] FromUtf8Error),
ParseField(#[from] Utf8Error),
#[error("Failed to parse int")]
ParseInt(#[from] ParseIntError),
#[error("Failed to parse float")]

View file

@ -9,7 +9,7 @@ pub trait FormData {
type Item: 'static;
type Error: ResponseError + 'static;
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error>;
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error>;
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
where
@ -27,9 +27,11 @@ where
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
let multipart = actix_multipart::Multipart::new(req.headers(), payload.take());
let form = Rc::new(T::form(req));
let res = T::form(req);
Box::pin(async move {
let form = Rc::new(res?);
let uploaded = match handle_multipart(multipart, Rc::clone(&form)).await {
Ok(Ok(uploaded)) => uploaded,
Ok(Err(e)) => return Err(e.into()),

View file

@ -38,8 +38,8 @@
//! type Item = ();
//! type Error = Error;
//!
//! fn form(_: &HttpRequest) -> Form<Self::Item, Self::Error> {
//! Form::new()
//! fn form(_: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
//! Ok(Form::new()
//! .field("Hey", Field::text())
//! .field(
//! "Hi",
@ -56,7 +56,7 @@
//! }
//! Ok(()) as Result<(), Error>
//! })),
//! )
//! ))
//! }
//!
//! fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>

View file

@ -25,43 +25,10 @@ use crate::{
},
};
use actix_web::web::BytesMut;
use std::{collections::HashMap, future::poll_fn, path::Path, pin::Pin, rc::Rc};
use tokio::{sync::mpsc::Receiver, task::JoinSet};
use tracing::trace;
struct Streamer<S>(S, bool);
impl<S> Streamer<S>
where
S: futures_core::Stream + Unpin,
{
async fn next(&mut self) -> Option<S::Item> {
if self.1 {
return None;
}
let opt = poll_fn(|cx| Pin::new(&mut self.0).poll_next(cx)).await;
self.1 = opt.is_none();
opt
}
fn is_closed(&self) -> bool {
self.1
}
}
struct ReceiverStream<T>(Receiver<T>);
impl<T> futures_core::Stream for ReceiverStream<T> {
type Item = T;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_recv(cx)
}
}
use std::{collections::HashMap, path::Path, rc::Rc};
use streem::IntoStreamer;
use tokio::task::JoinSet;
use tracing::Instrument;
fn consolidate<T>(mf: MultipartForm<T>) -> Value<T> {
mf.into_iter().fold(
@ -100,16 +67,13 @@ fn parse_multipart_name(name: String) -> Result<Vec<NamePart>, Error> {
NamePart::Map(part.to_owned())
}
})
.fold(Ok(vec![]), |acc, part| match acc {
Ok(mut v) => {
if v.is_empty() && !part.is_map() {
return Err(Error::ContentDisposition);
}
v.push(part);
Ok(v)
.try_fold(vec![], |mut v, part| {
if v.is_empty() && !part.is_map() {
return Err(Error::ContentDisposition);
}
Err(e) => Err(e),
v.push(part);
Ok(v)
})
}
@ -143,42 +107,40 @@ where
let max_file_size = form.max_file_size;
let (tx, rx) = tokio::sync::mpsc::channel(8);
let consume_fut = async move {
let field_stream = streem::try_from_fn(move |yielder| async move {
let mut file_size = 0;
let mut exceeded_size = false;
let mut stream = Streamer(field, false);
let mut stream = field.into_streamer();
while let Some(res) = stream.next().await {
while let Some(bytes) = stream.try_next().await? {
tracing::trace!("Bytes from field");
if exceeded_size {
tracing::trace!("Dropping oversized bytes");
continue;
file_size += bytes.len();
if file_size > max_file_size {
drop(bytes);
while stream.try_next().await?.is_some() {
tracing::trace!("Dropping oversized bytes");
}
return Err(Error::FileSize);
}
if let Ok(bytes) = &res {
file_size += bytes.len();
if file_size > max_file_size {
exceeded_size = true;
let _ = tx.send(Err(Error::FileSize)).await;
}
};
let _ = tx.send(res.map_err(Error::from)).await;
yielder.yield_ok(bytes).await;
}
drop(tx);
tracing::debug!("Finished consuming field");
};
let stream = ReceiverStream(rx);
Ok(())
});
let result_fut = file_fn(filename.clone(), content_type.clone(), Box::pin(stream));
let (_, result) = tokio::join!(consume_fut, result_fut);
let result = file_fn(
filename.clone(),
content_type.clone(),
Box::pin(field_stream),
)
.await;
match result {
Ok(result) => Ok(Ok(MultipartContent::File(FileMeta {
@ -199,59 +161,54 @@ where
T: 'static,
E: 'static,
{
trace!("In handle_form_data, term: {:?}", term);
let mut bytes = BytesMut::new();
tracing::trace!("In handle_form_data, term: {:?}", term);
let mut buf = Vec::new();
let mut exceeded_size = false;
let mut error = None;
let mut stream = field.into_streamer();
let mut stream = Streamer(field, false);
while let Some(res) = stream.next().await {
while let Some(bytes) = stream.try_next().await? {
tracing::trace!("bytes from field");
if exceeded_size {
tracing::trace!("Dropping oversized bytes");
continue;
}
if buf.len() + bytes.len() > form.max_field_size {
drop(buf);
if error.is_some() {
tracing::trace!("Draining field while error exists");
continue;
}
let b = match res {
Ok(bytes) => bytes,
Err(e) if error.is_none() => {
error = Some(e);
continue;
while stream.try_next().await?.is_some() {
tracing::trace!("Dropping oversized bytes");
}
Err(_) => continue,
};
if bytes.len() + b.len() > form.max_field_size {
exceeded_size = true;
continue;
return Err(Error::FieldSize);
}
bytes.extend(b);
buf.push(bytes);
}
let bytes = match buf.len() {
0 => return Err(Error::FieldSize),
1 => buf.pop().expect("contains an element"),
_ => {
let total_length = buf.iter().map(|b| b.len()).sum();
let mut bytes = BytesMut::with_capacity(total_length);
for b in buf {
bytes.extend(b);
}
bytes.freeze()
}
};
tracing::debug!("Finished consuming field");
if let Some(error) = error {
return Err(error.into());
}
if let FieldTerminator::Bytes = term {
return Ok(MultipartContent::Bytes(bytes.freeze()));
return Ok(MultipartContent::Bytes(bytes));
}
let s = String::from_utf8(bytes.to_vec()).map_err(Error::ParseField)?;
let s = std::str::from_utf8(&bytes).map_err(Error::ParseField)?;
match term {
FieldTerminator::Bytes | FieldTerminator::File(_) => Err(Error::FieldType),
FieldTerminator::Text => Ok(MultipartContent::Text(s)),
FieldTerminator::Text => Ok(MultipartContent::Text(String::from(s))),
FieldTerminator::Float => s
.parse()
.map_err(Error::ParseFloat)
@ -294,6 +251,7 @@ where
}
/// Handle multipart streams from Actix Web
#[tracing::instrument(level = "TRACE", skip_all)]
pub async fn handle_multipart<T, E>(
m: actix_multipart::Multipart,
form: Rc<Form<T, E>>,
@ -308,25 +266,35 @@ where
let mut set = JoinSet::new();
let mut m = Streamer(m, false);
let mut m = m.into_streamer();
let mut error: Option<Error> = None;
let mut provided_error: Option<E> = None;
let mut is_closed = false;
let mut stream_error = false;
'outer: loop {
tracing::trace!("multipart loop");
if error.is_some() || provided_error.is_some() {
while let Some(res) = m.next().await {
if let Ok(field) = res {
let mut stream = Streamer(field, false);
while let Some(_res) = stream.next().await {
tracing::trace!("Throwing away uploaded bytes, we have an error");
set.abort_all();
if !stream_error {
while let Some(res) = m.next().await {
tracing::trace!("draining multipart field");
if let Ok(field) = res {
let mut stream = field.into_streamer();
while stream.next().await.is_some() {
tracing::trace!("Throwing away uploaded bytes, we have an error");
}
} else {
break;
}
}
}
while let Some(_) = set.join_next().await {
while set.join_next().await.is_some() {
tracing::trace!("Throwing away joined result");
}
@ -334,14 +302,18 @@ where
}
tokio::select! {
opt = m.next(), if !m.is_closed() => {
opt = m.next(), if !is_closed => {
tracing::trace!("Selected stream");
is_closed = opt.is_none();
if let Some(res) = opt {
match res {
Ok(field) => {
set.spawn_local(handle_stream_field(field, Rc::clone(&form)));
set.spawn_local(handle_stream_field(field, Rc::clone(&form)).instrument(tracing::trace_span!("multipart-field")));
},
Err(e) => {
is_closed = true;
stream_error = true;
error = Some(e.into());
continue 'outer;
}
@ -409,13 +381,13 @@ fn count<T, E>(
match content {
MultipartContent::File(_) => {
file_count += 1;
if file_count >= form.max_files {
if file_count > form.max_files {
return Err(Error::FileCount);
}
}
_ => {
field_count += 1;
if field_count >= form.max_fields {
if field_count > form.max_fields {
return Err(Error::FieldCount);
}
}