Compare commits

...

3 commits

Author SHA1 Message Date
asonix 2f3d973338 Add loom to CI
All checks were successful
/ clippy (push) Successful in 19s
/ loom (notify) (push) Successful in 27s
/ loom (queue) (push) Successful in 36s
/ check (aarch64-unknown-linux-musl) (push) Successful in 11s
/ check (x86_64-unknown-linux-musl) (push) Successful in 13s
/ check (armv7-unknown-linux-musleabihf) (push) Successful in 13s
/ tests (push) Successful in 19s
2024-04-17 13:10:06 -05:00
asonix e4cc505951 Try pushing and popping before acquiring a listener 2024-04-17 13:05:38 -05:00
asonix b48d7df004 Move notify tests to tests directory 2024-04-17 13:05:12 -05:00
7 changed files with 427 additions and 293 deletions

View file

@ -38,6 +38,30 @@ jobs:
name: Test
run: cargo test
loom:
strategy:
fail-fast: false
matrix:
suite:
- queue
- notify
runs-on: docker
container:
image: docker.io/asonix/actions-base-image:0.1
steps:
-
name: Checkout async-cpupool
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Loom
run: cargo test --release --test ${{ matrix.suite }}
env:
RUSTFLAGS: '--cfg loom'
LOOM_MAX_PREEMPTIONS: '2'
check:
strategy:
fail-fast: false

View file

@ -35,6 +35,30 @@ jobs:
name: Test
run: cargo test
loom:
strategy:
fail-fast: false
matrix:
suite:
- queue
- notify
runs-on: docker
container:
image: docker.io/asonix/actions-base-image:0.1
steps:
-
name: Checkout async-cpupool
uses: https://github.com/actions/checkout@v4
-
name: Cargo Cache
uses: https://git.asonix.dog/asonix/actions/cache-rust-dependencies@main
-
name: Loom
run: cargo test --release --test ${{ matrix.suite }}
env:
RUSTFLAGS: '--cfg loom'
LOOM_MAX_PREEMPTIONS: '2'
build:
needs:
- clippy

View file

@ -29,6 +29,11 @@ pub mod tests {
pub mod queue {
pub use crate::queue::{bounded, queue_count, Queue};
}
#[doc(hidden)]
pub mod notify {
pub use crate::notify::{notify_count, Listener, Notify};
}
}
/// Configuration builder for the CpuPool

View file

@ -6,6 +6,29 @@ use std::{
use crate::sync::{Arc, AtomicU8, Mutex, Ordering};
thread_local! {
#[cfg(any(loom, test))]
static NOTIFY_COUNT: std::cell::RefCell<std::num::Wrapping<u64>> = std::cell::RefCell::new(std::num::Wrapping(0));
}
#[inline(always)]
fn increment_notify_count() {
#[cfg(any(loom, test))]
NOTIFY_COUNT.with_borrow_mut(|v| *v += 1);
}
#[inline(always)]
fn decrement_notify_count() {
#[cfg(any(loom, test))]
NOTIFY_COUNT.with_borrow_mut(|v| *v -= 1);
}
#[cfg(any(test, loom))]
#[doc(hidden)]
pub fn notify_count() -> u64 {
NOTIFY_COUNT.with_borrow(|v| v.0)
}
const UNNOTIFIED: u8 = 0b0000;
const NOTIFIED_ONE: u8 = 0b0001;
const RESOLVED: u8 = 0b0010;
@ -13,7 +36,8 @@ const RESOLVED: u8 = 0b0010;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct NotifyId(u64);
pub(super) struct Notify {
#[doc(hidden)]
pub struct Notify {
state: Mutex<NotifyState>,
}
@ -23,7 +47,8 @@ struct NotifyState {
next_id: u64,
}
pub(super) struct Listener<'a> {
#[doc(hidden)]
pub struct Listener<'a> {
state: &'a Mutex<NotifyState>,
waker: Waker,
woken: Arc<AtomicU8>,
@ -31,7 +56,9 @@ pub(super) struct Listener<'a> {
}
impl Notify {
pub(super) fn new() -> Self {
#[doc(hidden)]
pub fn new() -> Self {
increment_notify_count();
metrics::counter!("async-cpupool.notify.created").increment(1);
Notify {
@ -44,10 +71,17 @@ impl Notify {
}
// although this is an async fn, it is not capable of yielding to the executor
pub(super) async fn listen(&self) -> Listener<'_> {
#[doc(hidden)]
pub async fn listen(&self) -> Listener<'_> {
poll_fn(|cx| Poll::Ready(self.make_listener(cx.waker().clone()))).await
}
#[doc(hidden)]
#[cfg(any(loom, test))]
pub fn token(&self) -> u64 {
self.state.lock().unwrap().token
}
pub(super) fn make_listener(&self, waker: Waker) -> Listener<'_> {
let (id, woken) = self
.state
@ -63,7 +97,8 @@ impl Notify {
}
}
pub(super) fn notify_one(&self) {
#[doc(hidden)]
pub fn notify_one(&self) {
self.state.lock().expect("not poisoned").notify_one();
}
}
@ -146,6 +181,7 @@ impl NotifyState {
impl Drop for Notify {
fn drop(&mut self) {
decrement_notify_count();
metrics::counter!("async-cpupool.notify.dropped").increment(1);
}
}
@ -208,213 +244,3 @@ impl Future for Listener<'_> {
Poll::Pending
}
}
#[cfg(all(test, loom))]
mod tests {
use super::Notify;
use std::future::Future;
struct NoopWaker;
impl std::task::Wake for NoopWaker {
fn wake(self: std::sync::Arc<Self>) {}
fn wake_by_ref(self: &std::sync::Arc<Self>) {}
}
fn noop_waker() -> std::task::Waker {
std::sync::Arc::new(NoopWaker).into()
}
#[test]
fn dropped_notified_listener() {
loom::model(|| {
loom::future::block_on(async {
let notify = Notify::new();
let listener = notify.listen().await;
notify.notify_one();
drop(listener);
assert_eq!(
notify.state.lock().unwrap().token,
1,
"Dropped notify should not have consumed token"
);
});
});
}
#[test]
fn threaded_dropped_notified_listener() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async move {
drop(notify2.listen().await);
});
});
notify.notify_one();
handle.join().unwrap();
assert_eq!(
notify.state.lock().unwrap().token,
1,
"Dropped notify should not have consumed token"
);
});
}
#[test]
fn notified_listener() {
loom::model(|| {
loom::future::block_on(async {
let notify = Notify::new();
let mut listener = notify.listen().await;
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen should be notified"
);
assert_eq!(
notify.state.lock().unwrap().token,
0,
"Dropped notify should have consumed token"
);
})
});
}
#[test]
fn threaded_notified_listener() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
});
});
notify.notify_one();
handle.join().unwrap();
assert_eq!(
notify.state.lock().unwrap().token,
0,
"Dropped notify should have consumed token"
);
});
}
#[test]
fn multiple_listeners() {
loom::model(|| {
loom::future::block_on(async {
let notify = Notify::new();
let mut listener_1 = notify.listen().await;
let mut listener_2 = notify.listen().await;
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_1 should be notified"
);
assert_eq!(
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
std::task::Poll::Pending,
"Polled listen_2 should not be notified"
);
});
});
}
#[test]
fn multiple_notifies() {
loom::model(|| {
loom::future::block_on(async {
let notify = Notify::new();
let mut listener_1 = notify.listen().await;
let mut listener_2 = notify.listen().await;
notify.notify_one();
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_1 should be notified"
);
assert_eq!(
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_2 should be notified"
);
assert_eq!(
notify.state.lock().unwrap().token,
0,
"notifies should have consumed tokens"
);
});
});
}
#[test]
fn threaded_multiple_notifies() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle1 = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
})
});
let notify2 = notify.clone();
let handle2 = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
})
});
notify.notify_one();
notify.notify_one();
handle1.join().unwrap();
handle2.join().unwrap();
assert_eq!(
notify.state.lock().unwrap().token,
0,
"threaded notifies should have consumed tokens"
);
});
}
}

View file

@ -76,6 +76,12 @@ impl<T> Queue<T> {
#[doc(hidden)]
pub async fn push(&self, mut item: T) {
let Some(returned_item) = self.try_push(item) else {
return;
};
item = returned_item;
loop {
let listener = self.inner.push_notify.listen().await;
@ -114,6 +120,10 @@ impl<T> Queue<T> {
#[doc(hidden)]
pub async fn pop(&self) -> T {
if let Some(item) = self.try_pop() {
return item;
}
loop {
let listener = self.inner.pop_notify.listen().await;

290
tests/notify.rs Normal file
View file

@ -0,0 +1,290 @@
#![cfg(loom)]
use async_cpupool::tests::notify::Notify;
use std::future::Future;
struct NoopWaker;
impl std::task::Wake for NoopWaker {
fn wake(self: std::sync::Arc<Self>) {}
fn wake_by_ref(self: &std::sync::Arc<Self>) {}
}
fn noop_waker() -> std::task::Waker {
std::sync::Arc::new(NoopWaker).into()
}
#[test]
fn dropped_notified_listener() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let listener = notify.listen().await;
notify.notify_one();
drop(listener);
assert_eq!(
notify.token(),
1,
"Dropped notify should not have consumed token"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn threaded_dropped_notified_listener() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async move {
drop(notify2.listen().await);
});
});
notify.notify_one();
handle.join().unwrap();
assert_eq!(
notify.token(),
1,
"Dropped notify should not have consumed token"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn notified_listener() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let mut listener = notify.listen().await;
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen should be notified"
);
assert_eq!(
notify.token(),
0,
"Dropped notify should have consumed token"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn threaded_notified_listener() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
});
});
notify.notify_one();
handle.join().unwrap();
assert_eq!(
notify.token(),
0,
"Dropped notify should have consumed token"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn multiple_listeners() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let mut listener_1 = notify.listen().await;
let mut listener_2 = notify.listen().await;
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_1 should be notified"
);
assert_eq!(
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
std::task::Poll::Pending,
"Polled listen_2 should not be notified"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn multiple_notifies() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let mut listener_1 = notify.listen().await;
let mut listener_2 = notify.listen().await;
notify.notify_one();
notify.notify_one();
assert_eq!(notify.token(), 0, "listeners should have consumed tokens");
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_1 should be notified"
);
assert_eq!(
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_2 should be notified"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn threaded_multiple_notifies() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle1 = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
})
});
let notify2 = notify.clone();
let handle2 = loom::thread::spawn(move || {
loom::future::block_on(async {
notify2.listen().await.await;
});
drop(notify2);
});
notify.notify_one();
notify.notify_one();
handle1.join().unwrap();
handle2.join().unwrap();
assert_eq!(
notify.token(),
0,
"threaded notifies should have consumed tokens"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}

View file

@ -1,5 +1,32 @@
#![cfg(loom)]
fn assert_counts<T>(queue: async_cpupool::tests::queue::Queue<T>) {
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
assert!(
async_cpupool::tests::notify::notify_count() > 0,
"Should have created notifies"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notifies"
);
}
#[test]
fn push_pop() {
loom::model(|| {
@ -17,19 +44,7 @@ fn push_pop() {
assert!(queue.try_push(()).is_none(), "Failed to push item");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_counts(queue);
});
}
@ -45,19 +60,7 @@ fn async_push_pop() {
assert_eq!(queue.len(), 0, "Should have popped pushed item");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_counts(queue);
});
}
@ -80,19 +83,7 @@ fn threaded_push_pop() {
assert_eq!(queue.len(), 0, "Should have popped pushed item");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_counts(queue);
});
}
@ -120,19 +111,7 @@ fn multiple_threaded_push_pop() {
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_counts(queue);
});
}
@ -169,19 +148,7 @@ fn multiple_threaded_push_pop_2() {
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_counts(queue);
});
}
@ -227,18 +194,6 @@ fn multiple_threaded_push_pop_3() {
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
assert_counts(queue);
});
}