From aef0e3c557b1365c0c1039bedd321724cd09201f Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sat, 21 Aug 2021 15:48:38 -0500 Subject: [PATCH] Go even faster --- Cargo.toml | 7 +- src/file.rs | 303 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 47 +++++++- 3 files changed, 352 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1078b74..d01edd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,14 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-rt = "2.0.1" +actix-rt = "2.2.0" bytes = "1" -futures = "0.3.5" +futures-util = { version = "0.3.16", default-features = false, features = ["sink"] } log = "0.4" thiserror = "1.0" +tokio = { version = "1", default-features = false, features = ["sync"] } +tokio-stream = { version = "0.1", default-features = false } [dev-dependencies] anyhow = "1.0" +futures = "0.3.16" diff --git a/src/file.rs b/src/file.rs index 8424a31..34b99bc 100644 --- a/src/file.rs +++ b/src/file.rs @@ -6,7 +6,7 @@ use crate::Error; use actix_rt::task::{spawn_blocking, JoinError}; use bytes::{Bytes, BytesMut}; -use futures::{ +use futures_util::{ future::{FutureExt, LocalBoxFuture}, sink::{Sink, SinkExt}, stream::{Stream, StreamExt}, @@ -20,6 +20,8 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::sync::mpsc::channel; +use tokio_stream::wrappers::ReceiverStream; /// A stream of bytes from a file on the filesystem pub struct FileStream { @@ -31,6 +33,12 @@ pub struct FileStream { Option, JoinError>>>, } +/// A faster stream of bytes from a file on the filesystem +pub struct FasterFileStream { + size: u64, + inner: ReceiverStream>, +} + struct FileSink { file: Option, fut: Option, JoinError>>>, @@ -330,6 +338,43 @@ pub async fn read(file: File) -> Result { Ok(bytes_mut.freeze()) } +/// Pull all bytes from this file. +/// +/// # Errors +/// +/// If this function encounters any form of I/O or other error, an error +/// variant will be returned. If an error is returned then it must be +/// guaranteed that no bytes were read. +/// +/// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the read +/// operation should be retried if there is nothing else to do. +/// +/// # Examples +/// +/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted +/// +/// ```no_run +/// #[actix_rt::main] +/// async fn main() -> actix_fs::Result<()> { +/// let f = actix_fs::file::open("foo.txt").await?; +/// +/// let bytes = actix_fs::file::read_faster(f).await?; +/// +/// println!("The bytes: {:?}", bytes); +/// Ok(()) +/// } +/// ``` +pub async fn read_faster(file: File) -> Result { + let mut stream = FileStream::new(file).await?.faster(); + let mut bytes_mut = BytesMut::with_capacity(stream.size() as usize); + + while let Some(res) = stream.next().await { + bytes_mut.extend(res?); + } + + Ok(bytes_mut.freeze()) +} + /// Produce a new stream of bytes from this File /// /// ```no_run @@ -391,6 +436,46 @@ pub async fn read_to_string(file: File) -> Result { Ok(string) } +/// Read all bytes until EOF in this source. +/// +/// # Errors +/// +/// If the data in this stream is *not* valid UTF-8 then an error is +/// returned and `buf` is unchanged. +/// +/// See [`read`][read] for other error semantics. +/// +/// [read]: fn.read.html +/// +/// # Examples +/// +/// ```no_run +/// #[actix_rt::main] +/// async fn main() -> actix_fs::Result<()> { +/// let f = actix_fs::file::open("foo.txt").await?; +/// let s = actix_fs::file::read_to_string_faster(f).await?; +/// Ok(()) +/// } +/// ``` +/// +/// (See also the [`read_to_string_faster`] convenience function for +/// reading from a file.) +/// +/// [`read_to_string_faster`]: ../fn.read_to_string_faster.html +pub async fn read_to_string_faster(file: File) -> Result { + let mut stream = read_to_stream(file).await?.faster(); + + let mut string = String::with_capacity(stream.size() as usize); + + while let Some(res) = stream.next().await { + let bytes = res?; + let s = std::str::from_utf8(&bytes)?; + string.extend(s.chars()); + } + + Ok(string) +} + /// Write bytes into this writer. /// /// # Errors @@ -465,6 +550,69 @@ where Ok(sink.file.take().expect("Panick in write future")) } +/// Write a stream of bytes into this writer. +/// +/// # Errors +/// +/// Each call to `write` may generate an I/O error indicating that the +/// operation could not be completed. If an error is returned then no bytes +/// in the buffer were written to this writer. +/// +/// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the +/// write operation should be retried if there is nothing else to do. +/// +/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted +/// +/// # Examples +/// +/// ```no_run +/// #[actix_rt::main] +/// async fn main() -> actix_fs::Result<()> { +/// let file = actix_fs::file::create("foo.txt").await?; +/// let stream = actix_fs::read_to_stream("bar.txt").await?.faster(); +/// +/// actix_fs::file::write_stream_faster(file, stream).await?; +/// Ok(()) +/// } +/// ``` +pub async fn write_stream_faster(mut file: File, mut stream: S) -> Result +where + S: Stream> + Unpin, + E: From + Unpin, +{ + let (tx, mut rx) = channel(1); + + let handle = spawn_blocking(move || { + while let Some(bytes) = rx.blocking_recv() { + let bytes: Bytes = bytes; + file.write_all(bytes.as_ref())?; + } + + Ok(file) + }); + + // Ensure `tx` dies at the end of the scope + { + let tx = tx; + while let Some(res) = stream.next().await { + if tx.send(res?).await.is_err() { + // It should be safe to break here because any failure to send indicates early exit on + // the receive side, which only happens in error. This will ensure we still return an + // error from this function instead of pretending things are okay + break; + } + } + } + + let res: Result, JoinError> = handle.await; + + match res { + Ok(Ok(file)) => Ok(file), + Ok(Err(e)) => Err(e.into()), + Err(e) => Err(Error::from(e).into()), + } +} + impl FileStream { async fn new(file: File) -> Result { let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?; @@ -509,6 +657,80 @@ impl FileStream { pub fn size(&self) -> u64 { self.size } + + /// Returns a stream of bytes from the file that is sourced from a blocking loop on another + /// thread. This has the potential to be much faster than `FileStream` type since it incurs the + /// cost of running on another thread once at the start, rather than for every chunk. + pub fn faster(self) -> FasterFileStream { + let (tx, rx) = channel(1); + + let FileStream { + mut file, + mut offset, + size, + chunk_size, + .. + } = self; + let mut file = file.take().expect("Tried to concurrently access file"); + + let _ = spawn_blocking(move || { + let mut max_bytes: usize; + + loop { + if size == offset || tx.is_closed() { + return; + } + + max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize; + let mut buf = Vec::with_capacity(max_bytes); + let pos_res = file.seek(io::SeekFrom::Start(offset)); + + let pos = match pos_res { + Ok(pos) => pos, + Err(e) => { + let _ = tx.blocking_send(Err(e.into())); + return; + } + }; + + let nbytes_res = Read::by_ref(&mut file) + .take(max_bytes as u64) + .read_to_end(&mut buf); + + let nbytes = match nbytes_res { + Ok(0) => { + let _ = tx + .blocking_send(Err( + io::Error::from(io::ErrorKind::UnexpectedEof).into() + )); + return; + } + Ok(nbytes) => nbytes, + Err(e) => { + let _ = tx.blocking_send(Err(e.into())); + return; + } + }; + + if tx.blocking_send(Ok(Bytes::from(buf))).is_err() { + return; + } + offset = pos + nbytes as u64; + } + }); + + FasterFileStream { + size, + inner: ReceiverStream::new(rx), + } + } +} + +impl FasterFileStream { + /// Get the size of the file being streamed + pub fn size(&self) -> u64 { + self.size + } } impl FileSink { @@ -569,6 +791,14 @@ impl Stream for FileStream { } } +impl Stream for FasterFileStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} + impl Sink for FileSink where E: From + Unpin, @@ -658,6 +888,7 @@ mod tests { const READ_FILE: &str = "tests/read.txt"; const WRITE_FILE: &str = "tests/write.txt"; + const WRITE_FILE_FASTER: &str = "tests/write-faster.txt"; const TEST_FILE: &str = "tests/test.txt"; #[test] @@ -683,6 +914,29 @@ mod tests { .unwrap() } + #[test] + fn stream_file_faster() { + run(async move { + let read_file = open(READ_FILE).await?; + let stream = read_to_stream(read_file).await?.faster(); + let write_file = create(WRITE_FILE_FASTER).await?; + + write_stream_faster(write_file, stream).await?; + + let read_file = open(READ_FILE).await?; + let write_file = open(WRITE_FILE_FASTER).await?; + let read_bytes = read_faster(read_file).await?; + let written_bytes = read_faster(write_file).await?; + + assert!(written_bytes.as_ref() == read_bytes.as_ref()); + + remove_file(WRITE_FILE_FASTER).await?; + + Ok(()) as Result<_, Error> + }) + .unwrap() + } + #[test] fn read_write_file() { let bytes_to_be_written = b"abcdefg"; @@ -714,6 +968,18 @@ mod tests { .unwrap(); } + #[test] + fn read_file_faster() { + run(async move { + let file = open(READ_FILE).await?; + let bytes = read_faster(file).await?; + + assert!(bytes.as_ref() == b"Hello, World!\n"); + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + #[test] fn seek_file() { run(async move { @@ -729,6 +995,21 @@ mod tests { .unwrap(); } + #[test] + fn seek_file_faster() { + run(async move { + let file = open(READ_FILE).await?; + let (file, pos) = seek(file, io::SeekFrom::Start(7)).await?; + assert!(pos == 7); + + let bytes = read_faster(file).await?; + + assert!(bytes.as_ref() == b"World!\n"); + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + #[test] fn small_chunks() { run(async move { @@ -749,6 +1030,26 @@ mod tests { .unwrap(); } + #[test] + fn small_chunks_faster() { + run(async move { + let file = open(READ_FILE).await?; + + let mut bytes_mut = BytesMut::new(); + let (file, _) = seek(file, io::SeekFrom::Start(7)).await?; + let mut stream = read_to_stream(file).await?.chunk_size(2).faster(); + + while let Some(res) = stream.next().await { + bytes_mut.extend(res?); + } + let bytes = bytes_mut.freeze(); + + assert!(bytes.as_ref() == b"World!\n"); + Ok(()) as Result<_, Error> + }) + .unwrap(); + } + fn run(f: F) -> F::Output { actix_rt::System::new().block_on(f) } diff --git a/src/lib.rs b/src/lib.rs index 3bf549c..82cc364 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,7 @@ //! //! ### License //! -//! Copyright © 2020 Riley Trautman +//! Copyright © 2021 Riley Trautman //! //! pict-rs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. //! @@ -37,7 +37,7 @@ use actix_rt::task::spawn_blocking; use bytes::Bytes; -use futures::stream::Stream; +use futures_util::stream::Stream; use std::{ fs, io, path::{Path, PathBuf}, @@ -515,6 +515,49 @@ where file::read_to_string(f).await } +/// Read the entire contents of a file into a string. +/// +/// This is a convenience function for using [`file::open`] and [`read_to_string`] +/// with fewer imports and without an intermediate variable. It pre-allocates a +/// buffer based on the file size when available, so it is generally faster than +/// reading into a string created with `String::new()`. +/// +/// [`file::open`]: ./file/fn.open.html +/// [`read_to_string_faster`]: ./file/fn.read_to_string_faster.html +/// +/// # Errors +/// +/// This function will return an error if `path` does not already exist. +/// Other errors may also be returned according to [`OpenOptions::open`]. +/// +/// [`OpenOptions::open`]: struct.OpenOptions.html#method.open +/// +/// It will also return an error if it encounters while reading an error +/// of a kind other than [`ErrorKind::Interrupted`], +/// or if the contents of the file are not valid UTF-8. +/// +/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted +/// +/// # Examples +/// +/// ```no_run +/// use std::net::SocketAddr; +/// +/// #[actix_rt::main] +/// async fn main() -> Result<(), Box> { +/// let foo: SocketAddr = actix_fs::read_to_string_faster("address.txt").await?.parse()?; +/// Ok(()) +/// } +/// ``` +pub async fn read_to_string_faster

(path: P) -> Result +where + P: AsRef + Send + 'static, +{ + let f = file::open(path).await?; + + file::read_to_string_faster(f).await +} + /// Removes an existing, empty directory. /// /// # Platform-specific behavior