From 839ba65ee9007a23674bc73ace40e915a5c2be4e Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 14 Sep 2020 10:38:54 -0500 Subject: [PATCH] Stable release --- Cargo.toml | 4 ++-- examples/upload.rs | 18 ++++++++++-------- src/error.rs | 10 +--------- src/lib.rs | 3 --- src/spawn.rs | 23 ----------------------- src/upload.rs | 19 +++---------------- 6 files changed, 16 insertions(+), 61 deletions(-) delete mode 100644 src/spawn.rs diff --git a/Cargo.toml b/Cargo.toml index fb3a01f..2aad3f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,11 +19,11 @@ mime = "0.3.16" thiserror = "1.0" tokio = { version = "0.2.21", features = ["sync"] } tracing = "0.1.15" -tracing-futures = "0.2.4" [dev-dependencies] -actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs" } +async-fs = "1.2.1" anyhow = "1.0" +futures-lite = "1.4.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/examples/upload.rs b/examples/upload.rs index 691c621..89235aa 100644 --- a/examples/upload.rs +++ b/examples/upload.rs @@ -6,7 +6,7 @@ use actix_web::{ App, HttpResponse, HttpServer, ResponseError, }; use bytes::Bytes; -use futures::stream::{Stream, TryStreamExt}; +use futures::stream::{Stream, StreamExt, TryStreamExt}; use std::{ env, pin::Pin, @@ -70,15 +70,17 @@ async fn save_file( stream: Pin>>>, count: usize, ) -> Result { - let stream = stream.err_into::(); + use futures_lite::io::AsyncWriteExt; + + let mut stream = stream.err_into::(); let filename = format!("examples/filename{}.png", count); - let file = actix_fs::file::create(filename.clone()).await?; - - if let Err(e) = actix_fs::file::write_stream(file, stream).await { - actix_fs::remove_file(filename.clone()).await?; - return Err(e); + let mut file = async_fs::File::create(&filename).await?; + while let Some(res) = stream.next().await { + let bytes = res?; + file.write_all(&bytes).await?; } + file.flush().await?; Ok(filename) } @@ -86,7 +88,7 @@ async fn save_file( #[actix_rt::main] async fn main() -> Result<(), anyhow::Error> { if env::var("RUST_LOG").is_err() { - env::set_var("RUST_LOG", "upload=info"); + env::set_var("RUST_LOG", "upload=debug,actix_form_data=debug"); } tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) diff --git a/src/error.rs b/src/error.rs index 7ce0ac2..0c4067b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -65,8 +65,6 @@ pub enum Error { MissingMiddleware, #[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")] TxDropped, - #[error("Panic in spawned future")] - Panic, } impl From for Error { @@ -75,12 +73,6 @@ impl From for Error { } } -impl From for Error { - fn from(_: crate::spawn::Canceled) -> Self { - Error::Panic - } -} - impl ResponseError for Error { fn status_code(&self) -> StatusCode { match *self { @@ -108,7 +100,7 @@ impl ResponseError for Error { | Error::Filename | Error::FileCount | Error::FileSize => HttpResponse::BadRequest().finish(), - Error::Panic | Error::MissingMiddleware | Error::TxDropped => { + Error::MissingMiddleware | Error::TxDropped => { HttpResponse::InternalServerError().finish() } } diff --git a/src/lib.rs b/src/lib.rs index 86e36f9..fd70480 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,7 +76,6 @@ mod error; mod middleware; -mod spawn; mod types; mod upload; @@ -85,5 +84,3 @@ pub use self::{ types::{Field, FileMeta, Form, Value}, upload::handle_multipart, }; - -use self::spawn::spawn; diff --git a/src/spawn.rs b/src/spawn.rs deleted file mode 100644 index fb59db6..0000000 --- a/src/spawn.rs +++ /dev/null @@ -1,23 +0,0 @@ -use tracing::error; -use std::future::Future; -use tokio::sync::oneshot::channel; - -#[derive(Debug, thiserror::Error)] -#[error("Task panicked")] -pub(crate) struct Canceled; - -pub(crate) async fn spawn(f: F) -> Result -where - F: Future + 'static, - T: 'static, -{ - let (tx, rx) = channel(); - - actix_rt::spawn(async move { - if let Err(_) = tx.send(f.await) { - error!("rx dropped (this shouldn't happen)"); - } - }); - - rx.await.map_err(|_| Canceled) -} diff --git a/src/upload.rs b/src/upload.rs index b9b79dc..baeda8b 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -37,8 +37,7 @@ use std::{ Arc, }, }; -use tracing::{trace, Span}; -use tracing_futures::Instrument; +use tracing::trace; fn consolidate(mf: MultipartForm) -> Value { mf.into_iter().fold( @@ -212,8 +211,6 @@ async fn handle_stream_field( /// Handle multipart streams from Actix Web pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Result { - let parent = Span::current(); - let mut multipart_form = Vec::new(); let mut file_count: u32 = 0; let mut field_count: u32 = 0; @@ -226,12 +223,12 @@ pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Resu select! { opt = m.next() => { if let Some(res) = opt { - unordered.push(process_field(res?, form.clone(), &parent)); + unordered.push(handle_stream_field(res?, form.clone())); } } opt = unordered.next() => { if let Some(res) = opt { - let (name_parts, content) = res??; + let (name_parts, content) = res?; let (l, r) = count(&content, file_count, field_count, &form)?; file_count = l; @@ -247,16 +244,6 @@ pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Resu Ok(consolidate(multipart_form)) } -async fn process_field( - field: actix_multipart::Field, - form: Form, - parent: &Span, -) -> Result, crate::spawn::Canceled> { - let span = tracing::info_span!(parent: parent, "field"); - - crate::spawn(handle_stream_field(field, form).instrument(span)).await -} - fn count( content: &MultipartContent, mut file_count: u32,