Remove a few boxes

This commit is contained in:
Aode (Lion) 2021-10-20 18:58:32 -05:00
parent 922200673e
commit e7b4e4d1cc
5 changed files with 125 additions and 37 deletions

View file

@ -128,6 +128,9 @@ pub(crate) enum UploadError {
#[error("Command failed")] #[error("Command failed")]
Status, Status,
#[error(transparent)]
Limit(#[from] super::LimitError),
} }
impl From<awc::error::SendRequestError> for UploadError { impl From<awc::error::SendRequestError> for UploadError {
@ -152,6 +155,7 @@ impl ResponseError for Error {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
match self.kind { match self.kind {
UploadError::DuplicateAlias UploadError::DuplicateAlias
| UploadError::Limit(_)
| UploadError::NoFiles | UploadError::NoFiles
| UploadError::Upload(_) | UploadError::Upload(_)
| UploadError::ParseReq(_) => StatusCode::BAD_REQUEST, | UploadError::ParseReq(_) => StatusCode::BAD_REQUEST,

View file

@ -11,12 +11,18 @@ pub(crate) use io_uring::File;
pub(crate) use tokio_file::File; pub(crate) use tokio_file::File;
pin_project_lite::pin_project! { pin_project_lite::pin_project! {
struct CrateError<S> { pub(super) struct CrateError<S> {
#[pin] #[pin]
inner: S inner: S
} }
} }
impl<S> CrateError<S> {
pub(super) fn new(inner: S) -> Self {
CrateError { inner }
}
}
impl<T, E, S> Stream for CrateError<S> impl<T, E, S> Stream for CrateError<S>
where where
S: Stream<Item = Result<T, E>>, S: Stream<Item = Result<T, E>>,
@ -112,11 +118,10 @@ mod tokio_file {
(None, None) => Either::right(self.inner), (None, None) => Either::right(self.inner),
}; };
Ok(super::CrateError { Ok(super::CrateError::new(BytesFreezer::new(FramedRead::new(
inner: BytesFreezer { obj,
inner: FramedRead::new(obj, BytesCodec::new()), BytesCodec::new(),
}, ))))
})
} }
} }
@ -127,6 +132,12 @@ mod tokio_file {
} }
} }
impl<S> BytesFreezer<S> {
fn new(inner: S) -> Self {
BytesFreezer { inner }
}
}
impl<S, E> Stream for BytesFreezer<S> impl<S, E> Stream for BytesFreezer<S>
where where
S: Stream<Item = Result<BytesMut, E>> + Unpin, S: Stream<Item = Result<BytesMut, E>> + Unpin,

View file

@ -57,6 +57,7 @@ use self::{
config::{Config, Format}, config::{Config, Format},
either::Either, either::Either,
error::{Error, UploadError}, error::{Error, UploadError},
file::CrateError,
middleware::{Deadline, Internal}, middleware::{Deadline, Internal},
upload_manager::{Details, UploadManager, UploadManagerSession}, upload_manager::{Details, UploadManager, UploadManagerSession},
validate::{image_webp, video_mp4}, validate::{image_webp, video_mp4},
@ -340,6 +341,59 @@ struct UrlQuery {
url: String, url: String,
} }
pin_project_lite::pin_project! {
struct Limit<S> {
#[pin]
inner: S,
count: u64,
limit: u64,
}
}
impl<S> Limit<S> {
fn new(inner: S, limit: u64) -> Self {
Limit {
inner,
count: 0,
limit,
}
}
}
#[derive(Debug, thiserror::Error)]
#[error("Resonse body larger than size limit")]
struct LimitError;
impl<S, E> Stream for Limit<S>
where
S: Stream<Item = Result<web::Bytes, E>>,
E: From<LimitError>,
{
type Item = Result<web::Bytes, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
let limit = this.limit;
let count = this.count;
let inner = this.inner;
inner.poll_next(cx).map(|opt| {
opt.map(|res| match res {
Ok(bytes) => {
*count += bytes.len() as u64;
if *count > *limit {
return Err(LimitError.into());
}
Ok(bytes)
}
Err(e) => Err(e),
})
})
}
}
/// download an image from a URL /// download an image from a URL
#[instrument(name = "Downloading file", skip(client, manager))] #[instrument(name = "Downloading file", skip(client, manager))]
async fn download( async fn download(
@ -347,15 +401,19 @@ async fn download(
manager: web::Data<UploadManager>, manager: web::Data<UploadManager>,
query: web::Query<UrlQuery>, query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let mut res = client.get(&query.url).propagate().send().await?; let res = client.get(&query.url).propagate().send().await?;
if !res.status().is_success() { if !res.status().is_success() {
return Err(UploadError::Download(res.status()).into()); return Err(UploadError::Download(res.status()).into());
} }
let fut = res.body().limit(CONFIG.max_file_size() * MEGABYTES); let mut stream = Limit::new(
CrateError::new(res),
(CONFIG.max_file_size() * MEGABYTES) as u64,
);
let stream = Box::pin(once(fut)); // SAFETY: stream is shadowed, so original cannot not be moved
let stream = unsafe { Pin::new_unchecked(&mut stream) };
let permit = PROCESS_SEMAPHORE.acquire().await?; let permit = PROCESS_SEMAPHORE.acquire().await?;
let session = manager.session().upload(stream).await?; let session = manager.session().upload(stream).await?;
@ -743,7 +801,7 @@ async fn ranged_file_resp(
Ok(srv_response( Ok(srv_response(
builder, builder,
Box::pin(stream), stream,
details.content_type(), details.content_type(),
7 * DAYS, 7 * DAYS,
details.system_time(), details.system_time(),
@ -759,7 +817,7 @@ fn srv_response<S, E>(
modified: SystemTime, modified: SystemTime,
) -> HttpResponse ) -> HttpResponse
where where
S: Stream<Item = Result<web::Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<web::Bytes, E>> + 'static,
E: std::error::Error + 'static, E: std::error::Error + 'static,
actix_web::Error: From<E>, actix_web::Error: From<E>,
{ {
@ -772,7 +830,8 @@ where
])) ]))
.insert_header((ACCEPT_RANGES, "bytes")) .insert_header((ACCEPT_RANGES, "bytes"))
.content_type(ext.to_string()) .content_type(ext.to_string())
.streaming(stream) // TODO: remove pin when actix-web drops Unpin requirement
.streaming(Box::pin(stream))
} }
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]

View file

@ -22,12 +22,19 @@ pub(crate) struct Process {
span: Span, span: Span,
} }
pub(crate) struct ProcessRead<I> { struct DropHandle {
inner: I, inner: JoinHandle<()>,
span: Span, }
err_recv: Receiver<std::io::Error>,
err_closed: bool, pin_project_lite::pin_project! {
handle: JoinHandle<()>, struct ProcessRead<I> {
#[pin]
inner: I,
span: Span,
err_recv: Receiver<std::io::Error>,
err_closed: bool,
handle: DropHandle,
}
} }
impl Process { impl Process {
@ -86,13 +93,13 @@ impl Process {
.instrument(span), .instrument(span),
); );
Some(Box::pin(ProcessRead { Some(ProcessRead {
inner: stdout, inner: stdout,
span: self.span, span: self.span,
err_recv: rx, err_recv: rx,
err_closed: false, err_closed: false,
handle, handle: DropHandle { inner: handle },
})) })
} }
pub(crate) fn file_read( pub(crate) fn file_read(
@ -129,30 +136,36 @@ impl Process {
.instrument(span), .instrument(span),
); );
Some(Box::pin(ProcessRead { Some(ProcessRead {
inner: stdout, inner: stdout,
span: self.span, span: self.span,
err_recv: rx, err_recv: rx,
err_closed: false, err_closed: false,
handle, handle: DropHandle { inner: handle },
})) })
} }
} }
impl<I> AsyncRead for ProcessRead<I> impl<I> AsyncRead for ProcessRead<I>
where where
I: AsyncRead + Unpin, I: AsyncRead,
{ {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
let span = self.span.clone(); let this = self.as_mut().project();
let span = this.span;
let err_recv = this.err_recv;
let err_closed = this.err_closed;
let inner = this.inner;
span.in_scope(|| { span.in_scope(|| {
if !self.err_closed { if !*err_closed {
if let Poll::Ready(res) = Pin::new(&mut self.err_recv).poll(cx) { if let Poll::Ready(res) = Pin::new(err_recv).poll(cx) {
self.err_closed = true; *err_closed = true;
if let Ok(err) = res { if let Ok(err) = res {
let display = format!("{}", err); let display = format!("{}", err);
let debug = format!("{:?}", err); let debug = format!("{:?}", err);
@ -163,7 +176,7 @@ where
} }
} }
if let Poll::Ready(res) = Pin::new(&mut self.inner).poll_read(cx, buf) { if let Poll::Ready(res) = inner.poll_read(cx, buf) {
if let Err(err) = &res { if let Err(err) = &res {
let display = format!("{}", err); let display = format!("{}", err);
let debug = format!("{:?}", err); let debug = format!("{:?}", err);
@ -178,9 +191,9 @@ where
} }
} }
impl<I> Drop for ProcessRead<I> { impl Drop for DropHandle {
fn drop(&mut self) { fn drop(&mut self) {
self.handle.abort(); self.inner.abort();
} }
} }

View file

@ -9,15 +9,13 @@ use crate::{
}, },
}; };
use actix_web::web; use actix_web::web;
use futures_util::stream::{LocalBoxStream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
use std::path::PathBuf; use std::path::PathBuf;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tracing::{debug, instrument, warn, Span}; use tracing::{debug, instrument, warn, Span};
use tracing_futures::Instrument; use tracing_futures::Instrument;
use uuid::Uuid; use uuid::Uuid;
type UploadStream<E> = LocalBoxStream<'static, Result<web::Bytes, E>>;
pub(crate) struct UploadManagerSession { pub(crate) struct UploadManagerSession {
manager: UploadManager, manager: UploadManager,
alias: Option<String>, alias: Option<String>,
@ -136,7 +134,7 @@ impl UploadManagerSession {
alias: String, alias: String,
content_type: mime::Mime, content_type: mime::Mime,
validate: bool, validate: bool,
mut stream: UploadStream<E>, mut stream: impl Stream<Item = Result<web::Bytes, E>> + Unpin,
) -> Result<Self, Error> ) -> Result<Self, Error>
where where
Error: From<E>, Error: From<E>,
@ -177,7 +175,10 @@ impl UploadManagerSession {
/// Upload the file, discarding bytes if it's already present, or saving if it's new /// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument(skip(self, stream))] #[instrument(skip(self, stream))]
pub(crate) async fn upload<E>(mut self, mut stream: UploadStream<E>) -> Result<Self, Error> pub(crate) async fn upload<E>(
mut self,
mut stream: impl Stream<Item = Result<web::Bytes, E>> + Unpin,
) -> Result<Self, Error>
where where
Error: From<E>, Error: From<E>,
{ {