diff --git a/Cargo.toml b/Cargo.toml index faa9bd2..d19825c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,12 +13,12 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-threadpool = "0.3.2" -bytes = "0.5.4" +actix-rt = "2.0.0" +bytes = "1" futures = "0.3.5" log = "0.4" thiserror = "1.0" +tokio = { version = "1", default-features = false, features = ["rt"] } [dev-dependencies] -actix-rt = "1.1.1" anyhow = "1.0" diff --git a/src/file.rs b/src/file.rs index 5c09985..65f699a 100644 --- a/src/file.rs +++ b/src/file.rs @@ -4,7 +4,7 @@ //! to the ownership requirements of threaded interaction. use crate::Error; -use actix_threadpool::{run, BlockingError}; +use actix_rt::task::spawn_blocking; use bytes::{Bytes, BytesMut}; use futures::{ future::{FutureExt, LocalBoxFuture}, @@ -20,6 +20,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::task::JoinError; /// A stream of bytes from a file on the filesystem pub struct FileStream { @@ -27,12 +28,13 @@ pub struct FileStream { size: u64, offset: u64, file: Option, - fut: Option>>>, + fut: + Option, JoinError>>>, } struct FileSink { file: Option, - fut: Option>>>, + fut: Option, JoinError>>>, closing: bool, _error: PhantomData, } @@ -59,7 +61,7 @@ pub async fn create

(path: P) -> Result where P: AsRef + Send + 'static, { - let file = run(move || File::create(path)).await?; + let file = spawn_blocking(move || File::create(path)).await??; Ok(file) } @@ -77,12 +79,12 @@ where /// } /// ``` pub async fn metadata(file: File) -> Result<(File, Metadata), Error> { - let tup = run(move || { + let tup = spawn_blocking(move || { let metadata = file.metadata()?; Ok((file, metadata)) as Result<_, io::Error> }) - .await?; + .await??; Ok(tup) } @@ -111,7 +113,7 @@ pub async fn open

(path: P) -> Result where P: AsRef + Send + 'static, { - let file = run(move || File::open(path)).await?; + let file = spawn_blocking(move || File::open(path)).await??; Ok(file) } @@ -146,11 +148,11 @@ where /// } /// ``` pub async fn set_permissions(file: File, perm: Permissions) -> Result { - let file = run(move || { + let file = spawn_blocking(move || { file.set_permissions(perm)?; Ok(file) as io::Result<_> }) - .await?; + .await??; Ok(file) } @@ -179,11 +181,11 @@ pub async fn set_permissions(file: File, perm: Permissions) -> Result Result { - let file = run(move || { + let file = spawn_blocking(move || { file.sync_all()?; Ok(file) as Result<_, io::Error> }) - .await?; + .await??; Ok(file) } @@ -215,11 +217,11 @@ pub async fn sync_all(file: File) -> Result { /// } /// ``` pub async fn sync_data(file: File) -> Result { - let file = run(move || { + let file = spawn_blocking(move || { file.sync_data()?; Ok(file) as Result<_, io::Error> }) - .await?; + .await??; Ok(file) } @@ -253,11 +255,11 @@ pub async fn sync_data(file: File) -> Result { /// } /// ``` pub async fn set_len(file: File, size: u64) -> Result { - let file = run(move || { + let file = spawn_blocking(move || { file.set_len(size)?; Ok(file) as Result<_, io::Error> }) - .await?; + .await??; Ok(file) } @@ -283,11 +285,11 @@ pub async fn set_len(file: File, size: u64) -> Result { /// } /// ``` pub async fn seek(mut file: File, seek: io::SeekFrom) -> Result<(File, u64), Error> { - let tup = run(move || { + let tup = spawn_blocking(move || { let pos = file.seek(seek)?; Ok((file, pos)) as io::Result<_> }) - .await?; + .await??; Ok(tup) } @@ -527,12 +529,13 @@ impl Stream for FileStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ref mut fut) = self.fut { return match Pin::new(fut).poll(cx) { - Poll::Ready(Ok((file, bytes, offset))) => { + Poll::Ready(Ok(Ok((file, bytes, offset)))) => { self.fut.take(); self.file = Some(file); self.offset = offset as u64; Poll::Ready(Some(Ok(bytes))) } + Poll::Ready(Ok(Err(e))) => Poll::Ready(Some(Err(e.into()))), Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e.into()))), Poll::Pending => Poll::Pending, }; @@ -548,7 +551,7 @@ impl Stream for FileStream { let mut file = self.file.take().expect("Use after completion"); self.fut = Some( - run(move || { + spawn_blocking(move || { let max_bytes: usize; max_bytes = std::cmp::min(size.saturating_sub(offset), chunk_size) as usize; let mut buf = Vec::with_capacity(max_bytes); @@ -576,7 +579,7 @@ where fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ref mut fut) = self.fut { return match Pin::new(fut).poll(cx) { - Poll::Ready(Ok(file)) => { + Poll::Ready(Ok(Ok(file))) => { self.fut.take(); self.file = Some(file); @@ -584,6 +587,7 @@ where Poll::Ready(Ok(())) } + Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(Error::from(e).into())), Poll::Ready(Err(e)) => Poll::Ready(Err(Error::from(e).into())), Poll::Pending => Poll::Pending, }; @@ -599,7 +603,7 @@ where log::debug!("Writing {} bytes", item.len()); self.fut = Some( - run(move || { + spawn_blocking(move || { file.write_all(item.as_ref())?; Ok(file) as Result<_, io::Error> }) @@ -617,9 +621,10 @@ where if !self.closing { if let Some(ref mut fut) = self.fut { match Pin::new(fut).poll(cx) { - Poll::Ready(Ok(file)) => { + Poll::Ready(Ok(Ok(file))) => { self.file = Some(file); } + Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(Error::from(e).into())), Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::from(e).into())), Poll::Pending => return Poll::Pending, }; @@ -629,7 +634,7 @@ where self.closing = true; self.fut = Some( - run(move || { + spawn_blocking(move || { file.flush()?; Ok(file) as Result<_, io::Error> }) @@ -641,12 +646,9 @@ where } } -impl From> for Error { - fn from(e: BlockingError) -> Self { - match e { - BlockingError::Error(e) => e.into(), - _ => Error::Canceled, - } +impl From for Error { + fn from(_: JoinError) -> Self { + Error::Canceled } } @@ -749,6 +751,6 @@ mod tests { } fn run(f: F) -> F::Output { - actix_rt::System::new("test-system").block_on(f) + actix_rt::System::new().block_on(f) } } diff --git a/src/lib.rs b/src/lib.rs index 44b4766..3bf549c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,7 @@ #![doc(html_root_url = "https://actix-fs.asonix.dog")] -use actix_threadpool::run; +use actix_rt::task::spawn_blocking; use bytes::Bytes; use futures::stream::Stream; use std::{ @@ -117,7 +117,7 @@ pub async fn canonicalize

(path: P) -> Result where P: AsRef + Send + 'static, { - let path = run(move || fs::canonicalize(path)).await?; + let path = spawn_blocking(move || fs::canonicalize(path)).await??; Ok(path) } @@ -176,7 +176,7 @@ where P: AsRef + Send + 'static, Q: AsRef + Send + 'static, { - let bytes = run(move || fs::copy(from, to)).await?; + let bytes = spawn_blocking(move || fs::copy(from, to)).await??; Ok(bytes) } @@ -221,7 +221,7 @@ pub async fn create_dir

(path: P) -> Result<()> where P: AsRef + Send + 'static, { - run(move || fs::create_dir(path)).await?; + spawn_blocking(move || fs::create_dir(path)).await??; Ok(()) } @@ -268,7 +268,7 @@ pub async fn create_dir_all

(path: P) -> Result<()> where P: AsRef + Send + 'static, { - run(move || fs::create_dir_all(path)).await?; + spawn_blocking(move || fs::create_dir_all(path)).await??; Ok(()) } @@ -307,7 +307,7 @@ where P: AsRef + Send + 'static, Q: AsRef + Send + 'static, { - run(move || fs::hard_link(src, dst)).await?; + spawn_blocking(move || fs::hard_link(src, dst)).await??; Ok(()) } @@ -348,7 +348,7 @@ pub async fn metadata

(path: P) -> Result where P: AsRef + Send + 'static, { - let metadata = run(move || fs::metadata(path)).await?; + let metadata = spawn_blocking(move || fs::metadata(path)).await??; Ok(metadata) } @@ -427,7 +427,7 @@ pub async fn read_link

(path: P) -> Result where P: AsRef + Send + 'static, { - let path = run(move || fs::read_link(path)).await?; + let path = spawn_blocking(move || fs::read_link(path)).await??; Ok(path) } @@ -546,7 +546,7 @@ pub async fn remove_dir

(path: P) -> Result<()> where P: AsRef + Send + 'static, { - run(move || fs::remove_dir(path)).await?; + spawn_blocking(move || fs::remove_dir(path)).await??; Ok(()) } @@ -586,7 +586,7 @@ pub async fn remove_dir_all

(path: P) -> Result<()> where P: AsRef + Send + 'static, { - run(move || fs::remove_dir_all(path)).await?; + spawn_blocking(move || fs::remove_dir_all(path)).await??; Ok(()) } @@ -626,7 +626,7 @@ pub async fn remove_file

(path: P) -> Result<()> where P: AsRef + Send + 'static, { - run(move || fs::remove_file(path)).await?; + spawn_blocking(move || fs::remove_file(path)).await??; Ok(()) } @@ -673,7 +673,7 @@ where P: AsRef + Send + 'static, Q: AsRef + Send + 'static, { - run(move || fs::rename(from, to)).await?; + spawn_blocking(move || fs::rename(from, to)).await??; Ok(()) } @@ -711,7 +711,7 @@ pub async fn set_permissions

(path: P, permissions: fs::Permissions) -> Result where P: AsRef + Send + 'static, { - run(move || fs::set_permissions(path, permissions)).await?; + spawn_blocking(move || fs::set_permissions(path, permissions)).await??; Ok(()) } @@ -748,7 +748,7 @@ pub async fn symlink_metadata

(path: P) -> Result where P: AsRef + Send + 'static, { - let metadata = run(move || fs::symlink_metadata(path)).await?; + let metadata = spawn_blocking(move || fs::symlink_metadata(path)).await??; Ok(metadata) }