actix-fs/src/lib.rs

423 lines
12 KiB
Rust

//! # Actix FS
//! _Asyncronous filesystem operations for actix-based systems_
//!
//! ## Usage
//!
//! ```rust
//! use std::io::SeekFrom;
//!
//! #[actix_rt::main]
//! async fn main() -> Result<(), anyhow::Error> {
//! let file = actix_fs::open("tests/read.txt").await?;
//! let (file, position) = actix_fs::seek(file, SeekFrom::Start(7)).await?;
//! let bytes = actix_fs::read_bytes(file).await?;
//!
//! assert!(position == 7);
//! assert!(bytes.as_ref() == b"World!\n");
//! Ok(())
//! }
//! ```
//!
//! ### Contributing
//! Unless otherwise stated, all contributions to this project will be licensed under the CSL with
//! the exceptions listed in the License section of this file.
//!
//! ### License
//! This work is licensed under the Cooperative Software License. This is not a Free Software
//! License, but may be considered a "source-available License." For most hobbyists, self-employed
//! developers, worker-owned companies, and cooperatives, this software can be used in most
//! projects so long as this software is distributed under the terms of the CSL. For more
//! information, see the provided LICENSE file. If none exists, the license can be found online
//! [here](https://lynnesbian.space/csl/). If you are a free software project and wish to use this
//! software under the terms of the GNU Affero General Public License, please contact me at
//! [asonix@asonix.dog](mailto:asonix@asonix.dog) and we can sort that out. If you wish to use this
//! project under any other license, especially in proprietary software, the answer is likely no.
//!
//! Actix FS is currently licensed under the AGPL to the Lemmy project, found
//! at [github.com/LemmyNet/lemmy](https://github.com/LemmyNet/lemmy)
use actix_threadpool::BlockingError;
use bytes::{Bytes, BytesMut};
use futures::{
future::{FutureExt, LocalBoxFuture},
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use std::{
fs::{File, Metadata},
future::Future,
io::{self, prelude::*},
marker::PhantomData,
path::Path,
pin::Pin,
task::{Context, Poll},
};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{0}")]
Io(#[from] io::Error),
#[error("Task canceled")]
Canceled,
}
pub struct FileStream {
chunk_size: u64,
size: u64,
offset: u64,
file: Option<File>,
fut: Option<LocalBoxFuture<'static, Result<(File, Bytes, usize), BlockingError<io::Error>>>>,
}
impl FileStream {
async fn new(file: File) -> Result<Self, Error> {
let (file, offset) = seek(file, io::SeekFrom::Current(0)).await?;
let (file, metadata) = metadata(file).await?;
Ok(FileStream {
chunk_size: 65_356,
size: metadata.len(),
offset,
file: Some(file),
fut: None,
})
}
pub fn chunk_size(mut self, chunk_size: u64) -> Self {
self.chunk_size = chunk_size;
self
}
}
struct FileSink<E> {
file: Option<File>,
fut: Option<LocalBoxFuture<'static, Result<File, BlockingError<io::Error>>>>,
chunk_size: u64,
closing: bool,
_error: PhantomData<E>,
}
impl<E> FileSink<E> {
fn new(file: File) -> Self {
FileSink {
file: Some(file),
fut: None,
chunk_size: 0,
closing: false,
_error: PhantomData,
}
}
}
pub async fn open<P>(path: P) -> Result<File, Error>
where
P: AsRef<Path> + Send + 'static,
{
let file = actix_threadpool::run(move || File::open(path)).await?;
Ok(file)
}
pub async fn create<P>(path: P) -> Result<File, Error>
where
P: AsRef<Path> + Send + 'static,
{
let file = actix_threadpool::run(move || File::create(path)).await?;
Ok(file)
}
pub async fn remove<P>(path: P) -> Result<(), Error>
where
P: AsRef<Path> + Send + 'static,
{
actix_threadpool::run(move || std::fs::remove_file(path)).await?;
Ok(())
}
pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> {
let tup = actix_threadpool::run(move || {
let pos = file.seek(seek)?;
Ok((file, pos)) as Result<_, io::Error>
})
.await?;
Ok(tup)
}
pub async fn metadata(file: File) -> Result<(File, Metadata), Error> {
let tup = actix_threadpool::run(move || {
let metadata = file.metadata()?;
Ok((file, metadata)) as Result<_, io::Error>
})
.await?;
Ok(tup)
}
pub async fn read_stream(file: File) -> Result<FileStream, Error> {
FileStream::new(file).await
}
pub async fn read_bytes(file: File) -> Result<Bytes, Error> {
let mut stream = FileStream::new(file).await?;
let mut bytes_mut = BytesMut::new();
while let Some(res) = stream.next().await {
bytes_mut.extend(res?);
}
Ok(bytes_mut.freeze())
}
pub async fn write_stream<S, E>(file: File, mut stream: S) -> Result<(), E>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
E: From<Error> + Unpin,
{
let mut sink = FileSink::<E>::new(file);
sink.send_all(&mut stream).await?;
sink.close().await?;
Ok(())
}
pub async fn write_bytes(file: File, bytes: Bytes) -> Result<(), Error> {
let mut sink = FileSink::<Error>::new(file);
sink.send(bytes).await?;
sink.close().await?;
Ok(())
}
impl Stream for FileStream {
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut {
return match Pin::new(fut).poll(cx) {
Poll::Ready(Ok((file, bytes, offset))) => {
self.fut.take();
self.file = Some(file);
self.offset = offset as u64;
Poll::Ready(Some(Ok(bytes)))
}
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))),
Poll::Pending => Poll::Pending,
};
}
let size = self.size;
let offset = self.offset;
let chunk_size = self.chunk_size;
if size == offset {
return Poll::Ready(None);
}
let mut file = self.file.take().expect("Use after completion");
self.fut = Some(
actix_threadpool::run(move || {
let max_bytes: usize;
max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize;
let mut buf = Vec::with_capacity(max_bytes);
let pos = file.seek(io::SeekFrom::Start(offset))?;
let nbytes = Read::by_ref(&mut file)
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
if nbytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf), pos as usize + nbytes))
})
.boxed_local(),
);
self.poll_next(cx)
}
}
impl<E> Sink<Bytes> for FileSink<E>
where
E: From<Error> + Unpin,
{
type Error = E;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
if let Some(ref mut fut) = self.fut {
return match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(file)) => {
self.fut.take();
self.file = Some(file);
self.chunk_size = 0;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())),
Poll::Pending => Poll::Pending,
};
}
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
let mut file = self.file.take().expect("Use after completion");
self.chunk_size = item.len() as u64;
self.fut = Some(
actix_threadpool::run(move || {
file.write_all(item.as_ref())?;
Ok(file) as Result<_, io::Error>
})
.boxed_local(),
);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
if !self.closing {
if let Some(ref mut fut) = self.fut {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(file)) => {
self.file = Some(file);
self.chunk_size = 0;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())),
Poll::Pending => return Poll::Pending,
};
}
let mut file = self.file.take().expect("Use after completion");
self.closing = true;
self.fut = Some(
actix_threadpool::run(move || {
file.flush()?;
Ok(file) as Result<_, io::Error>
})
.boxed_local(),
);
}
self.poll_ready(cx)
}
}
impl From<BlockingError<io::Error>> for Error {
fn from(e: BlockingError<io::Error>) -> Self {
match e {
BlockingError::Error(e) => e.into(),
_ => Error::Canceled,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const READ_FILE: &str = "tests/read.txt";
const WRITE_FILE: &str = "tests/write.txt";
const TEST_FILE: &str = "tests/test.txt";
#[test]
fn stream_file() {
run(async move {
let read_file = open(READ_FILE).await?;
let stream = read_stream(read_file).await?;
let write_file = create(WRITE_FILE).await?;
write_stream(write_file, stream).await?;
let read_file = open(READ_FILE).await?;
let write_file = open(WRITE_FILE).await?;
let read = read_bytes(read_file).await?;
let written = read_bytes(write_file).await?;
assert!(written.as_ref() == read.as_ref());
remove(WRITE_FILE).await?;
Ok(()) as Result<_, Error>
})
.unwrap()
}
#[test]
fn read_write_file() {
let bytes_to_be_written = b"abcdefg";
run(async move {
let file = create(TEST_FILE).await?;
write_bytes(file, bytes_to_be_written.to_vec().into()).await?;
let file = open(TEST_FILE).await?;
let bytes = read_bytes(file).await?;
assert!(bytes.as_ref() == bytes_to_be_written);
remove(TEST_FILE).await?;
Ok(()) as Result<_, Error>
})
.unwrap();
}
#[test]
fn read_file() {
run(async move {
let file = open(READ_FILE).await?;
let bytes = read_bytes(file).await?;
assert!(bytes.as_ref() == b"Hello, World!\n");
Ok(()) as Result<_, Error>
})
.unwrap();
}
#[test]
fn seek_file() {
run(async move {
let file = open(READ_FILE).await?;
let (file, pos) = seek(file, io::SeekFrom::Start(7)).await?;
assert!(pos == 7);
let bytes = read_bytes(file).await?;
assert!(bytes.as_ref() == b"World!\n");
Ok(()) as Result<_, Error>
})
.unwrap();
}
#[test]
fn small_chunks() {
run(async move {
let file = open(READ_FILE).await?;
let mut bytes_mut = BytesMut::new();
let (file, _) = seek(file, io::SeekFrom::Start(7)).await?;
let mut stream = read_stream(file).await?.chunk_size(2);
while let Some(res) = stream.next().await {
bytes_mut.extend(res?);
}
let bytes = bytes_mut.freeze();
assert!(bytes.as_ref() == b"World!\n");
Ok(()) as Result<_, Error>
})
.unwrap();
}
fn run<F: Future + 'static>(f: F) -> F::Output {
actix_rt::System::new("test-system").block_on(f)
}
}