diff --git a/flake.nix b/flake.nix index 4b8a515..7ad1b9e 100644 --- a/flake.nix +++ b/flake.nix @@ -20,6 +20,8 @@ nativeBuildInputs = [ cargo cargo-outdated clippy rust-analyzer rustc rustfmt ]; RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}"; + + LOOM_MAX_PREEMPTIONS = "2"; }; }); } diff --git a/src/lib.rs b/src/lib.rs index efeac16..6bdd21b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,15 @@ use executor::block_on; use queue::Queue; use selector::select; +#[cfg(any(loom, test))] +#[doc(hidden)] +pub mod tests { + #[doc(hidden)] + pub mod queue { + pub use crate::queue::{bounded, queue_count, Queue}; + } +} + /// Configuration builder for the CpuPool #[derive(Debug)] pub struct Config { diff --git a/src/queue.rs b/src/queue.rs index 708b499..6e96264 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -3,11 +3,36 @@ use std::collections::VecDeque; use crate::notify::Notify; use crate::sync::{Arc, Mutex}; -pub(super) fn bounded(capacity: usize) -> Queue { +thread_local! { + #[cfg(any(loom, test))] + static QUEUE_COUNT: std::cell::RefCell> = std::cell::RefCell::new(std::num::Wrapping(0)); +} + +#[inline(always)] +fn increment_queue_count() { + #[cfg(any(loom, test))] + QUEUE_COUNT.with_borrow_mut(|v| *v += 1); +} + +#[inline(always)] +fn decrement_queue_count() { + #[cfg(any(loom, test))] + QUEUE_COUNT.with_borrow_mut(|v| *v -= 1); +} + +#[cfg(any(test, loom))] +#[doc(hidden)] +pub fn queue_count() -> u64 { + QUEUE_COUNT.with_borrow(|v| v.0) +} + +#[doc(hidden)] +pub fn bounded(capacity: usize) -> Queue { Queue::bounded(capacity) } -pub(crate) struct Queue { +#[doc(hidden)] +pub struct Queue { inner: Arc>, capacity: usize, } @@ -19,8 +44,11 @@ struct QueueState { } impl Queue { - pub(super) fn bounded(capacity: usize) -> Self { + #[doc(hidden)] + pub fn bounded(capacity: usize) -> Self { + increment_queue_count(); metrics::counter!("async-cpupool.queue.created").increment(1); + Self { inner: Arc::new(QueueState { queue: Mutex::new(VecDeque::new()), @@ -31,7 +59,8 @@ impl Queue { } } - pub(super) fn len(&self) -> usize { + #[doc(hidden)] + pub fn len(&self) -> usize { self.inner.queue.lock().expect("not poisoned").len() } @@ -45,21 +74,23 @@ impl Queue { } } - pub(super) async fn push(&self, mut item: T) { + #[doc(hidden)] + pub async fn push(&self, mut item: T) { loop { let listener = self.inner.push_notify.listen().await; - if let Some(returned_item) = self.try_push(item) { - item = returned_item; - - listener.await; - } else { + let Some(returned_item) = self.try_push(item) else { return; - } + }; + + item = returned_item; + + listener.await; } } - pub(super) fn try_push(&self, item: T) -> Option { + #[doc(hidden)] + pub fn try_push(&self, item: T) -> Option { match self.try_push_impl(item) { Some(item) => Some(item), None => { @@ -81,13 +112,12 @@ impl Queue { } } - pub(super) async fn pop(&self) -> T { + #[doc(hidden)] + pub async fn pop(&self) -> T { loop { let listener = self.inner.pop_notify.listen().await; if let Some(item) = self.try_pop() { - self.inner.push_notify.notify_one(); - metrics::counter!("async-cpupool.queue.popped").increment(1); return item; } @@ -95,7 +125,16 @@ impl Queue { } } - fn try_pop(&self) -> Option { + #[doc(hidden)] + pub fn try_pop(&self) -> Option { + let item = self.try_pop_impl()?; + self.inner.push_notify.notify_one(); + metrics::counter!("async-cpupool.queue.popped").increment(1); + + Some(item) + } + + fn try_pop_impl(&self) -> Option { self.inner.queue.lock().expect("not poisoned").pop_front() } } @@ -109,8 +148,9 @@ impl Clone for Queue { } } -impl Drop for Queue { +impl Drop for QueueState { fn drop(&mut self) { + decrement_queue_count(); metrics::counter!("async-cpupool.queue.dropped").increment(1); } } diff --git a/tests/queue.rs b/tests/queue.rs new file mode 100644 index 0000000..f20a519 --- /dev/null +++ b/tests/queue.rs @@ -0,0 +1,244 @@ +#![cfg(loom)] + +#[test] +fn push_pop() { + loom::model(|| { + let queue = async_cpupool::tests::queue::bounded(1); + + assert!(queue.try_push(()).is_none(), "Failed to push item"); + + assert!(queue.try_push(()).is_some(), "Shouldn't have pushed item"); + + assert!(queue.try_pop().is_some(), "Failed to pop item"); + + assert!(queue.try_pop().is_none(), "Shoudln't have popped item"); + + assert_eq!(queue.len(), 0, "Should have popped pushed item"); + + assert!(queue.try_push(()).is_none(), "Failed to push item"); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + }); +} + +#[test] +fn async_push_pop() { + loom::model(|| { + let queue = async_cpupool::tests::queue::bounded(1); + + loom::future::block_on(async { + queue.push(()).await; + queue.pop().await; + }); + + assert_eq!(queue.len(), 0, "Should have popped pushed item"); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + }); +} + +#[test] +fn threaded_push_pop() { + loom::model(|| { + let queue = async_cpupool::tests::queue::bounded(1); + + let q2 = queue.clone(); + let handle = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.pop().await; + }); + + drop(q2); + }); + + assert!(queue.try_push(()).is_none(), "failed to push item"); + handle.join().unwrap(); + + assert_eq!(queue.len(), 0, "Should have popped pushed item"); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + }); +} + +#[test] +fn multiple_threaded_push_pop() { + loom::model(move || { + let queue = async_cpupool::tests::queue::bounded(1); + + let q2 = queue.clone(); + let h1 = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.pop().await; + q2.pop().await; + }); + + drop(q2); + }); + + loom::future::block_on(async { + queue.push(()).await; + queue.push(()).await; + }); + + h1.join().unwrap(); + + assert_eq!(queue.len(), 0, "Should have popped both pushed items"); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + }); +} + +#[test] +fn multiple_threaded_push_pop_2() { + loom::model(|| { + let queue = async_cpupool::tests::queue::bounded(1); + + let q2 = queue.clone(); + let h1 = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.push(()).await; + }); + + drop(q2); + }); + + let q2 = queue.clone(); + let h2 = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.push(()).await; + }); + + drop(q2); + }); + + loom::future::block_on(async { + queue.pop().await; + queue.pop().await; + }); + + h1.join().unwrap(); + h2.join().unwrap(); + + assert_eq!(queue.len(), 0, "Should have popped both pushed items"); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + }); +} + +#[test] +fn multiple_threaded_push_pop_3() { + loom::model(|| { + let queue = async_cpupool::tests::queue::bounded(1); + + let q2 = queue.clone(); + let h1 = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.push(()).await; + }); + + drop(q2); + }); + + let q2 = queue.clone(); + let h2 = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.push(()).await; + }); + + drop(q2); + }); + + let q2 = queue.clone(); + let h3 = loom::thread::spawn(move || { + loom::future::block_on(async { + q2.pop().await; + }); + + drop(q2); + }); + + loom::future::block_on(async { + queue.pop().await; + }); + + h1.join().unwrap(); + h2.join().unwrap(); + h3.join().unwrap(); + + assert_eq!(queue.len(), 0, "Should have popped both pushed items"); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + }); +}