async-cpupool/tests/notify.rs

291 lines
7.2 KiB
Rust

#![cfg(loom)]
use async_cpupool::tests::notify::Notify;
use std::future::Future;
struct NoopWaker;
impl std::task::Wake for NoopWaker {
fn wake(self: std::sync::Arc<Self>) {}
fn wake_by_ref(self: &std::sync::Arc<Self>) {}
}
fn noop_waker() -> std::task::Waker {
std::sync::Arc::new(NoopWaker).into()
}
#[test]
fn dropped_notified_listener() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let listener = notify.listen().await;
notify.notify_one();
drop(listener);
assert_eq!(
notify.token(),
1,
"Dropped notify should not have consumed token"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn threaded_dropped_notified_listener() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async move {
drop(notify2.listen().await);
});
});
notify.notify_one();
handle.join().unwrap();
assert_eq!(
notify.token(),
1,
"Dropped notify should not have consumed token"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn notified_listener() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let mut listener = notify.listen().await;
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen should be notified"
);
assert_eq!(
notify.token(),
0,
"Dropped notify should have consumed token"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn threaded_notified_listener() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
});
});
notify.notify_one();
handle.join().unwrap();
assert_eq!(
notify.token(),
0,
"Dropped notify should have consumed token"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn multiple_listeners() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let mut listener_1 = notify.listen().await;
let mut listener_2 = notify.listen().await;
notify.notify_one();
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_1 should be notified"
);
assert_eq!(
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
std::task::Poll::Pending,
"Polled listen_2 should not be notified"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn multiple_notifies() {
loom::model(|| {
let notify = Notify::new();
loom::future::block_on(async {
let mut listener_1 = notify.listen().await;
let mut listener_2 = notify.listen().await;
notify.notify_one();
notify.notify_one();
assert_eq!(notify.token(), 0, "listeners should have consumed tokens");
let waker = noop_waker();
let mut cx = std::task::Context::from_waker(&waker);
assert_eq!(
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_1 should be notified"
);
assert_eq!(
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
std::task::Poll::Ready(()),
"Polled listen_2 should be notified"
);
});
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}
#[test]
fn threaded_multiple_notifies() {
loom::model(|| {
let notify = loom::sync::Arc::new(Notify::new());
let notify2 = notify.clone();
let handle1 = loom::thread::spawn(move || {
loom::future::block_on(async move {
notify2.listen().await.await;
})
});
let notify2 = notify.clone();
let handle2 = loom::thread::spawn(move || {
loom::future::block_on(async {
notify2.listen().await.await;
});
drop(notify2);
});
notify.notify_one();
notify.notify_one();
handle1.join().unwrap();
handle2.join().unwrap();
assert_eq!(
notify.token(),
0,
"threaded notifies should have consumed tokens"
);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
1,
"Should have created one Notify"
);
drop(notify);
assert_eq!(
async_cpupool::tests::notify::notify_count(),
0,
"Should have dropped notify"
);
});
}