#![cfg(loom)]
//! Loom model tests for the `Notify` type exposed at `async_cpupool::tests::notify`.
//!
//! Each test body runs inside `loom::model`. The tests check two things:
//!
//! * the value reported by `Notify::token()` after a listener obtained from
//!   `Notify::listen()` is dropped, polled, or awaited following `notify_one()`;
//! * that `notify_count()` reads 1 while the test's `Notify` exists and 0 once the
//!   test has dropped it.

use async_cpupool::tests::notify::Notify;
use std::future::Future;

/// Waker whose wake operations do nothing.
///
/// Lets a test poll a listener by hand and inspect the returned `Poll` directly.
struct NoopWaker;

impl std::task::Wake for NoopWaker {
    fn wake(self: std::sync::Arc<Self>) {}

    fn wake_by_ref(self: &std::sync::Arc<Self>) {}
}

/// Builds a [`std::task::Waker`] backed by [`NoopWaker`].
fn noop_waker() -> std::task::Waker {
    std::sync::Arc::new(NoopWaker).into()
}

/// Shared epilogue of every test: asserts `notify_count()` is 1, drops `notify`,
/// then asserts `notify_count()` is 0.
///
/// `notify` is whatever handle the calling test still holds (a `Notify` or a
/// `loom::sync::Arc<Notify>`). `#[track_caller]` makes a failing assertion report
/// the line of the calling test rather than this helper.
#[track_caller]
fn assert_sole_notify_released<T>(notify: T) {
    assert_eq!(
        async_cpupool::tests::notify::notify_count(),
        1,
        "Should have created one Notify"
    );

    drop(notify);

    assert_eq!(
        async_cpupool::tests::notify::notify_count(),
        0,
        "Should have dropped notify"
    );
}

/// A listener that is dropped after `notify_one()` without being polled leaves
/// the token count at 1.
#[test]
fn dropped_notified_listener() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let listener = notify.listen().await;
            notify.notify_one();
            drop(listener);

            assert_eq!(
                notify.token(),
                1,
                "Dropped notify should not have consumed token"
            );
        });

        assert_sole_notify_released(notify);
    });
}

/// Same as [`dropped_notified_listener`], but the listener is created and dropped
/// on a spawned loom thread while the main thread calls `notify_one()`.
#[test]
fn threaded_dropped_notified_listener() {
    loom::model(|| {
        let notify = loom::sync::Arc::new(Notify::new());

        let notify2 = notify.clone();
        let handle = loom::thread::spawn(move || {
            loom::future::block_on(async move {
                drop(notify2.listen().await);
            });
        });

        notify.notify_one();

        handle.join().unwrap();

        assert_eq!(
            notify.token(),
            1,
            "Dropped notify should not have consumed token"
        );

        assert_sole_notify_released(notify);
    });
}

/// A listener polled after `notify_one()` is `Ready` and the token count is 0
/// afterwards.
#[test]
fn notified_listener() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let mut listener = notify.listen().await;
            notify.notify_one();

            let waker = noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);

            assert_eq!(
                std::pin::Pin::new(&mut listener).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen should be notified"
            );

            assert_eq!(
                notify.token(),
                0,
                "Polled listener should have consumed token"
            );
        });

        assert_sole_notify_released(notify);
    });
}

/// Same as [`notified_listener`], but the listener is awaited to completion on a
/// spawned loom thread while the main thread calls `notify_one()`.
#[test]
fn threaded_notified_listener() {
    loom::model(|| {
        let notify = loom::sync::Arc::new(Notify::new());

        let notify2 = notify.clone();
        let handle = loom::thread::spawn(move || {
            loom::future::block_on(async move {
                notify2.listen().await.await;
            });
        });

        notify.notify_one();

        handle.join().unwrap();

        assert_eq!(
            notify.token(),
            0,
            "Awaited listener should have consumed token"
        );

        assert_sole_notify_released(notify);
    });
}

/// With two listeners and a single `notify_one()`, the first listener polls
/// `Ready` and the second polls `Pending`.
#[test]
fn multiple_listeners() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let mut listener_1 = notify.listen().await;
            let mut listener_2 = notify.listen().await;
            notify.notify_one();

            let waker = noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);

            assert_eq!(
                std::pin::Pin::new(&mut listener_1).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen_1 should be notified"
            );
            assert_eq!(
                std::pin::Pin::new(&mut listener_2).poll(&mut cx),
                std::task::Poll::Pending,
                "Polled listen_2 should not be notified"
            );
        });

        assert_sole_notify_released(notify);
    });
}

/// With two listeners and two `notify_one()` calls, the token count is already 0
/// before either listener is polled, and both listeners then poll `Ready`.
#[test]
fn multiple_notifies() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let mut listener_1 = notify.listen().await;
            let mut listener_2 = notify.listen().await;
            notify.notify_one();
            notify.notify_one();

            assert_eq!(notify.token(), 0, "listeners should have consumed tokens");

            let waker = noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);

            assert_eq!(
                std::pin::Pin::new(&mut listener_1).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen_1 should be notified"
            );
            assert_eq!(
                std::pin::Pin::new(&mut listener_2).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen_2 should be notified"
            );
        });

        assert_sole_notify_released(notify);
    });
}

/// Two spawned loom threads each await a listener while the main thread calls
/// `notify_one()` twice; after both threads are joined the token count is 0.
#[test]
fn threaded_multiple_notifies() {
    loom::model(|| {
        let notify = loom::sync::Arc::new(Notify::new());

        let notify2 = notify.clone();
        let handle1 = loom::thread::spawn(move || {
            loom::future::block_on(async move {
                notify2.listen().await.await;
            })
        });

        let notify2 = notify.clone();
        let handle2 = loom::thread::spawn(move || {
            // The future only borrows `notify2`, so the handle is released
            // explicitly once `block_on` has returned.
            loom::future::block_on(async {
                notify2.listen().await.await;
            });
            drop(notify2);
        });

        notify.notify_one();
        notify.notify_one();

        handle1.join().unwrap();
        handle2.join().unwrap();

        assert_eq!(
            notify.token(),
            0,
            "threaded notifies should have consumed tokens"
        );

        assert_sole_notify_released(notify);
    });
}