Reduce required Unpin bounds with pin-project-lite

This commit is contained in:
Aode (Lion) 2021-10-18 18:02:33 -05:00
parent 4c5482b3a8
commit 2fb8d3e39a
6 changed files with 207 additions and 121 deletions

42
Cargo.lock generated
View file

@ -77,9 +77,9 @@ dependencies = [
[[package]]
name = "actix-macros"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2f86cd6857c135e6e9fe57b1619a88d1f94a7df34c00e11fe13e64fd3438837"
checksum = "c8c999eaf5f8414142b5bb60e43dcb27ccd12c2c15e4133e470f3c5060472532"
dependencies = [
"quote",
"syn",
@ -240,9 +240,9 @@ dependencies = [
[[package]]
name = "ahash"
version = "0.7.5"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "991984e3fd003e7ba02eb724f87a0f997b78677c46c0e91f8424ad7394c9886a"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
@ -560,9 +560,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.28"
version = "0.8.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065"
checksum = "a74ea89a0a1b98f6332de42c95baff457ada66d1cb4030f9ff151b2041a1c746"
dependencies = [
"cfg-if",
]
@ -865,9 +865,9 @@ dependencies = [
[[package]]
name = "instant"
version = "0.1.11"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
@ -920,9 +920,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.103"
version = "0.2.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6"
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
[[package]]
name = "local-channel"
@ -998,9 +998,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mio"
version = "0.7.13"
version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16"
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
dependencies = [
"libc",
"log",
@ -1184,6 +1184,7 @@ dependencies = [
"once_cell",
"opentelemetry",
"opentelemetry-otlp",
"pin-project-lite",
"rand",
"serde",
"serde_json",
@ -1240,9 +1241,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741"
[[package]]
name = "proc-macro-error"
@ -1753,9 +1754,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "structopt"
version = "0.3.23"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf9d950ef167e25e0bdb073cf1d68e9ad2795ac826f2f3f59647817cf23c0bfa"
checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c"
dependencies = [
"clap",
"lazy_static",
@ -1764,9 +1765,9 @@ dependencies = [
[[package]]
name = "structopt-derive"
version = "0.4.16"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134d838a2c9943ac3125cf6df165eda53493451b719f3255b2a26b85f772d0ba"
checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [
"heck",
"proc-macro-error",
@ -2036,14 +2037,15 @@ dependencies = [
[[package]]
name = "tower"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60422bc7fefa2f3ec70359b8ff1caff59d785877eb70595904605bcc412470f"
checksum = "d15a6b60cdff0cb039d81d3b37f8bc3d7e53dca09069aae3ef2502ca4834fe30"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project",
"pin-project-lite",
"rand",
"slab",
"tokio",

View file

@ -28,6 +28,7 @@ num_cpus = "1.13"
once_cell = "1.4.0"
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-otlp = "0.9"
pin-project-lite = "0.2.7"
rand = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View file

@ -5,39 +5,63 @@ use std::{
};
use tokio::io::{AsyncRead, ReadBuf};
pub(crate) enum Either<Left, Right> {
Left(Left),
Right(Right),
pin_project_lite::pin_project! {
#[project = EitherProj]
#[project_replace = EitherProjReplace]
pub(crate) enum Either<Left, Right> {
Left {
#[pin]
left: Left,
},
Right {
#[pin]
right: Right,
},
}
}
impl<L, R> Either<L, R> {
pub(crate) fn left(left: L) -> Self {
Either::Left { left }
}
pub(crate) fn right(right: R) -> Self {
Either::Right { right }
}
}
impl<Left, Right> AsyncRead for Either<Left, Right>
where
Left: AsyncRead + Unpin,
Right: AsyncRead + Unpin,
Left: AsyncRead,
Right: AsyncRead,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match *self {
Self::Left(ref mut left) => Pin::new(left).poll_read(cx, buf),
Self::Right(ref mut right) => Pin::new(right).poll_read(cx, buf),
let this = self.as_mut().project();
match this {
EitherProj::Left { left } => left.poll_read(cx, buf),
EitherProj::Right { right } => right.poll_read(cx, buf),
}
}
}
impl<Left, Right> Stream for Either<Left, Right>
where
Left: Stream<Item = <Right as Stream>::Item> + Unpin,
Right: Stream + Unpin,
Left: Stream<Item = <Right as Stream>::Item>,
Right: Stream,
{
type Item = <Left as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
Self::Left(ref mut left) => Pin::new(left).poll_next(cx),
Self::Right(ref mut right) => Pin::new(right).poll_next(cx),
let this = self.as_mut().project();
match this {
EitherProj::Left { left } => left.poll_next(cx),
EitherProj::Right { right } => right.poll_next(cx),
}
}
}

View file

@ -10,17 +10,24 @@ pub(crate) use io_uring::File;
#[cfg(not(feature = "io-uring"))]
pub(crate) use tokio_file::File;
struct CrateError<S>(S);
pin_project_lite::pin_project! {
struct CrateError<S> {
#[pin]
inner: S
}
}
impl<T, E, S> Stream for CrateError<S>
where
S: Stream<Item = Result<T, E>> + Unpin,
S: Stream<Item = Result<T, E>>,
crate::error::Error: From<E>,
{
type Item = Result<T, crate::error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0)
let this = self.as_mut().project();
this.inner
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
}
@ -90,31 +97,35 @@ mod tokio_file {
mut self,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<
impl Stream<Item = Result<Bytes, crate::error::Error>> + Unpin,
crate::error::Error,
> {
) -> Result<impl Stream<Item = Result<Bytes, crate::error::Error>>, crate::error::Error>
{
let obj = match (from_start, len) {
(Some(lower), Some(upper)) => {
self.inner.seek(SeekFrom::Start(lower)).await?;
Either::Left(self.inner.take(upper))
Either::left(self.inner.take(upper))
}
(None, Some(upper)) => Either::Left(self.inner.take(upper)),
(None, Some(upper)) => Either::left(self.inner.take(upper)),
(Some(lower), None) => {
self.inner.seek(SeekFrom::Start(lower)).await?;
Either::Right(self.inner)
Either::right(self.inner)
}
(None, None) => Either::Right(self.inner),
(None, None) => Either::right(self.inner),
};
Ok(super::CrateError(BytesFreezer(FramedRead::new(
obj,
BytesCodec::new(),
))))
Ok(super::CrateError {
inner: BytesFreezer {
inner: FramedRead::new(obj, BytesCodec::new()),
},
})
}
}
struct BytesFreezer<S>(S);
pin_project_lite::pin_project! {
struct BytesFreezer<S> {
#[pin]
inner: S,
}
}
impl<S, E> Stream for BytesFreezer<S>
where
@ -126,7 +137,9 @@ mod tokio_file {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0)
let this = self.as_mut().project();
this.inner
.poll_next(cx)
.map(|opt| opt.map(|res| res.map(BytesMut::freeze)))
}
@ -299,21 +312,21 @@ mod io_uring {
self,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<
impl Stream<Item = Result<Bytes, crate::error::Error>> + Unpin,
crate::error::Error,
> {
) -> Result<impl Stream<Item = Result<Bytes, crate::error::Error>>, crate::error::Error>
{
let size = self.metadata().await?.len();
let cursor = from_start.unwrap_or(0);
let size = len.unwrap_or(size - cursor) + cursor;
Ok(super::CrateError(BytesStream {
file: Some(self),
size,
cursor,
fut: None,
}))
Ok(super::CrateError {
inner: BytesStream {
state: ReadFileState::File { file: Some(self) },
size,
cursor,
callback: read_file,
},
})
}
async fn read_at<T: IoBufMut>(&self, buf: T, pos: u64) -> BufResult<usize, T> {
@ -325,55 +338,89 @@ mod io_uring {
}
}
struct BytesStream {
file: Option<File>,
size: u64,
cursor: u64,
fut: Option<Pin<Box<dyn Future<Output = (File, BufResult<usize, Vec<u8>>)>>>>,
pin_project_lite::pin_project! {
struct BytesStream<F, Fut> {
#[pin]
state: ReadFileState<Fut>,
size: u64,
cursor: u64,
#[pin]
callback: F,
}
}
impl Stream for BytesStream {
pin_project_lite::pin_project! {
#[project = ReadFileStateProj]
#[project_replace = ReadFileStateProjReplace]
enum ReadFileState<Fut> {
File {
file: Option<File>,
},
Future {
#[pin]
fut: Fut,
},
}
}
async fn read_file(
file: File,
capacity: usize,
cursor: u64,
) -> (File, BufResult<usize, Vec<u8>>) {
let buf = Vec::with_capacity(capacity);
let buf_res = file.read_at(buf, cursor).await;
(file, buf_res)
}
impl<F, Fut> Stream for BytesStream<F, Fut>
where
F: Fn(File, usize, u64) -> Fut,
Fut: Future<Output = (File, BufResult<usize, Vec<u8>>)> + 'static,
{
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut fut = if let Some(fut) = self.fut.take() {
fut
} else {
let file = self.file.take().unwrap();
let mut this = self.as_mut().project();
if self.cursor == self.size {
return Poll::Ready(None);
match this.state.as_mut().project() {
ReadFileStateProj::File { file } => {
let cursor = *this.cursor;
let max_size = *this.size - *this.cursor;
if max_size == 0 {
return Poll::Ready(None);
}
let capacity = max_size.min(65_356) as usize;
let file = file.take().unwrap();
let fut = (this.callback)(file, capacity, cursor);
this.state.project_replace(ReadFileState::Future { fut });
self.poll_next(cx)
}
ReadFileStateProj::Future { fut } => match fut.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready((file, (Ok(n), mut buf))) => {
this.state
.project_replace(ReadFileState::File { file: Some(file) });
let cursor = self.cursor;
let max_size = self.size - self.cursor;
let _ = buf.split_off(n);
let n: u64 = match n.try_into() {
Ok(n) => n,
Err(_) => {
return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into())))
}
};
*this.cursor += n;
Box::pin(async move {
let buf = Vec::with_capacity(max_size.try_into().unwrap());
let buf_res = file.read_at(buf, cursor).await;
(file, buf_res)
})
};
match Pin::new(&mut fut).poll(cx) {
Poll::Pending => {
self.fut = Some(fut);
Poll::Pending
}
Poll::Ready((file, (Ok(n), mut buf))) => {
self.file = Some(file);
let _ = buf.split_off(n);
let n: u64 = match n.try_into() {
Ok(n) => n,
Err(_) => return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into()))),
};
self.cursor += n;
Poll::Ready(Some(Ok(Bytes::from(buf))))
}
Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(buf.into())))
}
Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))),
},
}
}
}

View file

@ -91,16 +91,24 @@ static PROCESS_MAP: Lazy<ProcessMap> = Lazy::new(DashMap::new);
type OutcomeSender = Sender<(Details, web::Bytes)>;
type ProcessMap = DashMap<PathBuf, Vec<OutcomeSender>>;
struct CancelSafeProcessor<F> {
struct CancelToken {
span: Span,
path: PathBuf,
receiver: Option<Receiver<(Details, web::Bytes)>>,
fut: F,
}
pin_project_lite::pin_project! {
struct CancelSafeProcessor<F> {
cancel_token: CancelToken,
#[pin]
fut: F,
}
}
impl<F> CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), Error>> + Unpin,
F: Future<Output = Result<(Details, web::Bytes), Error>>,
{
pub(crate) fn new(path: PathBuf, fut: F) -> Self {
let entry = PROCESS_MAP.entry(path.clone());
@ -127,9 +135,11 @@ where
};
CancelSafeProcessor {
span,
path,
receiver,
cancel_token: CancelToken {
span,
path,
receiver,
},
fut,
}
}
@ -137,21 +147,26 @@ where
impl<F> Future for CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, web::Bytes), Error>> + Unpin,
F: Future<Output = Result<(Details, web::Bytes), Error>>,
{
type Output = Result<(Details, web::Bytes), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let span = self.span.clone();
let this = self.as_mut().project();
let span = &this.cancel_token.span;
let receiver = &mut this.cancel_token.receiver;
let path = &this.cancel_token.path;
let fut = this.fut;
span.in_scope(|| {
if let Some(ref mut rx) = self.receiver {
if let Some(ref mut rx) = receiver {
Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into()))
} else {
Pin::new(&mut self.fut).poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(&self.path);
fut.poll(cx).map(|res| {
let opt = PROCESS_MAP.remove(path);
res.map(|tup| {
if let Some((_, vec)) = opt {
for sender in vec {
@ -166,7 +181,7 @@ where
}
}
impl<F> Drop for CancelSafeProcessor<F> {
impl Drop for CancelToken {
fn drop(&mut self) {
if self.receiver.is_none() {
let completed = PROCESS_MAP.remove(&self.path).is_none();
@ -591,7 +606,7 @@ async fn process(
};
let (details, bytes) =
CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?;
CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?;
return match range {
Some(range_header) => {
@ -717,7 +732,7 @@ async fn ranged_file_resp(
let mut builder = HttpResponse::PartialContent();
builder.insert_header(range.to_content_range(meta.len()));
(builder, Either::Left(range.chop_file(file).await?))
(builder, Either::left(range.chop_file(file).await?))
} else {
return Err(UploadError::Range.into());
}
@ -726,13 +741,13 @@ async fn ranged_file_resp(
None => {
let file = crate::file::File::open(path).await?;
let stream = file.read_to_stream(None, None).await?;
(HttpResponse::Ok(), Either::Right(stream))
(HttpResponse::Ok(), Either::right(stream))
}
};
Ok(srv_response(
builder,
stream,
Box::pin(stream),
details.content_type(),
7 * DAYS,
details.system_time(),

View file

@ -42,10 +42,7 @@ impl Range {
}
}
pub(crate) fn chop_bytes(
&self,
bytes: Bytes,
) -> impl Stream<Item = Result<Bytes, Error>> + Unpin {
pub(crate) fn chop_bytes(&self, bytes: Bytes) -> impl Stream<Item = Result<Bytes, Error>> {
match self {
Range::Start(start) => once(ready(Ok(bytes.slice(*start as usize..)))),
Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))),
@ -58,7 +55,7 @@ impl Range {
pub(crate) async fn chop_file(
&self,
file: crate::file::File,
) -> Result<impl Stream<Item = Result<Bytes, Error>> + Unpin, Error> {
) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
match self {
Range::Start(start) => file.read_to_stream(Some(*start), None).await,
Range::SuffixLength(from_start) => file.read_to_stream(None, Some(*from_start)).await,