actix-fs/src/file.rs
2020-09-13 20:48:40 -05:00

755 lines
21 KiB
Rust

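//! File-specific operations
//!
//! This module provides free functions analogous to the methods of [`std::fs::File`]. Each
//! function takes the `File` by value and, where it makes sense, hands it back, because the
//! blocking work runs on a separate thread pool that needs to own the handle.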
use crate::Error;
use actix_threadpool::{run, BlockingError};
use bytes::{Bytes, BytesMut};
use futures::{
future::{FutureExt, LocalBoxFuture},
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use std::{
fs::{File, Metadata, Permissions},
future::Future,
io::{self, prelude::*},
marker::PhantomData,
path::Path,
pin::Pin,
task::{Context, Poll},
};
/// A stream of bytes from a file on the filesystem.
///
/// Returned by [`read_to_stream`]. Each item is a chunk of at most `chunk_size` bytes; reading
/// starts at the file's cursor position and stops at the file length sampled when the stream
/// was created.
pub struct FileStream {
    /// The file handle; `None` while a read is running on the thread pool.
    file: Option<File>,
    /// The in-flight read, resolving to the file, the bytes read and the new offset.
    fut: Option<LocalBoxFuture<'static, Result<(File, Bytes, usize), BlockingError<io::Error>>>>,
    /// Byte offset at which the next read starts.
    offset: u64,
    /// File length sampled when the stream was created; reading stops here.
    size: u64,
    /// Maximum number of bytes yielded per item.
    chunk_size: u64,
}
/// Internal `Sink` that writes `Bytes` to a file on the blocking thread pool.
///
/// `E` is the error type reported to the caller; it is only carried as `PhantomData`.
struct FileSink<E> {
    /// Set once `poll_close` has scheduled its final flush.
    closing: bool,
    /// The file handle; `None` while a write or flush is running on the thread pool.
    file: Option<File>,
    /// The in-flight write or flush, resolving to the file on success.
    fut: Option<LocalBoxFuture<'static, Result<File, BlockingError<io::Error>>>>,
    _error: PhantomData<E>,
}
/// Opens a file in write-only mode.
///
/// This function will create a file if it does not exist,
/// and will truncate it if it does.
///
/// See the [`OpenOptions::open`] function for more details.
///
/// [`OpenOptions::open`]: https://doc.rust-lang.org/stable/std/fs/struct.OpenOptions.html#method.open
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::create("foo.txt").await?;
/// Ok(())
/// }
/// ```
pub async fn create<P>(path: P) -> Result<File, Error>
where
P: AsRef<Path> + Send + 'static,
{
let file = run(move || File::create(path)).await?;
Ok(file)
}
/// Queries metadata about the underlying file.
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::open("foo.txt").await?;
/// let (f, metadata) = actix_fs::file::metadata(f).await?;
/// Ok(())
/// }
/// ```
pub async fn metadata(file: File) -> Result<(File, Metadata), Error> {
let tup = run(move || {
let metadata = file.metadata()?;
Ok((file, metadata)) as Result<_, io::Error>
})
.await?;
Ok(tup)
}
/// Attempts to open a file in read-only mode.
///
/// The call to [`std::fs::File::open`] runs on the blocking thread pool.
/// See the [`OpenOptions::open`] method for more details.
///
/// # Errors
///
/// Returns an error if `path` does not already exist. Other errors may also
/// be returned according to [`OpenOptions::open`].
///
/// [`OpenOptions::open`]: https://doc.rust-lang.org/stable/std/fs/struct.OpenOptions.html#method.open
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let f = actix_fs::file::open("foo.txt").await?;
///     Ok(())
/// }
/// ```
pub async fn open<P>(path: P) -> Result<File, Error>
where
    P: AsRef<Path> + Send + 'static,
{
    let opened = run(move || File::open(path)).await;
    opened.map_err(Into::into)
}
/// Changes the permissions on the underlying file.
///
/// # Platform-specific behavior
///
/// This function currently corresponds to the `fchmod` function on Unix and
/// the `SetFileInformationByHandle` function on Windows. Note that, this
/// [may change in the future][changes].
///
/// [changes]: https://doc.rust-lang.org/stable/std/io/index.html#platform-specific-behavior
///
/// # Errors
///
/// This function will return an error if the user lacks permission change
/// attributes on the underlying file. It may also return an error in other
/// os-specific unspecified cases.
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let file = actix_fs::file::open("foo.txt").await?;
/// let (file, meta) = actix_fs::file::metadata(file).await?;
/// let mut perms = meta.permissions();
/// perms.set_readonly(true);
/// actix_fs::file::set_permissions(file, perms).await?;
/// Ok(())
/// }
/// ```
pub async fn set_permissions(file: File, perm: Permissions) -> Result<File, Error> {
let file = run(move || {
file.set_permissions(perm)?;
Ok(file) as io::Result<_>
})
.await?;
Ok(file)
}
/// Attempts to sync all OS-internal data and metadata to disk, then hands the
/// file back.
///
/// [`std::fs::File::sync_all`] is called on the blocking thread pool and tries
/// to ensure that all in-memory data reaches the filesystem before returning.
///
/// Use this to surface errors that would otherwise only show up when the
/// `File` is closed; dropping a file ignores errors from synchronizing this
/// in-memory data.
///
/// # Examples
///
/// ```no_run
/// use bytes::Bytes;
///
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let f = actix_fs::file::create("foo.txt").await?;
///     let f = actix_fs::file::write(f, Bytes::from_static(&*b"Hello, world!")).await?;
///
///     actix_fs::file::sync_all(f).await?;
///     Ok(())
/// }
/// ```
pub async fn sync_all(file: File) -> Result<File, Error> {
    let synced = run(move || file.sync_all().map(|()| file)).await?;
    Ok(synced)
}
/// This function is similar to [`sync_all`], except that it may not
/// synchronize file metadata to the filesystem.
///
/// This is intended for use cases that must synchronize content, but don't
/// need the metadata on disk. The goal of this method is to reduce disk
/// operations.
///
/// Note that some platforms may simply implement this in terms of
/// [`sync_all`].
///
/// [`sync_all`]: fn.sync_all.html
///
/// # Examples
///
/// ```no_run
/// use bytes::Bytes;
///
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::create("foo.txt").await?;
/// let f = actix_fs::file::write(f, Bytes::from_static(&*b"Hello, world!")).await?;
///
/// actix_fs::file::sync_data(f).await?;
/// Ok(())
/// }
/// ```
pub async fn sync_data(file: File) -> Result<File, Error> {
let file = run(move || {
file.sync_data()?;
Ok(file) as Result<_, io::Error>
})
.await?;
Ok(file)
}
/// Truncates or extends the underlying file, updating the size of
/// this file to become `size`.
///
/// If the `size` is less than the current file's size, then the file will
/// be shrunk. If it is greater than the current file's size, then the file
/// will be extended to `size` and have all of the intermediate data filled
/// in with 0s.
///
/// The file's cursor isn't changed. In particular, if the cursor was at the
/// end and the file is shrunk using this operation, the cursor will now be
/// past the end.
///
/// # Errors
///
/// This function will return an error if the file is not opened for writing.
/// Also, std::io::ErrorKind::InvalidInput will be returned if the desired
/// length would cause an overflow due to the implementation specifics.
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::create("foo.txt").await?;
/// actix_fs::file::set_len(f, 10).await?;
/// Ok(())
/// }
/// ```
pub async fn set_len(file: File, size: u64) -> Result<File, Error> {
let file = run(move || {
file.set_len(size)?;
Ok(file) as Result<_, io::Error>
})
.await?;
Ok(file)
}
/// Seeks to an offset, in bytes, in a file.
///
/// If the seek succeeds, the file is handed back together with the new
/// position, measured from the start of the file. That position can be used
/// later with [`SeekFrom::Start`].
///
/// # Errors
///
/// Seeking to a negative offset is considered an error.
///
/// [`SeekFrom::Start`]: https://doc.rust-lang.org/stable/std/io/enum.SeekFrom.html#variant.Start
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let f = actix_fs::file::open("foo.txt").await?;
///     let (f, pos) = actix_fs::file::seek(f, std::io::SeekFrom::Start(8)).await?;
///     Ok(())
/// }
/// ```
pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> {
    let tup = run(move || {
        let pos = file.seek(seek)?;
        Ok((file, pos)) as io::Result<_>
    })
    .await?;
    Ok(tup)
}
/// Reads all bytes from the file's current cursor position to the end of the file.
///
/// The file length is sampled once, when reading starts, and data appended
/// after that point is not read. The file is read in chunks on the blocking
/// thread pool and is consumed by this call.
///
/// # Errors
///
/// Returns an error if any underlying I/O operation fails. Returns an I/O
/// error of kind [`ErrorKind::UnexpectedEof`] if the file turns out to be
/// shorter than its sampled length, for example because it was truncated
/// concurrently.
///
/// [`ErrorKind::UnexpectedEof`]: https://doc.rust-lang.org/stable/std/io/enum.ErrorKind.html#variant.UnexpectedEof
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let f = actix_fs::file::open("foo.txt").await?;
///
///     let bytes = actix_fs::file::read(f).await?;
///
///     println!("The bytes: {:?}", bytes);
///     Ok(())
/// }
/// ```
pub async fn read(file: File) -> Result<Bytes, Error> {
    let mut stream = FileStream::new(file).await?;
    // Only the bytes between the cursor and the sampled end of file are read.
    let remaining = stream.size().saturating_sub(stream.offset);
    let mut buf = BytesMut::with_capacity(remaining as usize);
    while let Some(chunk) = stream.next().await {
        // Copy each chunk as a slice rather than iterating it byte by byte.
        buf.extend_from_slice(&chunk?);
    }
    Ok(buf.freeze())
}
/// Turns the file into a [`FileStream`] that yields its bytes in chunks,
/// starting at the file's current cursor position.
///
/// # Examples
///
/// ```no_run
/// use futures::stream::StreamExt;
///
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let f = actix_fs::file::open("foo.txt").await?;
///     let mut stream = actix_fs::file::read_to_stream(f).await?;
///
///     while let Some(res) = stream.next().await {
///         println!("bytes: {:?}", res?);
///     }
///
///     Ok(())
/// }
/// ```
pub async fn read_to_stream(file: File) -> Result<FileStream, Error> {
    let stream = FileStream::new(file).await?;
    Ok(stream)
}
/// Read all bytes until EOF in this source.
///
/// # Errors
///
/// If the data in this stream is *not* valid UTF-8 then an error is
/// returned and `buf` is unchanged.
///
/// See [`read`][read] for other error semantics.
///
/// [read]: fn.read.html
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
/// let f = actix_fs::file::open("foo.txt").await?;
/// let s = actix_fs::file::read_to_string(f).await?;
/// Ok(())
/// }
/// ```
///
/// (See also the [`read_to_string`] convenience function for
/// reading from a file.)
///
/// [`read_to_string`]: ../fn.read_to_string.html
pub async fn read_to_string(file: File) -> Result<String, Error> {
let mut stream = read_to_stream(file).await?;
let mut string = String::with_capacity(stream.size() as usize);
while let Some(res) = stream.next().await {
let bytes = res?;
let s = std::str::from_utf8(&bytes)?;
string.extend(s.chars());
}
Ok(string)
}
/// Writes all of `bytes` into the file, then flushes the handle and returns the file.
///
/// The write runs on the blocking thread pool using `write_all`. The final
/// flush does not sync data to disk; use [`sync_all`] or [`sync_data`] for that.
///
/// [`sync_all`]: fn.sync_all.html
/// [`sync_data`]: fn.sync_data.html
///
/// # Errors
///
/// Returns an error if the underlying write or flush fails. In that case an
/// unspecified number of bytes may already have been written, and the file
/// handle is not returned.
///
/// # Examples
///
/// ```no_run
/// use bytes::Bytes;
///
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let file = actix_fs::file::create("foo.txt").await?;
///
///     actix_fs::file::write(file, Bytes::from_static(&*b"some bytes")).await?;
///     Ok(())
/// }
/// ```
pub async fn write<B>(file: File, bytes: B) -> Result<File, Error>
where
    B: Into<Bytes>,
{
    let mut sink = FileSink::<Error>::new(file);
    sink.send(bytes.into()).await?;
    sink.close().await?;
    // After a successful close the sink holds the file again.
    Ok(sink.file.take().expect("Panick in write future"))
}
/// Writes every chunk produced by `stream` into the file, in order, then
/// flushes the handle and returns the file.
///
/// Each chunk is written with `write_all` on the blocking thread pool. The
/// final flush does not sync data to disk; use [`sync_all`] or [`sync_data`]
/// for that.
///
/// [`sync_all`]: fn.sync_all.html
/// [`sync_data`]: fn.sync_data.html
///
/// # Errors
///
/// Returns the first error produced by `stream`, or the I/O error (converted
/// into `E`) if a write or the flush fails. Chunks received before the error
/// may already have been written, and the file handle is not returned.
///
/// # Examples
///
/// ```no_run
/// #[actix_rt::main]
/// async fn main() -> actix_fs::Result<()> {
///     let file = actix_fs::file::create("foo.txt").await?;
///     let stream = actix_fs::read_to_stream("bar.txt").await?;
///
///     actix_fs::file::write_stream(file, stream).await?;
///     Ok(())
/// }
/// ```
pub async fn write_stream<S, E>(file: File, mut stream: S) -> Result<File, E>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    E: From<Error> + Unpin,
{
    let mut sink = FileSink::<E>::new(file);
    sink.send_all(&mut stream).await?;
    sink.close().await?;
    // After a successful close the sink holds the file again.
    Ok(sink.file.take().expect("Panick in write future"))
}
impl FileStream {
    /// Default maximum number of bytes per chunk (64 KiB).
    const DEFAULT_CHUNK_SIZE: u64 = 64 * 1024;

    /// Creates a stream over `file`, starting at its current cursor position.
    ///
    /// The file length is sampled here, and the stream stops at that length
    /// even if the file grows afterwards.
    async fn new(file: File) -> Result<Self, Error> {
        let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?;
        let (file, metadata) = metadata(file).await?;
        Ok(FileStream {
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
            size: metadata.len(),
            offset,
            file: Some(file),
            fut: None,
        })
    }

    /// Changes the maximum size of the chunks yielded by the stream.
    ///
    /// This can be used to read only certain bytes from a given file. A
    /// `chunk_size` of `0` is treated as `1`, because a zero-byte read is
    /// reported as an `UnexpectedEof` error by the stream.
    ///
    /// ```no_run
    /// use futures::stream::StreamExt;
    /// use std::io::SeekFrom;
    ///
    /// #[actix_rt::main]
    /// async fn main() -> actix_fs::Result<()> {
    ///     let f = actix_fs::file::open("foo.txt").await?;
    ///     let (f, _) = actix_fs::file::seek(f, SeekFrom::Start(16)).await?;
    ///     let mut stream = actix_fs::file::read_to_stream(f).await?.chunk_size(16);
    ///
    ///     if let Some(res) = stream.next().await {
    ///         let sixteen_bytes = res?;
    ///         // ...
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn chunk_size(mut self, chunk_size: u64) -> Self {
        self.chunk_size = chunk_size.max(1);
        self
    }

    /// Returns the length of the file, sampled when the stream was created.
    ///
    /// This is the total file length, not the number of bytes remaining after
    /// the starting cursor position.
    pub fn size(&self) -> u64 {
        self.size
    }
}
impl<E> FileSink<E> {
fn new(file: File) -> Self {
FileSink {
file: Some(file),
fut: None,
closing: false,
_error: PhantomData,
}
}
}
impl Stream for FileStream {
    type Item = Result<Bytes, Error>;

    /// Yields the next chunk of at most `chunk_size` bytes.
    ///
    /// Reading starts at `offset` and stops at the file length sampled when the
    /// stream was created. Each read runs on the blocking thread pool: the
    /// `File` is moved into the task and handed back when the task completes.
    /// After a read error the stream is finished, and later polls return `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if self.fut.is_none() {
            // `>=` rather than `==`: if the cursor was seeked past the end of
            // the file, there is simply nothing to read.
            if self.offset >= self.size {
                return Poll::Ready(None);
            }
            // The file is only missing here after a failed read (it was dropped
            // together with the failed task), so the stream is over.
            let mut file = match self.file.take() {
                Some(file) => file,
                None => return Poll::Ready(None),
            };
            let (size, offset, chunk_size) = (self.size, self.offset, self.chunk_size);
            self.fut = Some(
                run(move || {
                    let max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize;
                    let mut buf = Vec::with_capacity(max_bytes);
                    // Position the cursor explicitly at `offset` before reading.
                    let pos = file.seek(io::SeekFrom::Start(offset))?;
                    let nbytes = Read::by_ref(&mut file)
                        .take(max_bytes as u64)
                        .read_to_end(&mut buf)?;
                    if nbytes == 0 {
                        // Nothing could be read although `offset < size`: the file
                        // shrank since its length was sampled (or `chunk_size` is 0).
                        return Err(io::ErrorKind::UnexpectedEof.into());
                    }
                    // NOTE(review): the new offset travels as `usize` (the field type of
                    // `fut`), which can truncate offsets above 4 GiB on 32-bit targets.
                    Ok((file, Bytes::from(buf), pos as usize + nbytes))
                })
                .boxed_local(),
            );
        }

        let polled = match self.fut.as_mut() {
            Some(fut) => fut.as_mut().poll(cx),
            None => unreachable!("a read future is always scheduled above"),
        };
        let res = match polled {
            Poll::Ready(res) => res,
            Poll::Pending => return Poll::Pending,
        };
        // The task has finished; drop it so it is never polled again.
        self.fut = None;
        match res {
            Ok((file, bytes, offset)) => {
                self.file = Some(file);
                self.offset = offset as u64;
                Poll::Ready(Some(Ok(bytes)))
            }
            Err(e) => Poll::Ready(Some(Err(e.into()))),
        }
    }
}
impl<E> Sink<Bytes> for FileSink<E>
where
E: From<Error> + Unpin,
{
type Error = E;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
if let Some(ref mut fut) = self.fut {
return match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(file)) => {
self.fut.take();
self.file = Some(file);
log::debug!("Written");
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())),
Poll::Pending => Poll::Pending,
};
}
log::debug!("Ready");
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
let mut file = self.file.take().expect("Use after completion");
log::debug!("Writing {} bytes", item.len());
self.fut = Some(
run(move || {
file.write_all(item.as_ref())?;
Ok(file) as Result<_, io::Error>
})
.boxed_local(),
);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
if !self.closing {
if let Some(ref mut fut) = self.fut {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(file)) => {
self.file = Some(file);
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())),
Poll::Pending => return Poll::Pending,
};
}
let mut file = self.file.take().expect("Use after completion");
self.closing = true;
self.fut = Some(
run(move || {
file.flush()?;
Ok(file) as Result<_, io::Error>
})
.boxed_local(),
);
}
self.poll_ready(cx)
}
}
impl From<BlockingError<io::Error>> for Error {
fn from(e: BlockingError<io::Error>) -> Self {
match e {
BlockingError::Error(e) => e.into(),
_ => Error::Canceled,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::remove_file;

    /// Fixture read by the tests; expected to contain "Hello, World!\n".
    const READ_FILE: &str = "tests/read.txt";
    /// Scratch file written and removed by `stream_file`.
    const WRITE_FILE: &str = "tests/write.txt";
    /// Scratch file written and removed by `read_write_file`.
    const TEST_FILE: &str = "tests/test.txt";

    #[test]
    fn stream_file() {
        run(async move {
            let source = open(READ_FILE).await?;
            let source_stream = read_to_stream(source).await?;
            let dest = create(WRITE_FILE).await?;
            write_stream(dest, source_stream).await?;

            let reread = open(READ_FILE).await?;
            let written = open(WRITE_FILE).await?;
            let expected = read(reread).await?;
            let actual = read(written).await?;
            assert_eq!(&actual[..], &expected[..]);

            remove_file(WRITE_FILE).await?;
            Ok(()) as Result<_, Error>
        })
        .unwrap()
    }

    #[test]
    fn read_write_file() {
        let payload = b"abcdefg";
        run(async move {
            let fresh = create(TEST_FILE).await?;
            write(fresh, payload.to_vec()).await?;

            let reopened = open(TEST_FILE).await?;
            let contents = read(reopened).await?;
            assert_eq!(&contents[..], &payload[..]);

            remove_file(TEST_FILE).await?;
            Ok(()) as Result<_, Error>
        })
        .unwrap();
    }

    #[test]
    fn read_file() {
        run(async move {
            let fixture = open(READ_FILE).await?;
            let contents = read(fixture).await?;
            assert_eq!(&contents[..], &b"Hello, World!\n"[..]);
            Ok(()) as Result<_, Error>
        })
        .unwrap();
    }

    #[test]
    fn seek_file() {
        run(async move {
            let fixture = open(READ_FILE).await?;
            let (fixture, position) = seek(fixture, io::SeekFrom::Start(7)).await?;
            assert_eq!(position, 7);
            let tail = read(fixture).await?;
            assert_eq!(&tail[..], &b"World!\n"[..]);
            Ok(()) as Result<_, Error>
        })
        .unwrap();
    }

    #[test]
    fn small_chunks() {
        run(async move {
            let fixture = open(READ_FILE).await?;
            let (fixture, _) = seek(fixture, io::SeekFrom::Start(7)).await?;
            let mut chunks = read_to_stream(fixture).await?.chunk_size(2);
            let mut collected = Vec::new();
            while let Some(chunk) = chunks.next().await {
                collected.extend_from_slice(&chunk?);
            }
            assert_eq!(&collected[..], &b"World!\n"[..]);
            Ok(()) as Result<_, Error>
        })
        .unwrap();
    }

    /// Runs `f` to completion on a fresh actix system.
    fn run<F: Future + 'static>(f: F) -> F::Output {
        actix_rt::System::new("test-system").block_on(f)
    }
}