Stable release
This commit is contained in:
parent
b91365b7bd
commit
839ba65ee9
|
@ -19,11 +19,11 @@ mime = "0.3.16"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "0.2.21", features = ["sync"] }
|
tokio = { version = "0.2.21", features = ["sync"] }
|
||||||
tracing = "0.1.15"
|
tracing = "0.1.15"
|
||||||
tracing-futures = "0.2.4"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs" }
|
async-fs = "1.2.1"
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
|
futures-lite = "1.4.0"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
|
|
|
@ -6,7 +6,7 @@ use actix_web::{
|
||||||
App, HttpResponse, HttpServer, ResponseError,
|
App, HttpResponse, HttpServer, ResponseError,
|
||||||
};
|
};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::stream::{Stream, TryStreamExt};
|
use futures::stream::{Stream, StreamExt, TryStreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
env,
|
env,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
|
@ -70,15 +70,17 @@ async fn save_file(
|
||||||
stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
|
stream: Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>,
|
||||||
count: usize,
|
count: usize,
|
||||||
) -> Result<String, JsonError> {
|
) -> Result<String, JsonError> {
|
||||||
let stream = stream.err_into::<JsonError>();
|
use futures_lite::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let mut stream = stream.err_into::<JsonError>();
|
||||||
let filename = format!("examples/filename{}.png", count);
|
let filename = format!("examples/filename{}.png", count);
|
||||||
|
|
||||||
let file = actix_fs::file::create(filename.clone()).await?;
|
let mut file = async_fs::File::create(&filename).await?;
|
||||||
|
while let Some(res) = stream.next().await {
|
||||||
if let Err(e) = actix_fs::file::write_stream(file, stream).await {
|
let bytes = res?;
|
||||||
actix_fs::remove_file(filename.clone()).await?;
|
file.write_all(&bytes).await?;
|
||||||
return Err(e);
|
|
||||||
}
|
}
|
||||||
|
file.flush().await?;
|
||||||
|
|
||||||
Ok(filename)
|
Ok(filename)
|
||||||
}
|
}
|
||||||
|
@ -86,7 +88,7 @@ async fn save_file(
|
||||||
#[actix_rt::main]
|
#[actix_rt::main]
|
||||||
async fn main() -> Result<(), anyhow::Error> {
|
async fn main() -> Result<(), anyhow::Error> {
|
||||||
if env::var("RUST_LOG").is_err() {
|
if env::var("RUST_LOG").is_err() {
|
||||||
env::set_var("RUST_LOG", "upload=info");
|
env::set_var("RUST_LOG", "upload=debug,actix_form_data=debug");
|
||||||
}
|
}
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||||
|
|
10
src/error.rs
10
src/error.rs
|
@ -65,8 +65,6 @@ pub enum Error {
|
||||||
MissingMiddleware,
|
MissingMiddleware,
|
||||||
#[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")]
|
#[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")]
|
||||||
TxDropped,
|
TxDropped,
|
||||||
#[error("Panic in spawned future")]
|
|
||||||
Panic,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<MultipartError> for Error {
|
impl From<MultipartError> for Error {
|
||||||
|
@ -75,12 +73,6 @@ impl From<MultipartError> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<crate::spawn::Canceled> for Error {
|
|
||||||
fn from(_: crate::spawn::Canceled) -> Self {
|
|
||||||
Error::Panic
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ResponseError for Error {
|
impl ResponseError for Error {
|
||||||
fn status_code(&self) -> StatusCode {
|
fn status_code(&self) -> StatusCode {
|
||||||
match *self {
|
match *self {
|
||||||
|
@ -108,7 +100,7 @@ impl ResponseError for Error {
|
||||||
| Error::Filename
|
| Error::Filename
|
||||||
| Error::FileCount
|
| Error::FileCount
|
||||||
| Error::FileSize => HttpResponse::BadRequest().finish(),
|
| Error::FileSize => HttpResponse::BadRequest().finish(),
|
||||||
Error::Panic | Error::MissingMiddleware | Error::TxDropped => {
|
Error::MissingMiddleware | Error::TxDropped => {
|
||||||
HttpResponse::InternalServerError().finish()
|
HttpResponse::InternalServerError().finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,6 @@
|
||||||
|
|
||||||
mod error;
|
mod error;
|
||||||
mod middleware;
|
mod middleware;
|
||||||
mod spawn;
|
|
||||||
mod types;
|
mod types;
|
||||||
mod upload;
|
mod upload;
|
||||||
|
|
||||||
|
@ -85,5 +84,3 @@ pub use self::{
|
||||||
types::{Field, FileMeta, Form, Value},
|
types::{Field, FileMeta, Form, Value},
|
||||||
upload::handle_multipart,
|
upload::handle_multipart,
|
||||||
};
|
};
|
||||||
|
|
||||||
use self::spawn::spawn;
|
|
||||||
|
|
23
src/spawn.rs
23
src/spawn.rs
|
@ -1,23 +0,0 @@
|
||||||
use tracing::error;
|
|
||||||
use std::future::Future;
|
|
||||||
use tokio::sync::oneshot::channel;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
|
||||||
#[error("Task panicked")]
|
|
||||||
pub(crate) struct Canceled;
|
|
||||||
|
|
||||||
pub(crate) async fn spawn<F, T>(f: F) -> Result<T, Canceled>
|
|
||||||
where
|
|
||||||
F: Future<Output = T> + 'static,
|
|
||||||
T: 'static,
|
|
||||||
{
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
if let Err(_) = tx.send(f.await) {
|
|
||||||
error!("rx dropped (this shouldn't happen)");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
rx.await.map_err(|_| Canceled)
|
|
||||||
}
|
|
|
@ -37,8 +37,7 @@ use std::{
|
||||||
Arc,
|
Arc,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use tracing::{trace, Span};
|
use tracing::trace;
|
||||||
use tracing_futures::Instrument;
|
|
||||||
|
|
||||||
fn consolidate(mf: MultipartForm) -> Value {
|
fn consolidate(mf: MultipartForm) -> Value {
|
||||||
mf.into_iter().fold(
|
mf.into_iter().fold(
|
||||||
|
@ -212,8 +211,6 @@ async fn handle_stream_field(
|
||||||
|
|
||||||
/// Handle multipart streams from Actix Web
|
/// Handle multipart streams from Actix Web
|
||||||
pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Result<Value, Error> {
|
pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Result<Value, Error> {
|
||||||
let parent = Span::current();
|
|
||||||
|
|
||||||
let mut multipart_form = Vec::new();
|
let mut multipart_form = Vec::new();
|
||||||
let mut file_count: u32 = 0;
|
let mut file_count: u32 = 0;
|
||||||
let mut field_count: u32 = 0;
|
let mut field_count: u32 = 0;
|
||||||
|
@ -226,12 +223,12 @@ pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Resu
|
||||||
select! {
|
select! {
|
||||||
opt = m.next() => {
|
opt = m.next() => {
|
||||||
if let Some(res) = opt {
|
if let Some(res) = opt {
|
||||||
unordered.push(process_field(res?, form.clone(), &parent));
|
unordered.push(handle_stream_field(res?, form.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
opt = unordered.next() => {
|
opt = unordered.next() => {
|
||||||
if let Some(res) = opt {
|
if let Some(res) = opt {
|
||||||
let (name_parts, content) = res??;
|
let (name_parts, content) = res?;
|
||||||
|
|
||||||
let (l, r) = count(&content, file_count, field_count, &form)?;
|
let (l, r) = count(&content, file_count, field_count, &form)?;
|
||||||
file_count = l;
|
file_count = l;
|
||||||
|
@ -247,16 +244,6 @@ pub async fn handle_multipart(m: actix_multipart::Multipart, form: Form) -> Resu
|
||||||
Ok(consolidate(multipart_form))
|
Ok(consolidate(multipart_form))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_field(
|
|
||||||
field: actix_multipart::Field,
|
|
||||||
form: Form,
|
|
||||||
parent: &Span,
|
|
||||||
) -> Result<Result<MultipartHash, Error>, crate::spawn::Canceled> {
|
|
||||||
let span = tracing::info_span!(parent: parent, "field");
|
|
||||||
|
|
||||||
crate::spawn(handle_stream_field(field, form).instrument(span)).await
|
|
||||||
}
|
|
||||||
|
|
||||||
fn count(
|
fn count(
|
||||||
content: &MultipartContent,
|
content: &MultipartContent,
|
||||||
mut file_count: u32,
|
mut file_count: u32,
|
||||||
|
|
Loading…
Reference in a new issue