//! File-specific operations //! //! This module provides methods analogous to the std::fs::File struct, but as free functions due //! to the ownership requirements of threaded interaction. use crate::Error; use actix_threadpool::{run, BlockingError}; use bytes::{Bytes, BytesMut}; use futures::{ future::{FutureExt, LocalBoxFuture}, sink::{Sink, SinkExt}, stream::{Stream, StreamExt}, }; use std::{ fs::{File, Metadata, Permissions}, future::Future, io::{self, prelude::*}, marker::PhantomData, path::Path, pin::Pin, task::{Context, Poll}, }; /// A stream of bytes from a file on the filesystem pub struct FileStream { chunk_size: u64, size: u64, offset: u64, file: Option, fut: Option>>>, } struct FileSink { file: Option, fut: Option>>>, closing: bool, _error: PhantomData, } /// Opens a file in write-only mode. /// /// This function will create a file if it does not exist, /// and will truncate it if it does. /// /// See the [`OpenOptions::open`] function for more details. /// /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::create("foo.txt").await?; /// Ok(()) /// } /// ``` pub async fn create

(path: P) -> Result where P: AsRef + Send + 'static, { let file = run(move || File::create(path)).await?; Ok(file) } /// Queries metadata about the underlying file. /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// let (f, metadata) = actix_fs::file::metadata(f).await?; /// Ok(()) /// } /// ``` pub async fn metadata(file: File) -> Result<(File, Metadata), Error> { let tup = run(move || { let metadata = file.metadata()?; Ok((file, metadata)) as Result<_, io::Error> }) .await?; Ok(tup) } /// Attempts to open a file in read-only mode. /// /// See the [`OpenOptions::open`] method for more details. /// /// # Errors /// /// This function will return an error if `path` does not already exist. /// Other errors may also be returned according to [`OpenOptions::open`]. /// /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// Ok(()) /// } /// ``` pub async fn open

(path: P) -> Result where P: AsRef + Send + 'static, { let file = run(move || File::open(path)).await?; Ok(file) } /// Changes the permissions on the underlying file. /// /// # Platform-specific behavior /// /// This function currently corresponds to the `fchmod` function on Unix and /// the `SetFileInformationByHandle` function on Windows. Note that, this /// [may change in the future][changes]. /// /// [changes]: ../io/index.html#platform-specific-behavior /// /// # Errors /// /// This function will return an error if the user lacks permission change /// attributes on the underlying file. It may also return an error in other /// os-specific unspecified cases. /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let file = actix_fs::file::open("foo.txt").await?; /// let (file, meta) = actix_fs::file::metadata(file).await?; /// let perms = meta.permissions(); /// perms.set_readonly(true); /// actix_fs::file::set_permissions(file, perms).await?; /// Ok(()) /// } /// ``` pub async fn set_permissions(file: File, perm: Permissions) -> Result { let file = run(move || { file.set_permissions(perm)?; Ok(file) as io::Result<_> }) .await?; Ok(file) } /// Attempts to sync all OS-internal metadata to disk. /// /// This function will attempt to ensure that all in-memory data reaches the /// filesystem before returning. /// /// This can be used to handle errors that would otherwise only be caught /// when the `File` is closed. Dropping a file will ignore errors in /// synchronizing this in-memory data. /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::create("foo.txt").await?; /// let f = actix_fs::file::write(f, b"Hello, world!").await?; /// /// actix_fs::file::sync_all(f).await?; /// Ok(()) /// } /// ``` pub async fn sync_all(file: File) -> Result { let file = run(move || { file.sync_all()?; Ok(file) as Result<_, io::Error> }) .await?; Ok(file) } /// This function is similar to [`sync_all`], except that it may not /// synchronize file metadata to the filesystem. /// /// This is intended for use cases that must synchronize content, but don't /// need the metadata on disk. The goal of this method is to reduce disk /// operations. /// /// Note that some platforms may simply implement this in terms of /// [`sync_all`]. /// /// [`sync_all`]: struct.File.html#method.sync_all /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::create("foo.txt").await?; /// let f = actix_fs::file::write_all(f, b"Hello, world!").await?; /// /// actix_fs::file::sync_data(f).await?; /// Ok(()) /// } /// ``` pub async fn sync_data(file: File) -> Result { let file = run(move || { file.sync_data()?; Ok(file) as Result<_, io::Error> }) .await?; Ok(file) } /// Truncates or extends the underlying file, updating the size of /// this file to become `size`. /// /// If the `size` is less than the current file's size, then the file will /// be shrunk. If it is greater than the current file's size, then the file /// will be extended to `size` and have all of the intermediate data filled /// in with 0s. /// /// The file's cursor isn't changed. In particular, if the cursor was at the /// end and the file is shrunk using this operation, the cursor will now be /// past the end. /// /// # Errors /// /// This function will return an error if the file is not opened for writing. /// Also, std::io::ErrorKind::InvalidInput will be returned if the desired /// length would cause an overflow due to the implementation specifics. /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::create("foo.txt").await?; /// actix_fs::file::set_len(f, 10).await?; /// Ok(()) /// } /// ``` pub async fn set_len(file: File, size: u64) -> Result { let file = run(move || { file.set_len(size)?; Ok(file) as Result<_, io::Error> }) .await?; Ok(file) } /// Seek to an offset, in bytes, in a file. /// /// If the seek operation completed successfully, /// this method returns the new position from the start of the stream. /// That position can be used later with [`SeekFrom::Start`]. /// /// # Errors /// /// Seeking to a negative offset is considered an error. /// /// [`SeekFrom::Start`]: enum.SeekFrom.html#variant.Start /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// let (f, pos) = actix_fs::file::seek(f, std::io::SeekFrom::Start(8)).await?; /// } /// ``` pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> { let tup = run(move || { let pos = file.seek(seek)?; Ok((file, pos)) as io::Result<_> }) .await?; Ok(tup) } /// Pull all bytes from this file. /// /// # Errors /// /// If this function encounters any form of I/O or other error, an error /// variant will be returned. If an error is returned then it must be /// guaranteed that no bytes were read. /// /// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the read /// operation should be retried if there is nothing else to do. /// /// # Examples /// /// [`File`]s implement `Read`: /// /// [`Err`]: ../../std/result/enum.Result.html#variant.Err /// [`Ok(n)`]: ../../std/result/enum.Result.html#variant.Ok /// [`ErrorKind::Interrupted`]: ../../std/io/enum.ErrorKind.html#variant.Interrupted /// [`File`]: ../fs/struct.File.html /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// /// let bytes = actix_fs::file::read(f).await?; /// /// println!("The bytes: {:?}", bytes; /// Ok(()) /// } /// ``` pub async fn read(file: File) -> Result { let mut stream = FileStream::new(file).await?; let mut bytes_mut = BytesMut::with_capacity(stream.size() as usize); while let Some(res) = stream.next().await { bytes_mut.extend(res?); } Ok(bytes_mut.freeze()) } /// Produce a new stream of bytes from this File /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// let mut stream = actix_fs::file::read_to_stream(f).await?; /// /// while let Some(res) = stream.next().await { /// println!("bytes: {:?}", res?); /// } /// /// Ok(()) /// } /// ``` pub async fn read_to_stream(file: File) -> Result { FileStream::new(file).await } /// Read all bytes until EOF in this source. /// /// # Errors /// /// If the data in this stream is *not* valid UTF-8 then an error is /// returned and `buf` is unchanged. /// /// See [`read_to_end`][readtoend] for other error semantics. /// /// [readtoend]: #method.read_to_end /// /// # Examples /// /// [`File`][file]s implement `Read`: /// /// [file]: ../fs/struct.File.html /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// let s = actix_fs::file::read_to_string(f).await?; /// Ok(()) /// } /// ``` /// /// (See also the [`std::fs::read_to_string`] convenience function for /// reading from a file.) /// /// [`std::fs::read_to_string`]: ../fs/fn.read_to_string.html pub async fn read_to_string(file: File) -> Result { let mut stream = read_to_stream(file).await?; let mut string = String::with_capacity(stream.size() as usize); while let Some(res) = stream.next().await { let bytes = res?; let s = std::str::from_utf8(&bytes)?; string.extend(s.chars()); } Ok(string) } /// Write bytes into this writer. /// /// # Errors /// /// Each call to `write` may generate an I/O error indicating that the /// operation could not be completed. If an error is returned then no bytes /// in the buffer were written to this writer. /// /// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the /// write operation should be retried if there is nothing else to do. /// /// [`Err`]: ../../std/result/enum.Result.html#variant.Err /// [`ErrorKind::Interrupted`]: ../../std/io/enum.ErrorKind.html#variant.Interrupted /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let file = actix_fs::file::create("foo.txt").await?; /// /// actix_fs::file::write(file, b"some bytes").await?; /// Ok(()) /// } /// ``` pub async fn write(file: File, bytes: B) -> Result<(), Error> where B: Into, { let mut sink = FileSink::::new(file); sink.send(bytes.into()).await?; sink.close().await?; Ok(()) } /// Write a stream of bytes into this writer. /// /// # Errors /// /// Each call to `write` may generate an I/O error indicating that the /// operation could not be completed. If an error is returned then no bytes /// in the buffer were written to this writer. /// /// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the /// write operation should be retried if there is nothing else to do. /// /// [`Err`]: ../../std/result/enum.Result.html#variant.Err /// [`ErrorKind::Interrupted`]: ../../std/io/enum.ErrorKind.html#variant.Interrupted /// /// # Examples /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let file = actix_fs::file::create("foo.txt").await?; /// let stream = actix_fs::read_to_stream("bar.txt").await?; /// /// actix_fs::file::write(file, stream).await?; /// Ok(()) /// } /// ``` pub async fn write_stream(file: File, mut stream: S) -> Result<(), E> where S: Stream> + Unpin, E: From + Unpin, { let mut sink = FileSink::::new(file); sink.send_all(&mut stream).await?; sink.close().await?; Ok(()) } impl FileStream { async fn new(file: File) -> Result { let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?; let (file, metadata) = metadata(file).await?; Ok(FileStream { chunk_size: 65_356, size: metadata.len(), offset, file: Some(file), fut: None, }) } /// Change the size of chunks read by the stream /// /// This can be used to only read certain bytes from a given file /// /// ```no_run /// #[actix_rt::main] /// async fn main() -> actix_fs::Result<()> { /// let f = actix_fs::file::open("foo.txt").await?; /// let (f, _) = actix_fs::file::seek(std::io::SeekFrom::Start(16)).await?; /// let mut stream = actix_fs::file::read_to_stream(f).await?.chunk_size(16); /// /// if let Some(res) = stream.next().await { /// let sixteen_bytes = res?; /// // ... /// } /// Ok(()) /// } /// ``` pub fn chunk_size(mut self, chunk_size: u64) -> Self { self.chunk_size = chunk_size; self } /// Get the size of the file being streamed pub fn size(&self) -> u64 { self.size } } impl FileSink { fn new(file: File) -> Self { FileSink { file: Some(file), fut: None, closing: false, _error: PhantomData, } } } impl Stream for FileStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ref mut fut) = self.fut { return match Pin::new(fut).poll(cx) { Poll::Ready(Ok((file, bytes, offset))) => { self.fut.take(); self.file = Some(file); self.offset = offset as u64; Poll::Ready(Some(Ok(bytes))) } Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))), Poll::Pending => Poll::Pending, }; } let size = self.size; let offset = self.offset; let chunk_size = self.chunk_size; if size == offset { return Poll::Ready(None); } let mut file = self.file.take().expect("Use after completion"); self.fut = Some( run(move || { let max_bytes: usize; max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize; let mut buf = Vec::with_capacity(max_bytes); let pos = file.seek(io::SeekFrom::Start(offset))?; let nbytes = Read::by_ref(&mut file) .take(max_bytes as u64) .read_to_end(&mut buf)?; if nbytes == 0 { return Err(io::ErrorKind::UnexpectedEof.into()); } Ok((file, Bytes::from(buf), pos as usize + nbytes)) }) .boxed_local(), ); self.poll_next(cx) } } impl Sink for FileSink where E: From + Unpin, { type Error = E; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ref mut fut) = self.fut { return match Pin::new(fut).poll(cx) { Poll::Ready(Ok(file)) => { self.fut.take(); self.file = Some(file); Poll::Ready(Ok(())) } Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())), Poll::Pending => Poll::Pending, }; } Poll::Ready(Ok(())) } fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { let mut file = self.file.take().expect("Use after completion"); self.fut = Some( run(move || { file.write_all(item.as_ref())?; Ok(file) as Result<_, io::Error> }) .boxed_local(), ); Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll_ready(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if !self.closing { if let Some(ref mut fut) = self.fut { match Pin::new(fut).poll(cx) { Poll::Ready(Ok(file)) => { self.file = Some(file); } Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())), Poll::Pending => return Poll::Pending, }; } let mut file = self.file.take().expect("Use after completion"); self.closing = true; self.fut = Some( run(move || { file.flush()?; Ok(file) as Result<_, io::Error> }) .boxed_local(), ); } self.poll_ready(cx) } } impl From> for Error { fn from(e: BlockingError) -> Self { match e { BlockingError::Error(e) => e.into(), _ => Error::Canceled, } } } #[cfg(test)] mod tests { use super::*; use crate::remove_file; const READ_FILE: &str = "tests/read.txt"; const WRITE_FILE: &str = "tests/write.txt"; const TEST_FILE: &str = "tests/test.txt"; #[test] fn stream_file() { run(async move { let read_file = open(READ_FILE).await?; let stream = read_to_stream(read_file).await?; let write_file = create(WRITE_FILE).await?; write_stream(write_file, stream).await?; let read_file = open(READ_FILE).await?; let write_file = open(WRITE_FILE).await?; let read_bytes = read(read_file).await?; let written_bytes = read(write_file).await?; assert!(written_bytes.as_ref() == read_bytes.as_ref()); remove_file(WRITE_FILE).await?; Ok(()) as Result<_, Error> }) .unwrap() } #[test] fn read_write_file() { let bytes_to_be_written = b"abcdefg"; run(async move { let file = create(TEST_FILE).await?; write(file, bytes_to_be_written.to_vec().into()).await?; let file = open(TEST_FILE).await?; let bytes = read(file).await?; assert!(bytes.as_ref() == bytes_to_be_written); remove_file(TEST_FILE).await?; Ok(()) as Result<_, Error> }) .unwrap(); } #[test] fn read_file() { run(async move { let file = open(READ_FILE).await?; let bytes = read(file).await?; assert!(bytes.as_ref() == b"Hello, World!\n"); Ok(()) as Result<_, Error> }) .unwrap(); } #[test] fn seek_file() { run(async move { let file = open(READ_FILE).await?; let (file, pos) = seek(file, io::SeekFrom::Start(7)).await?; assert!(pos == 7); let bytes = read(file).await?; assert!(bytes.as_ref() == b"World!\n"); Ok(()) as Result<_, Error> }) .unwrap(); } #[test] fn small_chunks() { run(async move { let file = open(READ_FILE).await?; let mut bytes_mut = BytesMut::new(); let (file, _) = seek(file, io::SeekFrom::Start(7)).await?; let mut stream = read_to_stream(file).await?.chunk_size(2); while let Some(res) = stream.next().await { bytes_mut.extend(res?); } let bytes = bytes_mut.freeze(); assert!(bytes.as_ref() == b"World!\n"); Ok(()) as Result<_, Error> }) .unwrap(); } fn run(f: F) -> F::Output { actix_rt::System::new("test-system").block_on(f) } }