diff --git a/src/file.rs b/src/file.rs index a8b0fc1..a2408df 100644 --- a/src/file.rs +++ b/src/file.rs @@ -113,10 +113,7 @@ mod io_uring { use std::{ convert::TryInto, fs::Metadata, - future::Future, path::{Path, PathBuf}, - pin::Pin, - task::{Context, Poll}, }; use streem::IntoStreamer; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -319,20 +316,7 @@ mod io_uring { from_start: Option, len: Option, ) -> Result>, FileError> { - let size = self.metadata().await?.len(); - - let cursor = from_start.unwrap_or(0); - let size = len.unwrap_or(size - cursor) + cursor; - - Ok(BytesStream { - state: ReadFileState::File { - file: Some(self), - bytes: Some(BytesMut::new()), - }, - size, - cursor, - callback: read_file, - }) + Ok(bytes_stream(self, from_start, len)) } async fn read_at(&self, buf: T, pos: u64) -> BufResult { @@ -344,98 +328,50 @@ mod io_uring { } } - pin_project_lite::pin_project! { - struct BytesStream { - #[pin] - state: ReadFileState, - size: u64, - cursor: u64, - #[pin] - callback: F, - } - } - - pin_project_lite::pin_project! { - #[project = ReadFileStateProj] - #[project_replace = ReadFileStateProjReplace] - enum ReadFileState { - File { - file: Option, - bytes: Option, - }, - Future { - #[pin] - fut: Fut, - }, - } - } - - async fn read_file( + fn bytes_stream( file: File, - buf: BytesMut, - cursor: u64, - ) -> (File, BufResult) { - let buf_res = file.read_at(buf, cursor).await; + from_start: Option, + len: Option, + ) -> impl Stream> { + streem::try_from_fn(|yielder| async move { + let file_size = file.metadata().await?.len(); - (file, buf_res) - } + let mut cursor = from_start.unwrap_or(0); + let remaining_size = file_size.saturating_sub(cursor); + let read_until = len.unwrap_or(remaining_size) + cursor; - impl Stream for BytesStream - where - F: Fn(File, BytesMut, u64) -> Fut, - Fut: Future)> + 'static, - { - type Item = std::io::Result; + let mut bytes = BytesMut::new(); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.as_mut().project(); + loop { + let max_size = read_until.saturating_sub(cursor); - match this.state.as_mut().project() { - ReadFileStateProj::File { file, bytes } => { - let cursor = *this.cursor; - let max_size = *this.size - *this.cursor; - - if max_size == 0 { - return Poll::Ready(None); - } - - let capacity = max_size.min(65_356) as usize; - let mut bytes = bytes.take().unwrap(); - let file = file.take().unwrap(); - - if bytes.capacity() < capacity { - bytes.reserve(capacity - bytes.capacity()); - } - - let fut = (this.callback)(file, bytes, cursor); - - this.state.project_replace(ReadFileState::Future { fut }); - self.poll_next(cx) + if max_size == 0 { + break; } - ReadFileStateProj::Future { fut } => match fut.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready((file, (Ok(n), mut buf))) => { - let bytes = buf.split_off(n); - this.state.project_replace(ReadFileState::File { - file: Some(file), - bytes: Some(bytes), - }); + let capacity = max_size.min(65_356) as usize; - let n: u64 = match n.try_into() { - Ok(n) => n, - Err(_) => { - return Poll::Ready(Some(Err(std::io::ErrorKind::Other.into()))) - } - }; - *this.cursor += n; + if bytes.capacity() < capacity { + bytes.reserve(capacity - bytes.capacity()); + } - Poll::Ready(Some(Ok(buf.into()))) - } - Poll::Ready((_, (Err(e), _))) => Poll::Ready(Some(Err(e))), - }, + let (result, mut buf_) = file.read_at(bytes, cursor).await; + + let n = match result { + Ok(n) => n, + Err(e) => return Err(e), + }; + + bytes = buf_.split_off(n); + + let n: u64 = n.try_into().map_err(|_| std::io::ErrorKind::Other)?; + cursor += n; + + yielder.yield_ok(buf_.into()).await; } - } + + Ok(()) + }) } #[cfg(test)] diff --git a/src/stream.rs b/src/stream.rs index f7d4449..9e617aa 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -58,6 +58,7 @@ where }) } +#[cfg(not(feature = "io-uring"))] pub(crate) fn map_ok(stream: S, f: F) -> impl Stream> where S: Stream>,