From 2318fd9dca2b7bf32f721b28e224a52028574623 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Tue, 12 Oct 2021 23:16:31 -0500 Subject: [PATCH] Enable io-uring --- Cargo.lock | 41 +++- Cargo.toml | 5 + src/file.rs | 423 ++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 9 +- src/range.rs | 2 +- src/upload_manager.rs | 37 +++- 6 files changed, 497 insertions(+), 20 deletions(-) create mode 100644 src/file.rs diff --git a/Cargo.lock b/Cargo.lock index 1835663..d37f00f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,20 +120,21 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc7d7cd957c9ed92288a7c3c96af81fa5291f65247a76a34dac7b6af74e52ba0" +checksum = "ea360596a50aa9af459850737f99293e5cb9114ae831118cb6026b3bbc7583ad" dependencies = [ "actix-macros", "futures-core", "tokio", + "tokio-uring", ] [[package]] name = "actix-server" -version = "2.0.0-beta.5" +version = "2.0.0-beta.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26369215fcc3b0176018b3b68756a8bcc275bb000e6212e454944913a1f9bf87" +checksum = "7367665785765b066ad16e1086d26a087f696bc7c42b6f93004ced6cfcf1eeca" dependencies = [ "actix-rt", "actix-service", @@ -142,7 +143,6 @@ dependencies = [ "log", "mio", "num_cpus", - "slab", "tokio", ] @@ -872,6 +872,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-uring" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d75829ed9377bab6c90039fe47b9d84caceb4b5063266142e21bcce6550cda8" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "itertools" version = "0.10.1" @@ -1162,6 +1172,7 @@ version = "0.3.0-alpha.38" dependencies = [ "actix-form-data", "actix-rt", + "actix-server", "actix-web", "anyhow", "awc", @@ -1182,6 +1193,7 @@ dependencies = [ "thiserror", "time 0.3.3", "tokio", + "tokio-uring", "tokio-util", "tracing", "tracing-actix-web", @@ -1473,6 +1485,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1936,6 +1954,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-uring" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f54af096a39b937631659f1a5da60ab7c1af334025c33b87ed913072c3c8a9" +dependencies = [ + "io-uring", + "libc", + "scoped-tls", + "slab", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.8" diff --git a/Cargo.toml b/Cargo.toml index 7b024df..dcfd0b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,14 @@ repository = "https://git.asonix.dog/asonix/pict-rs" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = [] +io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring"] [dependencies] actix-form-data = "0.6.0-beta.1" actix-rt = "2.2.0" +actix-server = "2.0.0-beta.6" actix-web = { version = "4.0.0-beta.8", default-features = false } anyhow = "1.0" awc = { version = "3.0.0-beta.7", default-features = false, features = ["rustls"] } @@ -33,6 +37,7 @@ structopt = "0.3.14" thiserror = "1.0" time = { version = "0.3.0", features = ["serde"] } tokio = { version = "1", default-features = false, features = ["fs", "io-util", "process", "sync"] } +tokio-uring = { version = "0.1", optional = true } tokio-util = { version = "0.6", default-features = false, features = ["codec"] } tracing = "0.1.15" tracing-error = "0.1.2" diff --git a/src/file.rs b/src/file.rs new file mode 100644 index 0000000..3ac1bf0 --- /dev/null +++ b/src/file.rs @@ -0,0 +1,423 @@ +#[cfg(feature = "io-uring")] +pub(crate) use io_uring::File; + +#[cfg(not(feature = "io-uring"))] +pub(crate) use tokio::fs::File; + +#[cfg(feature = "io-uring")] +mod io_uring { + use std::{ + convert::TryInto, + fs::Metadata, + future::Future, + io::SeekFrom, + path::{Path, PathBuf}, + pin::Pin, + task::{Context, Poll, Waker}, + }; + use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; + + type IoFuture = + Pin, Vec)>>>; + + type FlushFuture = Pin)>>>; + + type ShutdownFuture = Pin>>>; + + type SeekFuture = Pin>>>; + + enum FileState { + Reading { future: IoFuture }, + Writing { future: IoFuture }, + Syncing { future: FlushFuture }, + Seeking { future: SeekFuture }, + Shutdown { future: ShutdownFuture }, + Pending, + } + + impl FileState { + fn take(&mut self) -> Self { + std::mem::replace(self, FileState::Pending) + } + } + + pub(crate) struct File { + path: PathBuf, + inner: Option, + cursor: usize, + wakers: Vec, + state: FileState, + } + + impl File { + pub(crate) async fn open(path: impl AsRef) -> std::io::Result { + tracing::info!("Opening io-uring file"); + Ok(File { + path: path.as_ref().to_owned(), + inner: Some(tokio_uring::fs::File::open(path).await?), + cursor: 0, + wakers: vec![], + state: FileState::Pending, + }) + } + + pub(crate) async fn create(path: impl AsRef) -> std::io::Result { + tracing::info!("Creating io-uring file"); + Ok(File { + path: path.as_ref().to_owned(), + inner: Some(tokio_uring::fs::File::create(path).await?), + cursor: 0, + wakers: vec![], + state: FileState::Pending, + }) + } + + pub(crate) async fn metadata(&self) -> std::io::Result { + tokio::fs::metadata(&self.path).await + } + + fn poll_read( + &mut self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + mut future: IoFuture, + ) -> Poll> { + match Pin::new(&mut future).poll(cx) { + Poll::Ready((file, Ok(bytes_read), vec)) => { + self.cursor += bytes_read; + self.inner = Some(file); + buf.put_slice(&vec[0..bytes_read]); + + // Wake tasks waiting on read to complete + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(Ok(())) + } + Poll::Ready((file, Err(err), _vec)) => { + self.inner = Some(file); + // Wake tasks waiting on read to complete + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(Err(err)) + } + Poll::Pending => { + self.state = FileState::Reading { future }; + + Poll::Pending + } + } + } + + fn poll_write( + &mut self, + cx: &mut Context<'_>, + mut future: IoFuture, + ) -> Poll> { + match Pin::new(&mut future).poll(cx) { + Poll::Ready((file, Ok(bytes_written), _vec)) => { + self.cursor += bytes_written; + self.inner = Some(file); + + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(Ok(bytes_written)) + } + Poll::Ready((file, Err(err), _vec)) => { + self.inner = Some(file); + + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(Err(err)) + } + Poll::Pending => { + self.state = FileState::Writing { future }; + + Poll::Pending + } + } + } + + fn poll_flush( + &mut self, + cx: &mut Context<'_>, + mut future: FlushFuture, + ) -> Poll> { + match Pin::new(&mut future).poll(cx) { + Poll::Ready((file, res)) => { + self.inner = Some(file); + + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(res) + } + Poll::Pending => { + self.state = FileState::Syncing { future }; + + Poll::Pending + } + } + } + + fn poll_shutdown( + &mut self, + cx: &mut Context<'_>, + mut future: ShutdownFuture, + ) -> Poll> { + match Pin::new(&mut future).poll(cx) { + Poll::Ready(res) => { + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(res) + } + Poll::Pending => { + self.state = FileState::Shutdown { future }; + + Poll::Pending + } + } + } + + fn poll_seek( + &mut self, + cx: &mut Context<'_>, + mut future: SeekFuture, + ) -> Poll> { + match Pin::new(&mut future).poll(cx) { + Poll::Ready(Ok(new_position)) => { + for waker in self.wakers.drain(..) { + waker.wake(); + } + + if let Ok(position) = new_position.try_into() { + self.cursor = position; + Poll::Ready(Ok(new_position)) + } else { + Poll::Ready(Err(std::io::ErrorKind::Other.into())) + } + } + Poll::Ready(Err(err)) => { + for waker in self.wakers.drain(..) { + waker.wake(); + } + + Poll::Ready(Err(err)) + } + Poll::Pending => { + self.state = FileState::Seeking { future }; + + Poll::Pending + } + } + } + + fn prepare_read(&mut self, buf: &mut ReadBuf<'_>) -> IoFuture { + let bytes_to_read = buf.remaining().min(65_536); + + let vec = vec![0u8; bytes_to_read]; + + let file = self.inner.take().unwrap(); + let position: u64 = self.cursor.try_into().unwrap(); + + Box::pin(async move { + let (res, vec) = file.read_at(vec, position).await; + (file, res, vec) + }) + } + + fn prepare_write(&mut self, buf: &[u8]) -> IoFuture { + let vec = buf.to_vec(); + + let file = self.inner.take().unwrap(); + let position: u64 = self.cursor.try_into().unwrap(); + + Box::pin(async move { + let (res, vec) = file.write_at(vec, position).await; + (file, res, vec) + }) + } + + fn prepare_flush(&mut self) -> FlushFuture { + let file = self.inner.take().unwrap(); + + Box::pin(async move { + let res = file.sync_all().await; + (file, res) + }) + } + + fn prepare_shutdown(&mut self) -> ShutdownFuture { + let file = self.inner.take().unwrap(); + + Box::pin(async move { + file.sync_all().await?; + file.close().await + }) + } + + fn prepare_seek(&self, from_end: i64) -> SeekFuture { + let path = self.path.clone(); + + Box::pin(async move { + let meta = tokio::fs::metadata(path).await?; + let end = meta.len(); + + if from_end < 0 { + let from_end = (-1) * from_end; + let from_end: u64 = + from_end.try_into().map_err(|_| std::io::ErrorKind::Other)?; + + return Ok(end + from_end); + } + + let from_end: u64 = from_end.try_into().map_err(|_| std::io::ErrorKind::Other)?; + + if from_end > end { + return Err(std::io::ErrorKind::Other.into()); + } + + Ok(end - from_end) + }) + } + + fn register_waker(&mut self, cx: &mut Context<'_>) -> Poll { + let already_registered = self.wakers.iter().any(|waker| cx.waker().will_wake(waker)); + + if !already_registered { + self.wakers.push(cx.waker().clone()); + } + + Poll::Pending + } + } + + impl AsyncRead for File { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.state.take() { + FileState::Pending => { + let future = (*self).prepare_read(buf); + + (*self).poll_read(cx, buf, future) + } + FileState::Reading { future } => (*self).poll_read(cx, buf, future), + _ => (*self).register_waker(cx), + } + } + } + + impl AsyncWrite for File { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.state.take() { + FileState::Pending => { + let future = (*self).prepare_write(buf); + + (*self).poll_write(cx, future) + } + FileState::Writing { future } => (*self).poll_write(cx, future), + _ => (*self).register_waker(cx), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.state.take() { + FileState::Pending => { + let future = (*self).prepare_flush(); + + (*self).poll_flush(cx, future) + } + FileState::Syncing { future } => (*self).poll_flush(cx, future), + _ => (*self).register_waker(cx), + } + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.state.take() { + FileState::Pending => { + let future = (*self).prepare_shutdown(); + + (*self).poll_shutdown(cx, future) + } + FileState::Shutdown { future } => (*self).poll_shutdown(cx, future), + _ => (*self).register_waker(cx), + } + } + } + + impl AsyncSeek for File { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> { + match position { + SeekFrom::Start(from_start) => { + self.cursor = from_start.try_into().unwrap(); + Ok(()) + } + SeekFrom::End(from_end) => match self.state.take() { + FileState::Pending => { + let future = self.prepare_seek(from_end); + + self.state = FileState::Seeking { future }; + Ok(()) + } + _ => Err(std::io::ErrorKind::Other.into()), + }, + SeekFrom::Current(from_current) => { + if from_current < 0 { + let to_subtract = (-1) * from_current; + let to_subtract: usize = to_subtract + .try_into() + .map_err(|_| std::io::ErrorKind::Other)?; + + if to_subtract > self.cursor { + return Err(std::io::ErrorKind::Other.into()); + } + + self.cursor -= to_subtract; + } else { + let from_current: usize = from_current + .try_into() + .map_err(|_| std::io::ErrorKind::Other)?; + + self.cursor += from_current; + } + + Ok(()) + } + } + } + + fn poll_complete( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.state.take() { + FileState::Pending => Poll::Ready(Ok(self + .cursor + .try_into() + .map_err(|_| std::io::ErrorKind::Other)?)), + FileState::Seeking { future } => (*self).poll_seek(cx, future), + _ => Poll::Ready(Err(std::io::ErrorKind::Other.into())), + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 9a87041..5ef6df4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,6 +44,7 @@ mod config; mod error; mod exiftool; mod ffmpeg; +mod file; mod magick; mod middleware; mod migrate; @@ -231,7 +232,7 @@ async fn safe_save_file(path: PathBuf, mut bytes: web::Bytes) -> Result<(), Erro // Open the file for writing debug!("Creating {:?}", path); - let mut file = tokio::fs::File::create(&path).await?; + let mut file = crate::file::File::create(&path).await?; // try writing debug!("Writing to {:?}", path); @@ -541,7 +542,7 @@ async fn process( let permit = PROCESS_SEMAPHORE.acquire().await?; - let file = tokio::fs::File::open(original_path.clone()).await?; + let file = crate::file::File::open(original_path.clone()).await?; let mut processed_reader = crate::magick::process_image_write_read(file, thumbnail_args, format)?; @@ -708,7 +709,7 @@ async fn ranged_file_resp( if range_header.is_empty() { return Err(UploadError::Range.into()); } else if range_header.len() == 1 { - let file = tokio::fs::File::open(path).await?; + let file = crate::file::File::open(path).await?; let meta = file.metadata().await?; @@ -724,7 +725,7 @@ async fn ranged_file_resp( } //No Range header in the request - return the entire document None => { - let file = tokio::fs::File::open(path).await?; + let file = crate::file::File::open(path).await?; let stream = Box::pin(crate::stream::bytes_stream(file)) as LocalBoxStream<'_, _>; (HttpResponse::Ok(), stream) } diff --git a/src/range.rs b/src/range.rs index 8828067..c3c2ffa 100644 --- a/src/range.rs +++ b/src/range.rs @@ -61,7 +61,7 @@ impl Range { pub(crate) async fn chop_file( &self, - mut file: tokio::fs::File, + mut file: crate::file::File, ) -> Result>, Error> { match self { Range::Start(start) => { diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 0f827ca..2987e97 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -991,7 +991,7 @@ pub(crate) async fn safe_save_reader( debug!("Writing stream to {:?}", to); - let mut file = tokio::fs::File::create(to).await?; + let mut file = crate::file::File::create(to).await?; tokio::io::copy(input, &mut file).await?; @@ -1025,7 +1025,7 @@ where let to1 = to.clone(); let fut = async move { - let mut file = tokio::fs::File::create(to1).await?; + let mut file = crate::file::File::create(to1).await?; while let Some(res) = stream.next().await { let mut bytes = res?; @@ -1086,19 +1086,36 @@ mod test { use sha2::{Digest, Sha256}; use std::io::Read; + macro_rules! test_on_arbiter { + ($fut:expr) => { + actix_rt::System::new().block_on(async move { + let arbiter = actix_rt::Arbiter::new(); + + let (tx, rx) = tokio::sync::oneshot::channel(); + + arbiter.spawn(async move { + let handle = actix_rt::spawn($fut); + + let _ = tx.send(handle.await.unwrap()); + }); + + rx.await.unwrap() + }) + }; + } + #[test] fn hasher_works() { - let hash = actix_rt::System::new() - .block_on(async move { - let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?; + let hash = test_on_arbiter!(async move { + let file1 = crate::file::File::open("./client-examples/earth.gif").await?; - let mut hasher = Hasher::new(file1, Sha256::new()); + let mut hasher = Hasher::new(file1, Sha256::new()); - tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?; + tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?; - hasher.finalize_reset().await - }) - .unwrap(); + hasher.finalize_reset().await + }) + .unwrap(); let mut file = std::fs::File::open("./client-examples/earth.gif").unwrap(); let mut vec = Vec::new();