diff --git a/src/lib.rs b/src/lib.rs index 6bdd21b..e331dfd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,11 @@ pub mod tests { pub mod queue { pub use crate::queue::{bounded, queue_count, Queue}; } + + #[doc(hidden)] + pub mod notify { + pub use crate::notify::{notify_count, Listener, Notify}; + } } /// Configuration builder for the CpuPool diff --git a/src/notify.rs b/src/notify.rs index 1017ac9..0ad3fc4 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -6,6 +6,29 @@ use std::{ use crate::sync::{Arc, AtomicU8, Mutex, Ordering}; +thread_local! { + #[cfg(any(loom, test))] + static NOTIFY_COUNT: std::cell::RefCell> = std::cell::RefCell::new(std::num::Wrapping(0)); +} + +#[inline(always)] +fn increment_notify_count() { + #[cfg(any(loom, test))] + NOTIFY_COUNT.with_borrow_mut(|v| *v += 1); +} + +#[inline(always)] +fn decrement_notify_count() { + #[cfg(any(loom, test))] + NOTIFY_COUNT.with_borrow_mut(|v| *v -= 1); +} + +#[cfg(any(test, loom))] +#[doc(hidden)] +pub fn notify_count() -> u64 { + NOTIFY_COUNT.with_borrow(|v| v.0) +} + const UNNOTIFIED: u8 = 0b0000; const NOTIFIED_ONE: u8 = 0b0001; const RESOLVED: u8 = 0b0010; @@ -13,7 +36,8 @@ const RESOLVED: u8 = 0b0010; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct NotifyId(u64); -pub(super) struct Notify { +#[doc(hidden)] +pub struct Notify { state: Mutex, } @@ -23,7 +47,8 @@ struct NotifyState { next_id: u64, } -pub(super) struct Listener<'a> { +#[doc(hidden)] +pub struct Listener<'a> { state: &'a Mutex, waker: Waker, woken: Arc, @@ -31,7 +56,9 @@ pub(super) struct Listener<'a> { } impl Notify { - pub(super) fn new() -> Self { + #[doc(hidden)] + pub fn new() -> Self { + increment_notify_count(); metrics::counter!("async-cpupool.notify.created").increment(1); Notify { @@ -44,10 +71,17 @@ impl Notify { } // although this is an async fn, it is not capable of yielding to the executor - pub(super) async fn listen(&self) -> Listener<'_> { + #[doc(hidden)] + pub async fn listen(&self) -> Listener<'_> { poll_fn(|cx| Poll::Ready(self.make_listener(cx.waker().clone()))).await } + #[doc(hidden)] + #[cfg(any(loom, test))] + pub fn token(&self) -> u64 { + self.state.lock().unwrap().token + } + pub(super) fn make_listener(&self, waker: Waker) -> Listener<'_> { let (id, woken) = self .state @@ -63,7 +97,8 @@ impl Notify { } } - pub(super) fn notify_one(&self) { + #[doc(hidden)] + pub fn notify_one(&self) { self.state.lock().expect("not poisoned").notify_one(); } } @@ -146,6 +181,7 @@ impl NotifyState { impl Drop for Notify { fn drop(&mut self) { + decrement_notify_count(); metrics::counter!("async-cpupool.notify.dropped").increment(1); } } @@ -208,213 +244,3 @@ impl Future for Listener<'_> { Poll::Pending } } - -#[cfg(all(test, loom))] -mod tests { - use super::Notify; - use std::future::Future; - - struct NoopWaker; - - impl std::task::Wake for NoopWaker { - fn wake(self: std::sync::Arc) {} - fn wake_by_ref(self: &std::sync::Arc) {} - } - - fn noop_waker() -> std::task::Waker { - std::sync::Arc::new(NoopWaker).into() - } - - #[test] - fn dropped_notified_listener() { - loom::model(|| { - loom::future::block_on(async { - let notify = Notify::new(); - - let listener = notify.listen().await; - - notify.notify_one(); - drop(listener); - - assert_eq!( - notify.state.lock().unwrap().token, - 1, - "Dropped notify should not have consumed token" - ); - }); - }); - } - - #[test] - fn threaded_dropped_notified_listener() { - loom::model(|| { - let notify = loom::sync::Arc::new(Notify::new()); - - let notify2 = notify.clone(); - let handle = loom::thread::spawn(move || { - loom::future::block_on(async move { - drop(notify2.listen().await); - }); - }); - - notify.notify_one(); - - handle.join().unwrap(); - - assert_eq!( - notify.state.lock().unwrap().token, - 1, - "Dropped notify should not have consumed token" - ); - }); - } - - #[test] - fn notified_listener() { - loom::model(|| { - loom::future::block_on(async { - let notify = Notify::new(); - - let mut listener = notify.listen().await; - - notify.notify_one(); - - let waker = noop_waker(); - let mut cx = std::task::Context::from_waker(&waker); - - assert_eq!( - std::pin::Pin::new(&mut listener).poll(&mut cx), - std::task::Poll::Ready(()), - "Polled listen should be notified" - ); - assert_eq!( - notify.state.lock().unwrap().token, - 0, - "Dropped notify should have consumed token" - ); - }) - }); - } - - #[test] - fn threaded_notified_listener() { - loom::model(|| { - let notify = loom::sync::Arc::new(Notify::new()); - - let notify2 = notify.clone(); - let handle = loom::thread::spawn(move || { - loom::future::block_on(async move { - notify2.listen().await.await; - }); - }); - - notify.notify_one(); - - handle.join().unwrap(); - - assert_eq!( - notify.state.lock().unwrap().token, - 0, - "Dropped notify should have consumed token" - ); - }); - } - - #[test] - fn multiple_listeners() { - loom::model(|| { - loom::future::block_on(async { - let notify = Notify::new(); - - let mut listener_1 = notify.listen().await; - let mut listener_2 = notify.listen().await; - - notify.notify_one(); - - let waker = noop_waker(); - let mut cx = std::task::Context::from_waker(&waker); - - assert_eq!( - std::pin::Pin::new(&mut listener_1).poll(&mut cx), - std::task::Poll::Ready(()), - "Polled listen_1 should be notified" - ); - - assert_eq!( - std::pin::Pin::new(&mut listener_2).poll(&mut cx), - std::task::Poll::Pending, - "Polled listen_2 should not be notified" - ); - }); - }); - } - - #[test] - fn multiple_notifies() { - loom::model(|| { - loom::future::block_on(async { - let notify = Notify::new(); - - let mut listener_1 = notify.listen().await; - let mut listener_2 = notify.listen().await; - - notify.notify_one(); - notify.notify_one(); - - let waker = noop_waker(); - let mut cx = std::task::Context::from_waker(&waker); - - assert_eq!( - std::pin::Pin::new(&mut listener_1).poll(&mut cx), - std::task::Poll::Ready(()), - "Polled listen_1 should be notified" - ); - - assert_eq!( - std::pin::Pin::new(&mut listener_2).poll(&mut cx), - std::task::Poll::Ready(()), - "Polled listen_2 should be notified" - ); - - assert_eq!( - notify.state.lock().unwrap().token, - 0, - "notifies should have consumed tokens" - ); - }); - }); - } - - #[test] - fn threaded_multiple_notifies() { - loom::model(|| { - let notify = loom::sync::Arc::new(Notify::new()); - - let notify2 = notify.clone(); - let handle1 = loom::thread::spawn(move || { - loom::future::block_on(async move { - notify2.listen().await.await; - }) - }); - - let notify2 = notify.clone(); - let handle2 = loom::thread::spawn(move || { - loom::future::block_on(async move { - notify2.listen().await.await; - }) - }); - - notify.notify_one(); - notify.notify_one(); - - handle1.join().unwrap(); - handle2.join().unwrap(); - - assert_eq!( - notify.state.lock().unwrap().token, - 0, - "threaded notifies should have consumed tokens" - ); - }); - } -} diff --git a/tests/notify.rs b/tests/notify.rs new file mode 100644 index 0000000..4e624c0 --- /dev/null +++ b/tests/notify.rs @@ -0,0 +1,290 @@ +#![cfg(loom)] + +use async_cpupool::tests::notify::Notify; +use std::future::Future; + +struct NoopWaker; + +impl std::task::Wake for NoopWaker { + fn wake(self: std::sync::Arc) {} + fn wake_by_ref(self: &std::sync::Arc) {} +} + +fn noop_waker() -> std::task::Waker { + std::sync::Arc::new(NoopWaker).into() +} + +#[test] +fn dropped_notified_listener() { + loom::model(|| { + let notify = Notify::new(); + + loom::future::block_on(async { + let listener = notify.listen().await; + + notify.notify_one(); + drop(listener); + + assert_eq!( + notify.token(), + 1, + "Dropped notify should not have consumed token" + ); + }); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} + +#[test] +fn threaded_dropped_notified_listener() { + loom::model(|| { + let notify = loom::sync::Arc::new(Notify::new()); + + let notify2 = notify.clone(); + let handle = loom::thread::spawn(move || { + loom::future::block_on(async move { + drop(notify2.listen().await); + }); + }); + + notify.notify_one(); + + handle.join().unwrap(); + + assert_eq!( + notify.token(), + 1, + "Dropped notify should not have consumed token" + ); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} + +#[test] +fn notified_listener() { + loom::model(|| { + let notify = Notify::new(); + + loom::future::block_on(async { + let mut listener = notify.listen().await; + + notify.notify_one(); + + let waker = noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + + assert_eq!( + std::pin::Pin::new(&mut listener).poll(&mut cx), + std::task::Poll::Ready(()), + "Polled listen should be notified" + ); + assert_eq!( + notify.token(), + 0, + "Dropped notify should have consumed token" + ); + }); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} + +#[test] +fn threaded_notified_listener() { + loom::model(|| { + let notify = loom::sync::Arc::new(Notify::new()); + + let notify2 = notify.clone(); + let handle = loom::thread::spawn(move || { + loom::future::block_on(async move { + notify2.listen().await.await; + }); + }); + + notify.notify_one(); + + handle.join().unwrap(); + + assert_eq!( + notify.token(), + 0, + "Dropped notify should have consumed token" + ); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} + +#[test] +fn multiple_listeners() { + loom::model(|| { + let notify = Notify::new(); + + loom::future::block_on(async { + let mut listener_1 = notify.listen().await; + let mut listener_2 = notify.listen().await; + + notify.notify_one(); + + let waker = noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + + assert_eq!( + std::pin::Pin::new(&mut listener_1).poll(&mut cx), + std::task::Poll::Ready(()), + "Polled listen_1 should be notified" + ); + + assert_eq!( + std::pin::Pin::new(&mut listener_2).poll(&mut cx), + std::task::Poll::Pending, + "Polled listen_2 should not be notified" + ); + }); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} + +#[test] +fn multiple_notifies() { + loom::model(|| { + let notify = Notify::new(); + + loom::future::block_on(async { + let mut listener_1 = notify.listen().await; + let mut listener_2 = notify.listen().await; + + notify.notify_one(); + notify.notify_one(); + + assert_eq!(notify.token(), 0, "listeners should have consumed tokens"); + + let waker = noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + + assert_eq!( + std::pin::Pin::new(&mut listener_1).poll(&mut cx), + std::task::Poll::Ready(()), + "Polled listen_1 should be notified" + ); + + assert_eq!( + std::pin::Pin::new(&mut listener_2).poll(&mut cx), + std::task::Poll::Ready(()), + "Polled listen_2 should be notified" + ); + }); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} + +#[test] +fn threaded_multiple_notifies() { + loom::model(|| { + let notify = loom::sync::Arc::new(Notify::new()); + + let notify2 = notify.clone(); + let handle1 = loom::thread::spawn(move || { + loom::future::block_on(async move { + notify2.listen().await.await; + }) + }); + + let notify2 = notify.clone(); + let handle2 = loom::thread::spawn(move || { + loom::future::block_on(async { + notify2.listen().await.await; + }); + + drop(notify2); + }); + + notify.notify_one(); + notify.notify_one(); + + handle1.join().unwrap(); + handle2.join().unwrap(); + + assert_eq!( + notify.token(), + 0, + "threaded notifies should have consumed tokens" + ); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 1, + "Should have created one Notify" + ); + drop(notify); + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notify" + ); + }); +} diff --git a/tests/queue.rs b/tests/queue.rs index f20a519..6d7223b 100644 --- a/tests/queue.rs +++ b/tests/queue.rs @@ -1,5 +1,32 @@ #![cfg(loom)] +fn assert_counts(queue: async_cpupool::tests::queue::Queue) { + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 1, + "Should have created one queue" + ); + + assert!( + async_cpupool::tests::notify::notify_count() > 0, + "Should have created notifies" + ); + + drop(queue); + + assert_eq!( + async_cpupool::tests::queue::queue_count(), + 0, + "Should have dropped queue" + ); + + assert_eq!( + async_cpupool::tests::notify::notify_count(), + 0, + "Should have dropped notifies" + ); +} + #[test] fn push_pop() { loom::model(|| { @@ -17,19 +44,7 @@ fn push_pop() { assert!(queue.try_push(()).is_none(), "Failed to push item"); - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 1, - "Should have created one queue" - ); - - drop(queue); - - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 0, - "Should have dropped queue" - ); + assert_counts(queue); }); } @@ -45,19 +60,7 @@ fn async_push_pop() { assert_eq!(queue.len(), 0, "Should have popped pushed item"); - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 1, - "Should have created one queue" - ); - - drop(queue); - - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 0, - "Should have dropped queue" - ); + assert_counts(queue); }); } @@ -80,19 +83,7 @@ fn threaded_push_pop() { assert_eq!(queue.len(), 0, "Should have popped pushed item"); - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 1, - "Should have created one queue" - ); - - drop(queue); - - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 0, - "Should have dropped queue" - ); + assert_counts(queue); }); } @@ -120,19 +111,7 @@ fn multiple_threaded_push_pop() { assert_eq!(queue.len(), 0, "Should have popped both pushed items"); - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 1, - "Should have created one queue" - ); - - drop(queue); - - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 0, - "Should have dropped queue" - ); + assert_counts(queue); }); } @@ -169,19 +148,7 @@ fn multiple_threaded_push_pop_2() { assert_eq!(queue.len(), 0, "Should have popped both pushed items"); - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 1, - "Should have created one queue" - ); - - drop(queue); - - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 0, - "Should have dropped queue" - ); + assert_counts(queue); }); } @@ -227,18 +194,6 @@ fn multiple_threaded_push_pop_3() { assert_eq!(queue.len(), 0, "Should have popped both pushed items"); - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 1, - "Should have created one queue" - ); - - drop(queue); - - assert_eq!( - async_cpupool::tests::queue::queue_count(), - 0, - "Should have dropped queue" - ); + assert_counts(queue); }); }