Go even faster

This commit is contained in:
Aode (Lion) 2021-08-21 15:48:38 -05:00
parent f28d287468
commit aef0e3c557
3 changed files with 352 additions and 5 deletions

View file

@ -13,11 +13,14 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-rt = "2.0.1"
actix-rt = "2.2.0"
bytes = "1"
futures = "0.3.5"
futures-util = { version = "0.3.16", default-features = false, features = ["sink"] }
log = "0.4"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["sync"] }
tokio-stream = { version = "0.1", default-features = false }
[dev-dependencies]
anyhow = "1.0"
futures = "0.3.16"

View file

@ -6,7 +6,7 @@
use crate::Error;
use actix_rt::task::{spawn_blocking, JoinError};
use bytes::{Bytes, BytesMut};
use futures::{
use futures_util::{
future::{FutureExt, LocalBoxFuture},
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
@ -20,6 +20,8 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
/// A stream of bytes from a file on the filesystem
pub struct FileStream {
@ -31,6 +33,12 @@ pub struct FileStream {
Option<LocalBoxFuture<'static, Result<Result<(File, Bytes, usize), io::Error>, JoinError>>>,
}
/// A faster stream of bytes from a file on the filesystem
pub struct FasterFileStream {
size: u64,
inner: ReceiverStream<Result<Bytes, Error>>,
}
struct FileSink<E> {
file: Option<File>,
fut: Option<LocalBoxFuture<'static, Result<Result<File, io::Error>, JoinError>>>,
@ -330,6 +338,43 @@ pub async fn read(file: File) -> Result<Bytes, Error> {
Ok(bytes_mut.freeze())
}
/// Pull all bytes from this file.
///
/// # Errors
///
/// If this function encounters any form of I/O or other error, an error
/// variant will be returned. If an error is returned then it must be
/// guaranteed that no bytes were read.
///
/// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the read
/// operation should be retried if there is nothing else to do.
///
/// # Examples
///
/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::open("foo.txt").await?;
///
/// let bytes = actix_fs::file::read_faster(f).await?;
///
/// println!("The bytes: {:?}", bytes);
/// Ok(())
/// }
/// ```
pub async fn read_faster(file: File) -> Result<Bytes, Error> {
let mut stream = FileStream::new(file).await?.faster();
let mut bytes_mut = BytesMut::with_capacity(stream.size() as usize);
while let Some(res) = stream.next().await {
bytes_mut.extend(res?);
}
Ok(bytes_mut.freeze())
}
/// Produce a new stream of bytes from this File
///
/// ```no_run
@ -391,6 +436,46 @@ pub async fn read_to_string(file: File) -> Result<String, Error> {
Ok(string)
}
/// Read all bytes until EOF in this source.
///
/// # Errors
///
/// If the data in this stream is *not* valid UTF-8 then an error is
/// returned and `buf` is unchanged.
///
/// See [`read`][read] for other error semantics.
///
/// [read]: fn.read.html
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::open("foo.txt").await?;
/// let s = actix_fs::file::read_to_string_faster(f).await?;
/// Ok(())
/// }
/// ```
///
/// (See also the [`read_to_string_faster`] convenience function for
/// reading from a file.)
///
/// [`read_to_string_faster`]: ../fn.read_to_string_faster.html
pub async fn read_to_string_faster(file: File) -> Result<String, Error> {
let mut stream = read_to_stream(file).await?.faster();
let mut string = String::with_capacity(stream.size() as usize);
while let Some(res) = stream.next().await {
let bytes = res?;
let s = std::str::from_utf8(&bytes)?;
string.extend(s.chars());
}
Ok(string)
}
/// Write bytes into this writer.
///
/// # Errors
@ -465,6 +550,69 @@ where
Ok(sink.file.take().expect("Panick in write future"))
}
/// Write a stream of bytes into this writer.
///
/// # Errors
///
/// Each call to `write` may generate an I/O error indicating that the
/// operation could not be completed. If an error is returned then no bytes
/// in the buffer were written to this writer.
///
/// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the
/// write operation should be retried if there is nothing else to do.
///
/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let file = actix_fs::file::create("foo.txt").await?;
/// let stream = actix_fs::read_to_stream("bar.txt").await?.faster();
///
/// actix_fs::file::write_stream_faster(file, stream).await?;
/// Ok(())
/// }
/// ```
pub async fn write_stream_faster<S, E>(mut file: File, mut stream: S) -> Result<File, E>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
E: From<Error> + Unpin,
{
let (tx, mut rx) = channel(1);
let handle = spawn_blocking(move || {
while let Some(bytes) = rx.blocking_recv() {
let bytes: Bytes = bytes;
file.write_all(bytes.as_ref())?;
}
Ok(file)
});
// Ensure `tx` dies at the end of the scope
{
let tx = tx;
while let Some(res) = stream.next().await {
if tx.send(res?).await.is_err() {
// It should be safe to break here because any failure to send indicates early exit on
// the receive side, which only happens in error. This will ensure we still return an
// error from this function instead of pretending things are okay
break;
}
}
}
let res: Result<Result<File, Error>, JoinError> = handle.await;
match res {
Ok(Ok(file)) => Ok(file),
Ok(Err(e)) => Err(e.into()),
Err(e) => Err(Error::from(e).into()),
}
}
impl FileStream {
async fn new(file: File) -> Result<Self, Error> {
let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?;
@ -509,6 +657,80 @@ impl FileStream {
pub fn size(&self) -> u64 {
self.size
}
/// Returns a stream of bytes from the file that is sourced from a blocking loop on another
/// thread. This has the potential to be much faster than `FileStream` type since it incurs the
/// cost of running on another thread once at the start, rather than for every chunk.
pub fn faster(self) -> FasterFileStream {
let (tx, rx) = channel(1);
let FileStream {
mut file,
mut offset,
size,
chunk_size,
..
} = self;
let mut file = file.take().expect("Tried to concurrently access file");
let _ = spawn_blocking(move || {
let mut max_bytes: usize;
loop {
if size == offset || tx.is_closed() {
return;
}
max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize;
let mut buf = Vec::with_capacity(max_bytes);
let pos_res = file.seek(io::SeekFrom::Start(offset));
let pos = match pos_res {
Ok(pos) => pos,
Err(e) => {
let _ = tx.blocking_send(Err(e.into()));
return;
}
};
let nbytes_res = Read::by_ref(&mut file)
.take(max_bytes as u64)
.read_to_end(&mut buf);
let nbytes = match nbytes_res {
Ok(0) => {
let _ = tx
.blocking_send(Err(
io::Error::from(io::ErrorKind::UnexpectedEof).into()
));
return;
}
Ok(nbytes) => nbytes,
Err(e) => {
let _ = tx.blocking_send(Err(e.into()));
return;
}
};
if tx.blocking_send(Ok(Bytes::from(buf))).is_err() {
return;
}
offset = pos + nbytes as u64;
}
});
FasterFileStream {
size,
inner: ReceiverStream::new(rx),
}
}
}
impl FasterFileStream {
/// Get the size of the file being streamed
pub fn size(&self) -> u64 {
self.size
}
}
impl<E> FileSink<E> {
@ -569,6 +791,14 @@ impl Stream for FileStream {
}
}
impl Stream for FasterFileStream {
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<E> Sink<Bytes> for FileSink<E>
where
E: From<Error> + Unpin,
@ -658,6 +888,7 @@ mod tests {
const READ_FILE: &str = "tests/read.txt";
const WRITE_FILE: &str = "tests/write.txt";
const WRITE_FILE_FASTER: &str = "tests/write-faster.txt";
const TEST_FILE: &str = "tests/test.txt";
#[test]
@ -683,6 +914,29 @@ mod tests {
.unwrap()
}
#[test]
fn stream_file_faster() {
run(async move {
let read_file = open(READ_FILE).await?;
let stream = read_to_stream(read_file).await?.faster();
let write_file = create(WRITE_FILE_FASTER).await?;
write_stream_faster(write_file, stream).await?;
let read_file = open(READ_FILE).await?;
let write_file = open(WRITE_FILE_FASTER).await?;
let read_bytes = read_faster(read_file).await?;
let written_bytes = read_faster(write_file).await?;
assert!(written_bytes.as_ref() == read_bytes.as_ref());
remove_file(WRITE_FILE_FASTER).await?;
Ok(()) as Result<_, Error>
})
.unwrap()
}
#[test]
fn read_write_file() {
let bytes_to_be_written = b"abcdefg";
@ -714,6 +968,18 @@ mod tests {
.unwrap();
}
#[test]
fn read_file_faster() {
run(async move {
let file = open(READ_FILE).await?;
let bytes = read_faster(file).await?;
assert!(bytes.as_ref() == b"Hello, World!\n");
Ok(()) as Result<_, Error>
})
.unwrap();
}
#[test]
fn seek_file() {
run(async move {
@ -729,6 +995,21 @@ mod tests {
.unwrap();
}
#[test]
fn seek_file_faster() {
run(async move {
let file = open(READ_FILE).await?;
let (file, pos) = seek(file, io::SeekFrom::Start(7)).await?;
assert!(pos == 7);
let bytes = read_faster(file).await?;
assert!(bytes.as_ref() == b"World!\n");
Ok(()) as Result<_, Error>
})
.unwrap();
}
#[test]
fn small_chunks() {
run(async move {
@ -749,6 +1030,26 @@ mod tests {
.unwrap();
}
#[test]
fn small_chunks_faster() {
run(async move {
let file = open(READ_FILE).await?;
let mut bytes_mut = BytesMut::new();
let (file, _) = seek(file, io::SeekFrom::Start(7)).await?;
let mut stream = read_to_stream(file).await?.chunk_size(2).faster();
while let Some(res) = stream.next().await {
bytes_mut.extend(res?);
}
let bytes = bytes_mut.freeze();
assert!(bytes.as_ref() == b"World!\n");
Ok(()) as Result<_, Error>
})
.unwrap();
}
fn run<F: Future + 'static>(f: F) -> F::Output {
actix_rt::System::new().block_on(f)
}

View file

@ -25,7 +25,7 @@
//!
//! ### License
//!
//! Copyright © 2020 Riley Trautman
//! Copyright © 2021 Riley Trautman
//!
//! pict-rs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
//!
@ -37,7 +37,7 @@
use actix_rt::task::spawn_blocking;
use bytes::Bytes;
use futures::stream::Stream;
use futures_util::stream::Stream;
use std::{
fs, io,
path::{Path, PathBuf},
@ -515,6 +515,49 @@ where
file::read_to_string(f).await
}
/// Read the entire contents of a file into a string.
///
/// This is a convenience function for using [`file::open`] and [`read_to_string`]
/// with fewer imports and without an intermediate variable. It pre-allocates a
/// buffer based on the file size when available, so it is generally faster than
/// reading into a string created with `String::new()`.
///
/// [`file::open`]: ./file/fn.open.html
/// [`read_to_string_faster`]: ./file/fn.read_to_string_faster.html
///
/// # Errors
///
/// This function will return an error if `path` does not already exist.
/// Other errors may also be returned according to [`OpenOptions::open`].
///
/// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
///
/// It will also return an error if it encounters while reading an error
/// of a kind other than [`ErrorKind::Interrupted`],
/// or if the contents of the file are not valid UTF-8.
///
/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted
///
/// # Examples
///
/// ```no_run
/// use std::net::SocketAddr;
///
/// #[actix_rt::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
/// let foo: SocketAddr = actix_fs::read_to_string_faster("address.txt").await?.parse()?;
/// Ok(())
/// }
/// ```
pub async fn read_to_string_faster<P>(path: P) -> Result<String>
where
P: AsRef<Path> + Send + 'static,
{
let f = file::open(path).await?;
file::read_to_string_faster(f).await
}
/// Removes an existing, empty directory.
///
/// # Platform-specific behavior