Support actix-rt 2.0
This commit is contained in:
parent
dc37026b9f
commit
602d9de3e1
|
@ -13,12 +13,12 @@ edition = "2018"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
actix-threadpool = "0.3.2"
|
||||
bytes = "0.5.4"
|
||||
actix-rt = "2.0.0"
|
||||
bytes = "1"
|
||||
futures = "0.3.5"
|
||||
log = "0.4"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", default-features = false, features = ["rt"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "1.1.1"
|
||||
anyhow = "1.0"
|
||||
|
|
62
src/file.rs
62
src/file.rs
|
@ -4,7 +4,7 @@
|
|||
//! to the ownership requirements of threaded interaction.
|
||||
|
||||
use crate::Error;
|
||||
use actix_threadpool::{run, BlockingError};
|
||||
use actix_rt::task::spawn_blocking;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{
|
||||
future::{FutureExt, LocalBoxFuture},
|
||||
|
@ -20,6 +20,7 @@ use std::{
|
|||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::task::JoinError;
|
||||
|
||||
/// A stream of bytes from a file on the filesystem
|
||||
pub struct FileStream {
|
||||
|
@ -27,12 +28,13 @@ pub struct FileStream {
|
|||
size: u64,
|
||||
offset: u64,
|
||||
file: Option<File>,
|
||||
fut: Option<LocalBoxFuture<'static, Result<(File, Bytes, usize), BlockingError<io::Error>>>>,
|
||||
fut:
|
||||
Option<LocalBoxFuture<'static, Result<Result<(File, Bytes, usize), io::Error>, JoinError>>>,
|
||||
}
|
||||
|
||||
struct FileSink<E> {
|
||||
file: Option<File>,
|
||||
fut: Option<LocalBoxFuture<'static, Result<File, BlockingError<io::Error>>>>,
|
||||
fut: Option<LocalBoxFuture<'static, Result<Result<File, io::Error>, JoinError>>>,
|
||||
closing: bool,
|
||||
_error: PhantomData<E>,
|
||||
}
|
||||
|
@ -59,7 +61,7 @@ pub async fn create<P>(path: P) -> Result<File, Error>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let file = run(move || File::create(path)).await?;
|
||||
let file = spawn_blocking(move || File::create(path)).await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -77,12 +79,12 @@ where
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn metadata(file: File) -> Result<(File, Metadata), Error> {
|
||||
let tup = run(move || {
|
||||
let tup = spawn_blocking(move || {
|
||||
let metadata = file.metadata()?;
|
||||
|
||||
Ok((file, metadata)) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(tup)
|
||||
}
|
||||
|
@ -111,7 +113,7 @@ pub async fn open<P>(path: P) -> Result<File, Error>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let file = run(move || File::open(path)).await?;
|
||||
let file = spawn_blocking(move || File::open(path)).await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -146,11 +148,11 @@ where
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn set_permissions(file: File, perm: Permissions) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.set_permissions(perm)?;
|
||||
Ok(file) as io::Result<_>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -179,11 +181,11 @@ pub async fn set_permissions(file: File, perm: Permissions) -> Result<File, Erro
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn sync_all(file: File) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.sync_all()?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -215,11 +217,11 @@ pub async fn sync_all(file: File) -> Result<File, Error> {
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn sync_data(file: File) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.sync_data()?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -253,11 +255,11 @@ pub async fn sync_data(file: File) -> Result<File, Error> {
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn set_len(file: File, size: u64) -> Result<File, Error> {
|
||||
let file = run(move || {
|
||||
let file = spawn_blocking(move || {
|
||||
file.set_len(size)?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
@ -283,11 +285,11 @@ pub async fn set_len(file: File, size: u64) -> Result<File, Error> {
|
|||
/// }
|
||||
/// ```
|
||||
pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> {
|
||||
let tup = run(move || {
|
||||
let tup = spawn_blocking(move || {
|
||||
let pos = file.seek(seek)?;
|
||||
Ok((file, pos)) as io::Result<_>
|
||||
})
|
||||
.await?;
|
||||
.await??;
|
||||
|
||||
Ok(tup)
|
||||
}
|
||||
|
@ -527,12 +529,13 @@ impl Stream for FileStream {
|
|||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
return match Pin::new(fut).poll(cx) {
|
||||
Poll::Ready(Ok((file, bytes, offset))) => {
|
||||
Poll::Ready(Ok(Ok((file, bytes, offset)))) => {
|
||||
self.fut.take();
|
||||
self.file = Some(file);
|
||||
self.offset = offset as u64;
|
||||
Poll::Ready(Some(Ok(bytes)))
|
||||
}
|
||||
Poll::Ready(Ok(Err(e))) => Poll::Ready(Some(Err(e.into()))),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
};
|
||||
|
@ -548,7 +551,7 @@ impl Stream for FileStream {
|
|||
|
||||
let mut file = self.file.take().expect("Use after completion");
|
||||
self.fut = Some(
|
||||
run(move || {
|
||||
spawn_blocking(move || {
|
||||
let max_bytes: usize;
|
||||
max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize;
|
||||
let mut buf = Vec::with_capacity(max_bytes);
|
||||
|
@ -576,7 +579,7 @@ where
|
|||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
return match Pin::new(fut).poll(cx) {
|
||||
Poll::Ready(Ok(file)) => {
|
||||
Poll::Ready(Ok(Ok(file))) => {
|
||||
self.fut.take();
|
||||
self.file = Some(file);
|
||||
|
||||
|
@ -584,6 +587,7 @@ where
|
|||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Pending => Poll::Pending,
|
||||
};
|
||||
|
@ -599,7 +603,7 @@ where
|
|||
log::debug!("Writing {} bytes", item.len());
|
||||
|
||||
self.fut = Some(
|
||||
run(move || {
|
||||
spawn_blocking(move || {
|
||||
file.write_all(item.as_ref())?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
|
@ -617,9 +621,10 @@ where
|
|||
if !self.closing {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
match Pin::new(fut).poll(cx) {
|
||||
Poll::Ready(Ok(file)) => {
|
||||
Poll::Ready(Ok(Ok(file))) => {
|
||||
self.file = Some(file);
|
||||
}
|
||||
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
};
|
||||
|
@ -629,7 +634,7 @@ where
|
|||
self.closing = true;
|
||||
|
||||
self.fut = Some(
|
||||
run(move || {
|
||||
spawn_blocking(move || {
|
||||
file.flush()?;
|
||||
Ok(file) as Result<_, io::Error>
|
||||
})
|
||||
|
@ -641,12 +646,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl From<BlockingError<io::Error>> for Error {
|
||||
fn from(e: BlockingError<io::Error>) -> Self {
|
||||
match e {
|
||||
BlockingError::Error(e) => e.into(),
|
||||
_ => Error::Canceled,
|
||||
}
|
||||
impl From<JoinError> for Error {
|
||||
fn from(_: JoinError) -> Self {
|
||||
Error::Canceled
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -749,6 +751,6 @@ mod tests {
|
|||
}
|
||||
|
||||
fn run<F: Future + 'static>(f: F) -> F::Output {
|
||||
actix_rt::System::new("test-system").block_on(f)
|
||||
actix_rt::System::new().block_on(f)
|
||||
}
|
||||
}
|
||||
|
|
28
src/lib.rs
28
src/lib.rs
|
@ -35,7 +35,7 @@
|
|||
|
||||
#![doc(html_root_url = "https://actix-fs.asonix.dog")]
|
||||
|
||||
use actix_threadpool::run;
|
||||
use actix_rt::task::spawn_blocking;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::Stream;
|
||||
use std::{
|
||||
|
@ -117,7 +117,7 @@ pub async fn canonicalize<P>(path: P) -> Result<PathBuf>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let path = run(move || fs::canonicalize(path)).await?;
|
||||
let path = spawn_blocking(move || fs::canonicalize(path)).await??;
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ where
|
|||
P: AsRef<Path> + Send + 'static,
|
||||
Q: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let bytes = run(move || fs::copy(from, to)).await?;
|
||||
let bytes = spawn_blocking(move || fs::copy(from, to)).await??;
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
|
@ -221,7 +221,7 @@ pub async fn create_dir<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::create_dir(path)).await?;
|
||||
spawn_blocking(move || fs::create_dir(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ pub async fn create_dir_all<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::create_dir_all(path)).await?;
|
||||
spawn_blocking(move || fs::create_dir_all(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ where
|
|||
P: AsRef<Path> + Send + 'static,
|
||||
Q: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::hard_link(src, dst)).await?;
|
||||
spawn_blocking(move || fs::hard_link(src, dst)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -348,7 +348,7 @@ pub async fn metadata<P>(path: P) -> Result<fs::Metadata>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let metadata = run(move || fs::metadata(path)).await?;
|
||||
let metadata = spawn_blocking(move || fs::metadata(path)).await??;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
@ -427,7 +427,7 @@ pub async fn read_link<P>(path: P) -> Result<PathBuf>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let path = run(move || fs::read_link(path)).await?;
|
||||
let path = spawn_blocking(move || fs::read_link(path)).await??;
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
@ -546,7 +546,7 @@ pub async fn remove_dir<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::remove_dir(path)).await?;
|
||||
spawn_blocking(move || fs::remove_dir(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -586,7 +586,7 @@ pub async fn remove_dir_all<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::remove_dir_all(path)).await?;
|
||||
spawn_blocking(move || fs::remove_dir_all(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -626,7 +626,7 @@ pub async fn remove_file<P>(path: P) -> Result<()>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::remove_file(path)).await?;
|
||||
spawn_blocking(move || fs::remove_file(path)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -673,7 +673,7 @@ where
|
|||
P: AsRef<Path> + Send + 'static,
|
||||
Q: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::rename(from, to)).await?;
|
||||
spawn_blocking(move || fs::rename(from, to)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -711,7 +711,7 @@ pub async fn set_permissions<P>(path: P, permissions: fs::Permissions) -> Result
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
run(move || fs::set_permissions(path, permissions)).await?;
|
||||
spawn_blocking(move || fs::set_permissions(path, permissions)).await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -748,7 +748,7 @@ pub async fn symlink_metadata<P>(path: P) -> Result<fs::Metadata>
|
|||
where
|
||||
P: AsRef<Path> + Send + 'static,
|
||||
{
|
||||
let metadata = run(move || fs::symlink_metadata(path)).await?;
|
||||
let metadata = spawn_blocking(move || fs::symlink_metadata(path)).await??;
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue