Compare commits
3 commits
main
...
asonix/act
Author | SHA1 | Date | |
---|---|---|---|
Aode (Lion) | aef0e3c557 | ||
asonix | f28d287468 | ||
asonix | 602d9de3e1 |
10
Cargo.toml
10
Cargo.toml
|
@ -13,12 +13,14 @@ edition = "2018"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
actix-threadpool = "0.3.2"
|
||||
bytes = "0.5.4"
|
||||
futures = "0.3.5"
|
||||
actix-rt = "2.2.0"
|
||||
bytes = "1"
|
||||
futures-util = { version = "0.3.16", default-features = false, features = ["sink"] }
|
||||
log = "0.4"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
tokio-stream = { version = "0.1", default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "1.1.1"
|
||||
anyhow = "1.0"
|
||||
futures = "0.3.16"
|
||||
|
|
364
src/file.rs
364
src/file.rs
|
@ -4,9 +4,9 @@
|
|||
//! to the ownership requirements of threaded interaction.
|
||||
|
||||
use crate::Error;
|
||||
use actix_threadpool::{run, BlockingError};
|
||||
use actix_rt::task::{spawn_blocking, JoinError};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{
|
||||
use futures_util::{
|
||||
future::{FutureExt, LocalBoxFuture},
|
||||
sink::{Sink, SinkExt},
|
||||
stream::{Stream, StreamExt},
|
||||
|
@ -20,6 +20,8 @@ use std::{
|
|||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
/// A stream of bytes from a file on the filesystem
|
||||
pub struct FileStream {
|
||||
|
@ -27,12 +29,19 @@ pub struct FileStream {
|
|||
size: u64,
|
||||
offset: u64,
|
||||
file: Option<File>,
|
||||
fut: Option<LocalBoxFuture<'static, Result<(File, Bytes, usize), BlockingError<io::Error>>>>,
|
||||
fut:
|
||||
Option<LocalBoxFuture<'static, Result<Result<(File, Bytes, usize), io::Error>, JoinError>>>,
|
||||
}
|
||||
|
||||
/// A faster stream of bytes from a file on the filesystem
|
||||
pub struct FasterFileStream {
|
||||
size: u64,
|
||||
inner: ReceiverStream<Result<Bytes, Error>>,
|
||||
}
|
||||
|
||||
struct FileSink<E> {
|
||||
file: Option<File>,
|
||||
fut: Option<LocalBoxFuture<'static, Result<File, BlockingError<io::Error>>>>,
|
||||
fut: Option<LocalBoxFuture<'static, Result<Result<File, io::Error>, JoinError>>>,
|
||||
closing: bool,
|
||||
_error: PhantomData<E>,
|
||||
}
|
||||
|
@ -59,7 +68,7 @@ pub async fn create<P>(path: P) -> Result<File, Error>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let file = run(move || File::create(path)).await?;
|
||||
let file = spawn_blocking(move || File::create(path)).await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -77,12 +86,12 @@ where
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn metadata(file: File) -> Result<(File, Metadata), Error> {
|
||||
let tup = run(move || {
|
||||
let tup = spawn_blocking(move || {
|
||||
let metadata = file.metadata()?;
|
||||
|
||||
Ok((file, metadata)) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(tup)
|
||||
}
|
||||
|
@ -111,7 +120,7 @@ pub async fn open<P>(path: P) -> Result<File, Error>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let file = run(move || File::open(path)).await?;
|
||||
let file = spawn_blocking(move || File::open(path)).await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -146,11 +155,11 @@ where
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn set_permissions(file: File, perm: Permissions) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.set_permissions(perm)?;
|
||||
Ok(file) as io::Result<_>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -179,11 +188,11 @@ pub async fn set_permissions(file: File, perm: Permissions) -> Result<File, Erro
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn sync_all(file: File) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.sync_all()?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -215,11 +224,11 @@ pub async fn sync_all(file: File) -> Result<File, Error> {
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn sync_data(file: File) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.sync_data()?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -253,11 +262,11 @@ pub async fn sync_data(file: File) -> Result<File, Error> {
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn set_len(file: File, size: u64) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.set_len(size)?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -283,11 +292,11 @@ pub async fn set_len(file: File, size: u64) -> Result<File, Error> {
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> {
|
||||
let tup = run(move || {
|
||||
let tup = spawn_blocking(move || {
|
||||
let pos = file.seek(seek)?;
|
||||
Ok((file, pos)) as io::Result<_>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(tup)
|
||||
}
|
||||
|
@ -329,6 +338,43 @@ pub async fn read(file: File) -> Result<Bytes, Error> {
|
|||
Ok(bytes_mut.freeze())
|
||||
}
|
||||
|
||||
/// Pull all bytes from this file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If this function encounters any form of I/O or other error, an error
|
||||
/// variant will be returned. If an error is returned then it must be
|
||||
/// guaranteed that no bytes were read.
|
||||
///
|
||||
/// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the read
|
||||
/// operation should be retried if there is nothing else to do.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() -> actix_fs::Result<()> {
|
||||
/// let f = actix_fs::file::open("foo.txt").await?;
|
||||
///
|
||||
/// let bytes = actix_fs::file::read_faster(f).await?;
|
||||
///
|
||||
/// println!("The bytes: {:?}", bytes);
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn read_faster(file: File) -> Result<Bytes, Error> {
|
||||
let mut stream = FileStream::new(file).await?.faster();
|
||||
let mut bytes_mut = BytesMut::with_capacity(stream.size() as usize);
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
bytes_mut.extend(res?);
|
||||
}
|
||||
|
||||
Ok(bytes_mut.freeze())
|
||||
}
|
||||
|
||||
/// Produce a new stream of bytes from this File
|
||||
///
|
||||
/// ```no_run
|
||||
|
@ -390,6 +436,46 @@ pub async fn read_to_string(file: File) -> Result<String, Error> {
|
|||
Ok(string)
|
||||
}
|
||||
|
||||
/// Read all bytes until EOF in this source.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If the data in this stream is *not* valid UTF-8 then an error is
|
||||
/// returned and `buf` is unchanged.
|
||||
///
|
||||
/// See [`read`][read] for other error semantics.
|
||||
///
|
||||
/// [read]: fn.read.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() -> actix_fs::Result<()> {
|
||||
/// let f = actix_fs::file::open("foo.txt").await?;
|
||||
/// let s = actix_fs::file::read_to_string_faster(f).await?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// (See also the [`read_to_string_faster`] convenience function for
|
||||
/// reading from a file.)
|
||||
///
|
||||
/// [`read_to_string_faster`]: ../fn.read_to_string_faster.html
|
||||
pub async fn read_to_string_faster(file: File) -> Result<String, Error> {
|
||||
let mut stream = read_to_stream(file).await?.faster();
|
||||
|
||||
let mut string = String::with_capacity(stream.size() as usize);
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
let bytes = res?;
|
||||
let s = std::str::from_utf8(&bytes)?;
|
||||
string.extend(s.chars());
|
||||
}
|
||||
|
||||
Ok(string)
|
||||
}
|
||||
|
||||
/// Write bytes into this writer.
|
||||
///
|
||||
/// # Errors
|
||||
|
@ -464,6 +550,69 @@ where
|
|||
Ok(sink.file.take().expect("Panick in write future"))
|
||||
}
|
||||
|
||||
/// Write a stream of bytes into this writer.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Each call to `write` may generate an I/O error indicating that the
|
||||
/// operation could not be completed. If an error is returned then no bytes
|
||||
/// in the buffer were written to this writer.
|
||||
///
|
||||
/// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the
|
||||
/// write operation should be retried if there is nothing else to do.
|
||||
///
|
||||
/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() -> actix_fs::Result<()> {
|
||||
/// let file = actix_fs::file::create("foo.txt").await?;
|
||||
/// let stream = actix_fs::read_to_stream("bar.txt").await?.faster();
|
||||
///
|
||||
/// actix_fs::file::write_stream_faster(file, stream).await?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn write_stream_faster<S, E>(mut file: File, mut stream: S) -> Result<File, E>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
||||
E: From<Error> + Unpin,
|
||||
{
|
||||
let (tx, mut rx) = channel(1);
|
||||
|
||||
let handle = spawn_blocking(move || {
|
||||
while let Some(bytes) = rx.blocking_recv() {
|
||||
let bytes: Bytes = bytes;
|
||||
file.write_all(bytes.as_ref())?;
|
||||
}
|
||||
|
||||
Ok(file)
|
||||
});
|
||||
|
||||
// Ensure `tx` dies at the end of the scope
|
||||
{
|
||||
let tx = tx;
|
||||
while let Some(res) = stream.next().await {
|
||||
if tx.send(res?).await.is_err() {
|
||||
// It should be safe to break here because any failure to send indicates early exit on
|
||||
// the receive side, which only happens in error. This will ensure we still return an
|
||||
// error from this function instead of pretending things are okay
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let res: Result<Result<File, Error>, JoinError> = handle.await;
|
||||
|
||||
match res {
|
||||
Ok(Ok(file)) => Ok(file),
|
||||
Ok(Err(e)) => Err(e.into()),
|
||||
Err(e) => Err(Error::from(e).into()),
|
||||
}
|
||||
}
|
||||
|
||||
impl FileStream {
|
||||
async fn new(file: File) -> Result<Self, Error> {
|
||||
let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?;
|
||||
|
@ -508,6 +657,80 @@ impl FileStream {
|
|||
pub fn size(&self) -> u64 {
|
||||
self.size
|
||||
}
|
||||
|
||||
/// Returns a stream of bytes from the file that is sourced from a blocking loop on another
|
||||
/// thread. This has the potential to be much faster than `FileStream` type since it incurs the
|
||||
/// cost of running on another thread once at the start, rather than for every chunk.
|
||||
pub fn faster(self) -> FasterFileStream {
|
||||
let (tx, rx) = channel(1);
|
||||
|
||||
let FileStream {
|
||||
mut file,
|
||||
mut offset,
|
||||
size,
|
||||
chunk_size,
|
||||
..
|
||||
} = self;
|
||||
let mut file = file.take().expect("Tried to concurrently access file");
|
||||
|
||||
let _ = spawn_blocking(move || {
|
||||
let mut max_bytes: usize;
|
||||
|
||||
loop {
|
||||
if size == offset || tx.is_closed() {
|
||||
return;
|
||||
}
|
||||
|
||||
max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize;
|
||||
let mut buf = Vec::with_capacity(max_bytes);
|
||||
let pos_res = file.seek(io::SeekFrom::Start(offset));
|
||||
|
||||
let pos = match pos_res {
|
||||
Ok(pos) => pos,
|
||||
Err(e) => {
|
||||
let _ = tx.blocking_send(Err(e.into()));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let nbytes_res = Read::by_ref(&mut file)
|
||||
.take(max_bytes as u64)
|
||||
.read_to_end(&mut buf);
|
||||
|
||||
let nbytes = match nbytes_res {
|
||||
Ok(0) => {
|
||||
let _ = tx
|
||||
.blocking_send(Err(
|
||||
io::Error::from(io::ErrorKind::UnexpectedEof).into()
|
||||
));
|
||||
return;
|
||||
}
|
||||
Ok(nbytes) => nbytes,
|
||||
Err(e) => {
|
||||
let _ = tx.blocking_send(Err(e.into()));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if tx.blocking_send(Ok(Bytes::from(buf))).is_err() {
|
||||
return;
|
||||
}
|
||||
offset = pos + nbytes as u64;
|
||||
}
|
||||
});
|
||||
|
||||
FasterFileStream {
|
||||
size,
|
||||
inner: ReceiverStream::new(rx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FasterFileStream {
|
||||
/// Get the size of the file being streamed
|
||||
pub fn size(&self) -> u64 {
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> FileSink<E> {
|
||||
|
@ -527,12 +750,13 @@ impl Stream for FileStream {
|
|||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
return match Pin::new(fut).poll(cx) {
|
||||
Poll::Ready(Ok((file, bytes, offset))) => {
|
||||
Poll::Ready(Ok(Ok((file, bytes, offset)))) => {
|
||||
self.fut.take();
|
||||
self.file = Some(file);
|
||||
self.offset = offset as u64;
|
||||
Poll::Ready(Some(Ok(bytes)))
|
||||
}
|
||||
Poll::Ready(Ok(Err(e))) => Poll::Ready(Some(Err(e.into()))),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
};
|
||||
|
@ -548,7 +772,7 @@ impl Stream for FileStream {
|
|||
|
||||
let mut file = self.file.take().expect("Use after completion");
|
||||
self.fut = Some(
|
||||
run(move || {
|
||||
spawn_blocking(move || {
|
||||
let max_bytes: usize;
|
||||
max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize;
|
||||
let mut buf = Vec::with_capacity(max_bytes);
|
||||
|
@ -567,6 +791,14 @@ impl Stream for FileStream {
|
|||
}
|
||||
}
|
||||
|
||||
impl Stream for FasterFileStream {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.inner).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> Sink<Bytes> for FileSink<E>
|
||||
where
|
||||
E: From<Error> + Unpin,
|
||||
|
@ -576,7 +808,7 @@ where
|
|||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
return match Pin::new(fut).poll(cx) {
|
||||
Poll::Ready(Ok(file)) => {
|
||||
Poll::Ready(Ok(Ok(file))) => {
|
||||
self.fut.take();
|
||||
self.file = Some(file);
|
||||
|
||||
|
@ -584,6 +816,7 @@ where
|
|||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Pending => Poll::Pending,
|
||||
};
|
||||
|
@ -599,7 +832,7 @@ where
|
|||
log::debug!("Writing {} bytes", item.len());
|
||||
|
||||
self.fut = Some(
|
||||
run(move || {
|
||||
spawn_blocking(move || {
|
||||
file.write_all(item.as_ref())?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
|
@ -617,9 +850,10 @@ where
|
|||
if !self.closing {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
match Pin::new(fut).poll(cx) {
|
||||
Poll::Ready(Ok(file)) => {
|
||||
Poll::Ready(Ok(Ok(file))) => {
|
||||
self.file = Some(file);
|
||||
}
|
||||
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
};
|
||||
|
@ -629,7 +863,7 @@ where
|
|||
self.closing = true;
|
||||
|
||||
self.fut = Some(
|
||||
run(move || {
|
||||
spawn_blocking(move || {
|
||||
file.flush()?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
|
@ -641,12 +875,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl From<BlockingError<io::Error>> for Error {
|
||||
fn from(e: BlockingError<io::Error>) -> Self {
|
||||
match e {
|
||||
BlockingError::Error(e) => e.into(),
|
||||
_ => Error::Canceled,
|
||||
}
|
||||
impl From<JoinError> for Error {
|
||||
fn from(_: JoinError) -> Self {
|
||||
Error::Canceled
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -657,6 +888,7 @@ mod tests {
|
|||
|
||||
const READ_FILE: &str = "tests/read.txt";
|
||||
const WRITE_FILE: &str = "tests/write.txt";
|
||||
const WRITE_FILE_FASTER: &str = "tests/write-faster.txt";
|
||||
const TEST_FILE: &str = "tests/test.txt";
|
||||
|
||||
#[test]
|
||||
|
@ -682,6 +914,29 @@ mod tests {
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_file_faster() {
|
||||
run(async move {
|
||||
let read_file = open(READ_FILE).await?;
|
||||
let stream = read_to_stream(read_file).await?.faster();
|
||||
let write_file = create(WRITE_FILE_FASTER).await?;
|
||||
|
||||
write_stream_faster(write_file, stream).await?;
|
||||
|
||||
let read_file = open(READ_FILE).await?;
|
||||
let write_file = open(WRITE_FILE_FASTER).await?;
|
||||
let read_bytes = read_faster(read_file).await?;
|
||||
let written_bytes = read_faster(write_file).await?;
|
||||
|
||||
assert!(written_bytes.as_ref() == read_bytes.as_ref());
|
||||
|
||||
remove_file(WRITE_FILE_FASTER).await?;
|
||||
|
||||
Ok(()) as Result<_, Error>
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_write_file() {
|
||||
let bytes_to_be_written = b"abcdefg";
|
||||
|
@ -713,6 +968,18 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_file_faster() {
|
||||
run(async move {
|
||||
let file = open(READ_FILE).await?;
|
||||
let bytes = read_faster(file).await?;
|
||||
|
||||
assert!(bytes.as_ref() == b"Hello, World!\n");
|
||||
Ok(()) as Result<_, Error>
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn seek_file() {
|
||||
run(async move {
|
||||
|
@ -728,6 +995,21 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn seek_file_faster() {
|
||||
run(async move {
|
||||
let file = open(READ_FILE).await?;
|
||||
let (file, pos) = seek(file, io::SeekFrom::Start(7)).await?;
|
||||
assert!(pos == 7);
|
||||
|
||||
let bytes = read_faster(file).await?;
|
||||
|
||||
assert!(bytes.as_ref() == b"World!\n");
|
||||
Ok(()) as Result<_, Error>
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn small_chunks() {
|
||||
run(async move {
|
||||
|
@ -748,7 +1030,27 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn small_chunks_faster() {
|
||||
run(async move {
|
||||
let file = open(READ_FILE).await?;
|
||||
|
||||
let mut bytes_mut = BytesMut::new();
|
||||
let (file, _) = seek(file, io::SeekFrom::Start(7)).await?;
|
||||
let mut stream = read_to_stream(file).await?.chunk_size(2).faster();
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
bytes_mut.extend(res?);
|
||||
}
|
||||
let bytes = bytes_mut.freeze();
|
||||
|
||||
assert!(bytes.as_ref() == b"World!\n");
|
||||
Ok(()) as Result<_, Error>
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn run<F: Future + 'static>(f: F) -> F::Output {
|
||||
actix_rt::System::new("test-system").block_on(f)
|
||||
actix_rt::System::new().block_on(f)
|
||||
}
|
||||
}
|
||||
|
|
75
src/lib.rs
75
src/lib.rs
|
@ -25,7 +25,7 @@
|
|||
//!
|
||||
//! ### License
|
||||
//!
|
||||
//! Copyright © 2020 Riley Trautman
|
||||
//! Copyright © 2021 Riley Trautman
|
||||
//!
|
||||
//! pict-rs is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
//!
|
||||
|
@ -35,9 +35,9 @@
|
|||
|
||||
#![doc(html_root_url = "https://actix-fs.asonix.dog")]
|
||||
|
||||
use actix_threadpool::run;
|
||||
use actix_rt::task::spawn_blocking;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::Stream;
|
||||
use futures_util::stream::Stream;
|
||||
use std::{
|
||||
fs, io,
|
||||
path::{Path, PathBuf},
|
||||
|
@ -117,7 +117,7 @@ pub async fn canonicalize<P>(path: P) -> Result<PathBuf>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let path = run(move || fs::canonicalize(path)).await?;
|
||||
let path = spawn_blocking(move || fs::canonicalize(path)).await??;
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ where
|
|||
P: AsRef<Path> + Send + 'static,
|
||||
Q: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let bytes = run(move || fs::copy(from, to)).await?;
|
||||
let bytes = spawn_blocking(move || fs::copy(from, to)).await??;
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
|
@ -221,7 +221,7 @@ pub async fn create_dir<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::create_dir(path)).await?;
|
||||
spawn_blocking(move || fs::create_dir(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ pub async fn create_dir_all<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::create_dir_all(path)).await?;
|
||||
spawn_blocking(move || fs::create_dir_all(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ where
|
|||
P: AsRef<Path> + Send + 'static,
|
||||
Q: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::hard_link(src, dst)).await?;
|
||||
spawn_blocking(move || fs::hard_link(src, dst)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -348,7 +348,7 @@ pub async fn metadata<P>(path: P) -> Result<fs::Metadata>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let metadata = run(move || fs::metadata(path)).await?;
|
||||
let metadata = spawn_blocking(move || fs::metadata(path)).await??;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
@ -427,7 +427,7 @@ pub async fn read_link<P>(path: P) -> Result<PathBuf>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let path = run(move || fs::read_link(path)).await?;
|
||||
let path = spawn_blocking(move || fs::read_link(path)).await??;
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
@ -515,6 +515,49 @@ where
|
|||
file::read_to_string(f).await
|
||||
}
|
||||
|
||||
/// Read the entire contents of a file into a string.
|
||||
///
|
||||
/// This is a convenience function for using [`file::open`] and [`read_to_string`]
|
||||
/// with fewer imports and without an intermediate variable. It pre-allocates a
|
||||
/// buffer based on the file size when available, so it is generally faster than
|
||||
/// reading into a string created with `String::new()`.
|
||||
///
|
||||
/// [`file::open`]: ./file/fn.open.html
|
||||
/// [`read_to_string_faster`]: ./file/fn.read_to_string_faster.html
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function will return an error if `path` does not already exist.
|
||||
/// Other errors may also be returned according to [`OpenOptions::open`].
|
||||
///
|
||||
/// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
|
||||
///
|
||||
/// It will also return an error if it encounters while reading an error
|
||||
/// of a kind other than [`ErrorKind::Interrupted`],
|
||||
/// or if the contents of the file are not valid UTF-8.
|
||||
///
|
||||
/// [`ErrorKind::Interrupted`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.Interrupted
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use std::net::SocketAddr;
|
||||
///
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
|
||||
/// let foo: SocketAddr = actix_fs::read_to_string_faster("address.txt").await?.parse()?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn read_to_string_faster<P>(path: P) -> Result<String>
|
||||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let f = file::open(path).await?;
|
||||
|
||||
file::read_to_string_faster(f).await
|
||||
}
|
||||
|
||||
/// Removes an existing, empty directory.
|
||||
///
|
||||
/// # Platform-specific behavior
|
||||
|
@ -546,7 +589,7 @@ pub async fn remove_dir<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::remove_dir(path)).await?;
|
||||
spawn_blocking(move || fs::remove_dir(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -586,7 +629,7 @@ pub async fn remove_dir_all<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::remove_dir_all(path)).await?;
|
||||
spawn_blocking(move || fs::remove_dir_all(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -626,7 +669,7 @@ pub async fn remove_file<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::remove_file(path)).await?;
|
||||
spawn_blocking(move || fs::remove_file(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -673,7 +716,7 @@ where
|
|||
P: AsRef<Path> + Send + 'static,
|
||||
Q: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::rename(from, to)).await?;
|
||||
spawn_blocking(move || fs::rename(from, to)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -711,7 +754,7 @@ pub async fn set_permissions<P>(path: P, permissions: fs::Permissions) -> Result
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::set_permissions(path, permissions)).await?;
|
||||
spawn_blocking(move || fs::set_permissions(path, permissions)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -748,7 +791,7 @@ pub async fn symlink_metadata<P>(path: P) -> Result<fs::Metadata>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let metadata = run(move || fs::symlink_metadata(path)).await?;
|
||||
let metadata = spawn_blocking(move || fs::symlink_metadata(path)).await??;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue