From 2fb8d3e39a0f7b6ab671d0b024b68456ba85f913 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Mon, 18 Oct 2021 18:02:33 -0500 Subject: [PATCH] Reduce required Unpin bounds with pin-project-lite --- Cargo.lock | 42 ++++++------ Cargo.toml | 1 + src/either.rs | 50 ++++++++++---- src/file.rs | 181 +++++++++++++++++++++++++++++++------------------- src/main.rs | 47 ++++++++----- src/range.rs | 7 +- 6 files changed, 207 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0395c4..afc7ef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,9 +77,9 @@ dependencies = [ [[package]] name = "actix-macros" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f86cd6857c135e6e9fe57b1619a88d1f94a7df34c00e11fe13e64fd3438837" +checksum = "c8c999eaf5f8414142b5bb60e43dcb27ccd12c2c15e4133e470f3c5060472532" dependencies = [ "quote", "syn", @@ -240,9 +240,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "991984e3fd003e7ba02eb724f87a0f997b78677c46c0e91f8424ad7394c9886a" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ "getrandom", "once_cell", @@ -560,9 +560,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "encoding_rs" -version = "0.8.28" +version = "0.8.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065" +checksum = "a74ea89a0a1b98f6332de42c95baff457ada66d1cb4030f9ff151b2041a1c746" dependencies = [ "cfg-if", ] @@ -865,9 +865,9 @@ dependencies = [ [[package]] name = "instant" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ "cfg-if", ] @@ -920,9 +920,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.103" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" +checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce" [[package]] name = "local-channel" @@ -998,9 +998,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "mio" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" dependencies = [ "libc", "log", @@ -1184,6 +1184,7 @@ dependencies = [ "once_cell", "opentelemetry", "opentelemetry-otlp", + "pin-project-lite", "rand", "serde", "serde_json", @@ -1240,9 +1241,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "ppv-lite86" -version = "0.2.10" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741" [[package]] name = "proc-macro-error" @@ -1753,9 +1754,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "structopt" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf9d950ef167e25e0bdb073cf1d68e9ad2795ac826f2f3f59647817cf23c0bfa" +checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c" dependencies = [ "clap", "lazy_static", @@ -1764,9 +1765,9 @@ dependencies = [ [[package]] name = "structopt-derive" -version = "0.4.16" +version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134d838a2c9943ac3125cf6df165eda53493451b719f3255b2a26b85f772d0ba" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ "heck", "proc-macro-error", @@ -2036,14 +2037,15 @@ dependencies = [ [[package]] name = "tower" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f60422bc7fefa2f3ec70359b8ff1caff59d785877eb70595904605bcc412470f" +checksum = "d15a6b60cdff0cb039d81d3b37f8bc3d7e53dca09069aae3ef2502ca4834fe30" dependencies = [ "futures-core", "futures-util", "indexmap", "pin-project", + "pin-project-lite", "rand", "slab", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 4c629c7..69841ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ num_cpus = "1.13" once_cell = "1.4.0" opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry-otlp = "0.9" +pin-project-lite = "0.2.7" rand = "0.8.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/either.rs b/src/either.rs index 5bdeedd..97bef7a 100644 --- a/src/either.rs +++ b/src/either.rs @@ -5,39 +5,63 @@ use std::{ }; use tokio::io::{AsyncRead, ReadBuf}; -pub(crate) enum Either { - Left(Left), - Right(Right), +pin_project_lite::pin_project! { + #[project = EitherProj] + #[project_replace = EitherProjReplace] + pub(crate) enum Either { + Left { + #[pin] + left: Left, + }, + Right { + #[pin] + right: Right, + }, + } +} + +impl Either { + pub(crate) fn left(left: L) -> Self { + Either::Left { left } + } + + pub(crate) fn right(right: R) -> Self { + Either::Right { right } + } } impl AsyncRead for Either where - Left: AsyncRead + Unpin, - Right: AsyncRead + Unpin, + Left: AsyncRead, + Right: AsyncRead, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - match *self { - Self::Left(ref mut left) => Pin::new(left).poll_read(cx, buf), - Self::Right(ref mut right) => Pin::new(right).poll_read(cx, buf), + let this = self.as_mut().project(); + + match this { + EitherProj::Left { left } => left.poll_read(cx, buf), + EitherProj::Right { right } => right.poll_read(cx, buf), } } } impl Stream for Either where - Left: Stream::Item> + Unpin, - Right: Stream + Unpin, + Left: Stream::Item>, + Right: Stream, { type Item = ::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match *self { - Self::Left(ref mut left) => Pin::new(left).poll_next(cx), - Self::Right(ref mut right) => Pin::new(right).poll_next(cx), + let this = self.as_mut().project(); + + match this { + EitherProj::Left { left } => left.poll_next(cx), + EitherProj::Right { right } => right.poll_next(cx), } } } diff --git a/src/file.rs b/src/file.rs index a31f0e1..2c8e79d 100644 --- a/src/file.rs +++ b/src/file.rs @@ -10,17 +10,24 @@ pub(crate) use io_uring::File; #[cfg(not(feature = "io-uring"))] pub(crate) use tokio_file::File; -struct CrateError(S); +pin_project_lite::pin_project! { + struct CrateError { + #[pin] + inner: S + } +} impl Stream for CrateError where - S: Stream> + Unpin, + S: Stream>, crate::error::Error: From, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0) + let this = self.as_mut().project(); + + this.inner .poll_next(cx) .map(|opt| opt.map(|res| res.map_err(Into::into))) } @@ -90,31 +97,35 @@ mod tokio_file { mut self, from_start: Option, len: Option, - ) -> Result< - impl Stream> + Unpin, - crate::error::Error, - > { + ) -> Result>, crate::error::Error> + { let obj = match (from_start, len) { (Some(lower), Some(upper)) => { self.inner.seek(SeekFrom::Start(lower)).await?; - Either::Left(self.inner.take(upper)) + Either::left(self.inner.take(upper)) } - (None, Some(upper)) => Either::Left(self.inner.take(upper)), + (None, Some(upper)) => Either::left(self.inner.take(upper)), (Some(lower), None) => { self.inner.seek(SeekFrom::Start(lower)).await?; - Either::Right(self.inner) + Either::right(self.inner) } - (None, None) => Either::Right(self.inner), + (None, None) => Either::right(self.inner), }; - Ok(super::CrateError(BytesFreezer(FramedRead::new( - obj, - BytesCodec::new(), - )))) + Ok(super::CrateError { + inner: BytesFreezer { + inner: FramedRead::new(obj, BytesCodec::new()), + }, + }) } } - struct BytesFreezer(S); + pin_project_lite::pin_project! { + struct BytesFreezer { + #[pin] + inner: S, + } + } impl Stream for BytesFreezer where @@ -126,7 +137,9 @@ mod tokio_file { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.0) + let this = self.as_mut().project(); + + this.inner .poll_next(cx) .map(|opt| opt.map(|res| res.map(BytesMut::freeze))) } @@ -299,21 +312,21 @@ mod io_uring { self, from_start: Option, len: Option, - ) -> Result< - impl Stream> + Unpin, - crate::error::Error, - > { + ) -> Result>, crate::error::Error> + { let size = self.metadata().await?.len(); let cursor = from_start.unwrap_or(0); let size = len.unwrap_or(size - cursor) + cursor; - Ok(super::CrateError(BytesStream { - file: Some(self), - size, - cursor, - fut: None, - })) + Ok(super::CrateError { + inner: BytesStream { + state: ReadFileState::File { file: Some(self) }, + size, + cursor, + callback: read_file, + }, + }) } async fn read_at(&self, buf: T, pos: u64) -> BufResult { @@ -325,55 +338,89 @@ mod io_uring { } } - struct BytesStream { - file: Option, - size: u64, - cursor: u64, - fut: Option>)>>>>, + pin_project_lite::pin_project! { + struct BytesStream { + #[pin] + state: ReadFileState, + size: u64, + cursor: u64, + #[pin] + callback: F, + } } - impl Stream for BytesStream { + pin_project_lite::pin_project! { + #[project = ReadFileStateProj] + #[project_replace = ReadFileStateProjReplace] + enum ReadFileState { + File { + file: Option, + }, + Future { + #[pin] + fut: Fut, + }, + } + } + + async fn read_file( + file: File, + capacity: usize, + cursor: u64, + ) -> (File, BufResult>) { + let buf = Vec::with_capacity(capacity); + + let buf_res = file.read_at(buf, cursor).await; + + (file, buf_res) + } + + impl Stream for BytesStream + where + F: Fn(File, usize, u64) -> Fut, + Fut: Future>)> + 'static, + { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut fut = if let Some(fut) = self.fut.take() { - fut - } else { - let file = self.file.take().unwrap(); + let mut this = self.as_mut().project(); - if self.cursor == self.size { - return Poll::Ready(None); + match this.state.as_mut().project() { + ReadFileStateProj::File { file } => { + let cursor = *this.cursor; + let max_size = *this.size - *this.cursor; + + if max_size == 0 { + return Poll::Ready(None); + } + + let capacity = max_size.min(65_356) as usize; + let file = file.take().unwrap(); + + let fut = (this.callback)(file, capacity, cursor); + + this.state.project_replace(ReadFileState::Future { fut }); + self.poll_next(cx) } + ReadFileStateProj::Future { fut } => match fut.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready((file, (Ok(n), mut buf))) => { + this.state + .project_replace(ReadFileState::File { file: Some(file) }); - let cursor = self.cursor; - let max_size = self.size - self.cursor; + let _ = buf.split_off(n); + let n: u64 = match n.try_into() { + Ok(n) => n, + Err(_) => { + return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into()))) + } + }; + *this.cursor += n; - Box::pin(async move { - let buf = Vec::with_capacity(max_size.try_into().unwrap()); - - let buf_res = file.read_at(buf, cursor).await; - - (file, buf_res) - }) - }; - - match Pin::new(&mut fut).poll(cx) { - Poll::Pending => { - self.fut = Some(fut); - Poll::Pending - } - Poll::Ready((file, (Ok(n), mut buf))) => { - self.file = Some(file); - let _ = buf.split_off(n); - let n: u64 = match n.try_into() { - Ok(n) => n, - Err(_) => return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into()))), - }; - self.cursor += n; - - Poll::Ready(Some(Ok(Bytes::from(buf)))) - } - Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(buf.into()))) + } + Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))), + }, } } } diff --git a/src/main.rs b/src/main.rs index 5695eaa..20124cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -91,16 +91,24 @@ static PROCESS_MAP: Lazy = Lazy::new(DashMap::new); type OutcomeSender = Sender<(Details, web::Bytes)>; type ProcessMap = DashMap>; -struct CancelSafeProcessor { +struct CancelToken { span: Span, path: PathBuf, receiver: Option>, - fut: F, +} + +pin_project_lite::pin_project! { + struct CancelSafeProcessor { + cancel_token: CancelToken, + + #[pin] + fut: F, + } } impl CancelSafeProcessor where - F: Future> + Unpin, + F: Future>, { pub(crate) fn new(path: PathBuf, fut: F) -> Self { let entry = PROCESS_MAP.entry(path.clone()); @@ -127,9 +135,11 @@ where }; CancelSafeProcessor { - span, - path, - receiver, + cancel_token: CancelToken { + span, + path, + receiver, + }, fut, } } @@ -137,21 +147,26 @@ where impl Future for CancelSafeProcessor where - F: Future> + Unpin, + F: Future>, { type Output = Result<(Details, web::Bytes), Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let span = self.span.clone(); + let this = self.as_mut().project(); + + let span = &this.cancel_token.span; + let receiver = &mut this.cancel_token.receiver; + let path = &this.cancel_token.path; + let fut = this.fut; span.in_scope(|| { - if let Some(ref mut rx) = self.receiver { + if let Some(ref mut rx) = receiver { Pin::new(rx) .poll(cx) .map(|res| res.map_err(|_| UploadError::Canceled.into())) } else { - Pin::new(&mut self.fut).poll(cx).map(|res| { - let opt = PROCESS_MAP.remove(&self.path); + fut.poll(cx).map(|res| { + let opt = PROCESS_MAP.remove(path); res.map(|tup| { if let Some((_, vec)) = opt { for sender in vec { @@ -166,7 +181,7 @@ where } } -impl Drop for CancelSafeProcessor { +impl Drop for CancelToken { fn drop(&mut self) { if self.receiver.is_none() { let completed = PROCESS_MAP.remove(&self.path).is_none(); @@ -591,7 +606,7 @@ async fn process( }; let (details, bytes) = - CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?; + CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?; return match range { Some(range_header) => { @@ -717,7 +732,7 @@ async fn ranged_file_resp( let mut builder = HttpResponse::PartialContent(); builder.insert_header(range.to_content_range(meta.len())); - (builder, Either::Left(range.chop_file(file).await?)) + (builder, Either::left(range.chop_file(file).await?)) } else { return Err(UploadError::Range.into()); } @@ -726,13 +741,13 @@ async fn ranged_file_resp( None => { let file = crate::file::File::open(path).await?; let stream = file.read_to_stream(None, None).await?; - (HttpResponse::Ok(), Either::Right(stream)) + (HttpResponse::Ok(), Either::right(stream)) } }; Ok(srv_response( builder, - stream, + Box::pin(stream), details.content_type(), 7 * DAYS, details.system_time(), diff --git a/src/range.rs b/src/range.rs index 5353d09..39b0b77 100644 --- a/src/range.rs +++ b/src/range.rs @@ -42,10 +42,7 @@ impl Range { } } - pub(crate) fn chop_bytes( - &self, - bytes: Bytes, - ) -> impl Stream> + Unpin { + pub(crate) fn chop_bytes(&self, bytes: Bytes) -> impl Stream> { match self { Range::Start(start) => once(ready(Ok(bytes.slice(*start as usize..)))), Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))), @@ -58,7 +55,7 @@ impl Range { pub(crate) async fn chop_file( &self, file: crate::file::File, - ) -> Result> + Unpin, Error> { + ) -> Result>, Error> { match self { Range::Start(start) => file.read_to_stream(Some(*start), None).await, Range::SuffixLength(from_start) => file.read_to_stream(None, Some(*from_start)).await,