Compare commits
2 commits
ce38df5b41
...
b05eded986
Author | SHA1 | Date | |
---|---|---|---|
asonix | b05eded986 | ||
asonix | 0795399da5 |
|
@ -16,6 +16,9 @@ edition = "2021"
|
|||
metrics = "0.22.0"
|
||||
tracing = "0.1.40"
|
||||
|
||||
[dev-dependencies]
|
||||
[target.'cfg(loom)'.dependencies]
|
||||
loom = { version = "0.7", features = ["futures"] }
|
||||
|
||||
[target.'cfg(not(loom))'.dev-dependencies]
|
||||
smol = "2.0.0"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
#[cfg(not(loom))]
|
||||
use std::time::Duration;
|
||||
#[cfg(not(loom))]
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
|
||||
#[cfg(not(loom))]
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt::layer())
|
||||
|
@ -52,3 +55,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(loom)]
|
||||
fn main() {}
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
nativeBuildInputs = [ cargo cargo-outdated clippy rust-analyzer rustc rustfmt ];
|
||||
|
||||
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
|
||||
|
||||
LOOM_MAX_PREEMPTIONS = "2";
|
||||
};
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::Arc;
|
||||
use crate::sync::Arc;
|
||||
|
||||
use crate::notify::Notify;
|
||||
|
||||
|
|
10
src/lib.rs
10
src/lib.rs
|
@ -7,6 +7,7 @@ mod notify;
|
|||
mod queue;
|
||||
mod selector;
|
||||
mod spsc;
|
||||
mod sync;
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
|
@ -21,6 +22,15 @@ use executor::block_on;
|
|||
use queue::Queue;
|
||||
use selector::select;
|
||||
|
||||
#[cfg(any(loom, test))]
|
||||
#[doc(hidden)]
|
||||
pub mod tests {
|
||||
#[doc(hidden)]
|
||||
pub mod queue {
|
||||
pub use crate::queue::{bounded, queue_count, Queue};
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration builder for the CpuPool
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
|
|
216
src/notify.rs
216
src/notify.rs
|
@ -1,13 +1,11 @@
|
|||
use std::{
|
||||
collections::VecDeque,
|
||||
future::{poll_fn, Future},
|
||||
sync::{
|
||||
atomic::{AtomicU8, Ordering},
|
||||
Arc, Mutex,
|
||||
},
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
|
||||
use crate::sync::{Arc, AtomicU8, Mutex, Ordering};
|
||||
|
||||
const UNNOTIFIED: u8 = 0b0000;
|
||||
const NOTIFIED_ONE: u8 = 0b0001;
|
||||
const RESOLVED: u8 = 0b0010;
|
||||
|
@ -210,3 +208,213 @@ impl Future for Listener<'_> {
|
|||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, loom))]
|
||||
mod tests {
|
||||
use super::Notify;
|
||||
use std::future::Future;
|
||||
|
||||
struct NoopWaker;
|
||||
|
||||
impl std::task::Wake for NoopWaker {
|
||||
fn wake(self: std::sync::Arc<Self>) {}
|
||||
fn wake_by_ref(self: &std::sync::Arc<Self>) {}
|
||||
}
|
||||
|
||||
fn noop_waker() -> std::task::Waker {
|
||||
std::sync::Arc::new(NoopWaker).into()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dropped_notified_listener() {
|
||||
loom::model(|| {
|
||||
loom::future::block_on(async {
|
||||
let notify = Notify::new();
|
||||
|
||||
let listener = notify.listen().await;
|
||||
|
||||
notify.notify_one();
|
||||
drop(listener);
|
||||
|
||||
assert_eq!(
|
||||
notify.state.lock().unwrap().token,
|
||||
1,
|
||||
"Dropped notify should not have consumed token"
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn threaded_dropped_notified_listener() {
|
||||
loom::model(|| {
|
||||
let notify = loom::sync::Arc::new(Notify::new());
|
||||
|
||||
let notify2 = notify.clone();
|
||||
let handle = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async move {
|
||||
drop(notify2.listen().await);
|
||||
});
|
||||
});
|
||||
|
||||
notify.notify_one();
|
||||
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
notify.state.lock().unwrap().token,
|
||||
1,
|
||||
"Dropped notify should not have consumed token"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn notified_listener() {
|
||||
loom::model(|| {
|
||||
loom::future::block_on(async {
|
||||
let notify = Notify::new();
|
||||
|
||||
let mut listener = notify.listen().await;
|
||||
|
||||
notify.notify_one();
|
||||
|
||||
let waker = noop_waker();
|
||||
let mut cx = std::task::Context::from_waker(&waker);
|
||||
|
||||
assert_eq!(
|
||||
std::pin::Pin::new(&mut listener).poll(&mut cx),
|
||||
std::task::Poll::Ready(()),
|
||||
"Polled listen should be notified"
|
||||
);
|
||||
assert_eq!(
|
||||
notify.state.lock().unwrap().token,
|
||||
0,
|
||||
"Dropped notify should have consumed token"
|
||||
);
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn threaded_notified_listener() {
|
||||
loom::model(|| {
|
||||
let notify = loom::sync::Arc::new(Notify::new());
|
||||
|
||||
let notify2 = notify.clone();
|
||||
let handle = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async move {
|
||||
notify2.listen().await.await;
|
||||
});
|
||||
});
|
||||
|
||||
notify.notify_one();
|
||||
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
notify.state.lock().unwrap().token,
|
||||
0,
|
||||
"Dropped notify should have consumed token"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_listeners() {
|
||||
loom::model(|| {
|
||||
loom::future::block_on(async {
|
||||
let notify = Notify::new();
|
||||
|
||||
let mut listener_1 = notify.listen().await;
|
||||
let mut listener_2 = notify.listen().await;
|
||||
|
||||
notify.notify_one();
|
||||
|
||||
let waker = noop_waker();
|
||||
let mut cx = std::task::Context::from_waker(&waker);
|
||||
|
||||
assert_eq!(
|
||||
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
|
||||
std::task::Poll::Ready(()),
|
||||
"Polled listen_1 should be notified"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
|
||||
std::task::Poll::Pending,
|
||||
"Polled listen_2 should not be notified"
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_notifies() {
|
||||
loom::model(|| {
|
||||
loom::future::block_on(async {
|
||||
let notify = Notify::new();
|
||||
|
||||
let mut listener_1 = notify.listen().await;
|
||||
let mut listener_2 = notify.listen().await;
|
||||
|
||||
notify.notify_one();
|
||||
notify.notify_one();
|
||||
|
||||
let waker = noop_waker();
|
||||
let mut cx = std::task::Context::from_waker(&waker);
|
||||
|
||||
assert_eq!(
|
||||
std::pin::Pin::new(&mut listener_1).poll(&mut cx),
|
||||
std::task::Poll::Ready(()),
|
||||
"Polled listen_1 should be notified"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
std::pin::Pin::new(&mut listener_2).poll(&mut cx),
|
||||
std::task::Poll::Ready(()),
|
||||
"Polled listen_2 should be notified"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
notify.state.lock().unwrap().token,
|
||||
0,
|
||||
"notifies should have consumed tokens"
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn threaded_multiple_notifies() {
|
||||
loom::model(|| {
|
||||
let notify = loom::sync::Arc::new(Notify::new());
|
||||
|
||||
let notify2 = notify.clone();
|
||||
let handle1 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async move {
|
||||
notify2.listen().await.await;
|
||||
})
|
||||
});
|
||||
|
||||
let notify2 = notify.clone();
|
||||
let handle2 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async move {
|
||||
notify2.listen().await.await;
|
||||
})
|
||||
});
|
||||
|
||||
notify.notify_one();
|
||||
notify.notify_one();
|
||||
|
||||
handle1.join().unwrap();
|
||||
handle2.join().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
notify.state.lock().unwrap().token,
|
||||
0,
|
||||
"threaded notifies should have consumed tokens"
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
80
src/queue.rs
80
src/queue.rs
|
@ -1,15 +1,38 @@
|
|||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::notify::Notify;
|
||||
use crate::sync::{Arc, Mutex};
|
||||
|
||||
pub(super) fn bounded<T>(capacity: usize) -> Queue<T> {
|
||||
thread_local! {
|
||||
#[cfg(any(loom, test))]
|
||||
static QUEUE_COUNT: std::cell::RefCell<std::num::Wrapping<u64>> = std::cell::RefCell::new(std::num::Wrapping(0));
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn increment_queue_count() {
|
||||
#[cfg(any(loom, test))]
|
||||
QUEUE_COUNT.with_borrow_mut(|v| *v += 1);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn decrement_queue_count() {
|
||||
#[cfg(any(loom, test))]
|
||||
QUEUE_COUNT.with_borrow_mut(|v| *v -= 1);
|
||||
}
|
||||
|
||||
#[cfg(any(test, loom))]
|
||||
#[doc(hidden)]
|
||||
pub fn queue_count() -> u64 {
|
||||
QUEUE_COUNT.with_borrow(|v| v.0)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn bounded<T>(capacity: usize) -> Queue<T> {
|
||||
Queue::bounded(capacity)
|
||||
}
|
||||
|
||||
pub(crate) struct Queue<T> {
|
||||
#[doc(hidden)]
|
||||
pub struct Queue<T> {
|
||||
inner: Arc<QueueState<T>>,
|
||||
capacity: usize,
|
||||
}
|
||||
|
@ -21,8 +44,11 @@ struct QueueState<T> {
|
|||
}
|
||||
|
||||
impl<T> Queue<T> {
|
||||
pub(super) fn bounded(capacity: usize) -> Self {
|
||||
#[doc(hidden)]
|
||||
pub fn bounded(capacity: usize) -> Self {
|
||||
increment_queue_count();
|
||||
metrics::counter!("async-cpupool.queue.created").increment(1);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(QueueState {
|
||||
queue: Mutex::new(VecDeque::new()),
|
||||
|
@ -33,7 +59,8 @@ impl<T> Queue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn len(&self) -> usize {
|
||||
#[doc(hidden)]
|
||||
pub fn len(&self) -> usize {
|
||||
self.inner.queue.lock().expect("not poisoned").len()
|
||||
}
|
||||
|
||||
|
@ -47,21 +74,23 @@ impl<T> Queue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn push(&self, mut item: T) {
|
||||
#[doc(hidden)]
|
||||
pub async fn push(&self, mut item: T) {
|
||||
loop {
|
||||
let listener = self.inner.push_notify.listen().await;
|
||||
|
||||
if let Some(returned_item) = self.try_push(item) {
|
||||
item = returned_item;
|
||||
|
||||
listener.await;
|
||||
} else {
|
||||
let Some(returned_item) = self.try_push(item) else {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
item = returned_item;
|
||||
|
||||
listener.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_push(&self, item: T) -> Option<T> {
|
||||
#[doc(hidden)]
|
||||
pub fn try_push(&self, item: T) -> Option<T> {
|
||||
match self.try_push_impl(item) {
|
||||
Some(item) => Some(item),
|
||||
None => {
|
||||
|
@ -83,13 +112,12 @@ impl<T> Queue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) async fn pop(&self) -> T {
|
||||
#[doc(hidden)]
|
||||
pub async fn pop(&self) -> T {
|
||||
loop {
|
||||
let listener = self.inner.pop_notify.listen().await;
|
||||
|
||||
if let Some(item) = self.try_pop() {
|
||||
self.inner.push_notify.notify_one();
|
||||
metrics::counter!("async-cpupool.queue.popped").increment(1);
|
||||
return item;
|
||||
}
|
||||
|
||||
|
@ -97,7 +125,16 @@ impl<T> Queue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn try_pop(&self) -> Option<T> {
|
||||
#[doc(hidden)]
|
||||
pub fn try_pop(&self) -> Option<T> {
|
||||
let item = self.try_pop_impl()?;
|
||||
self.inner.push_notify.notify_one();
|
||||
metrics::counter!("async-cpupool.queue.popped").increment(1);
|
||||
|
||||
Some(item)
|
||||
}
|
||||
|
||||
fn try_pop_impl(&self) -> Option<T> {
|
||||
self.inner.queue.lock().expect("not poisoned").pop_front()
|
||||
}
|
||||
}
|
||||
|
@ -111,8 +148,9 @@ impl<T> Clone for Queue<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Queue<T> {
|
||||
impl<T> Drop for QueueState<T> {
|
||||
fn drop(&mut self) {
|
||||
decrement_queue_count();
|
||||
metrics::counter!("async-cpupool.queue.dropped").increment(1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll, Wake, Waker},
|
||||
};
|
||||
|
||||
use crate::sync::{Arc, AtomicBool, Ordering};
|
||||
|
||||
pub(super) enum Either<L, R> {
|
||||
Left(L),
|
||||
Right(R),
|
||||
|
@ -27,16 +25,16 @@ struct SelectWaker {
|
|||
}
|
||||
|
||||
impl Wake for SelectWaker {
|
||||
fn wake_by_ref(self: &Arc<Self>) {
|
||||
fn wake_by_ref(self: &std::sync::Arc<Self>) {
|
||||
self.flag.store(true, Ordering::Release);
|
||||
|
||||
self.inner.wake_by_ref();
|
||||
}
|
||||
|
||||
fn wake(self: Arc<Self>) {
|
||||
fn wake(self: std::sync::Arc<Self>) {
|
||||
self.flag.store(true, Ordering::Release);
|
||||
|
||||
match Arc::try_unwrap(self) {
|
||||
match std::sync::Arc::try_unwrap(self) {
|
||||
Ok(this) => this.inner.wake(),
|
||||
Err(this) => this.inner.wake_by_ref(),
|
||||
}
|
||||
|
@ -51,7 +49,7 @@ where
|
|||
type Output = Either<F1::Output, F2::Output>;
|
||||
|
||||
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let left_waker = Arc::new(SelectWaker {
|
||||
let left_waker = std::sync::Arc::new(SelectWaker {
|
||||
inner: cx.waker().clone(),
|
||||
flag: self.left_woken.clone(),
|
||||
})
|
||||
|
@ -63,7 +61,7 @@ where
|
|||
return Poll::Ready(Either::Left(left_out));
|
||||
}
|
||||
|
||||
let right_waker = Arc::new(SelectWaker {
|
||||
let right_waker = std::sync::Arc::new(SelectWaker {
|
||||
inner: cx.waker().clone(),
|
||||
flag: self.right_woken.clone(),
|
||||
})
|
||||
|
|
11
src/sync.rs
Normal file
11
src/sync.rs
Normal file
|
@ -0,0 +1,11 @@
|
|||
#[cfg(not(loom))]
|
||||
pub(crate) use std::sync::{
|
||||
atomic::{AtomicBool, AtomicU8, Ordering},
|
||||
Arc, Mutex,
|
||||
};
|
||||
|
||||
#[cfg(loom)]
|
||||
pub(crate) use loom::sync::{
|
||||
atomic::{AtomicBool, AtomicU8, Ordering},
|
||||
Arc, Mutex,
|
||||
};
|
244
tests/queue.rs
Normal file
244
tests/queue.rs
Normal file
|
@ -0,0 +1,244 @@
|
|||
#![cfg(loom)]
|
||||
|
||||
#[test]
|
||||
fn push_pop() {
|
||||
loom::model(|| {
|
||||
let queue = async_cpupool::tests::queue::bounded(1);
|
||||
|
||||
assert!(queue.try_push(()).is_none(), "Failed to push item");
|
||||
|
||||
assert!(queue.try_push(()).is_some(), "Shouldn't have pushed item");
|
||||
|
||||
assert!(queue.try_pop().is_some(), "Failed to pop item");
|
||||
|
||||
assert!(queue.try_pop().is_none(), "Shoudln't have popped item");
|
||||
|
||||
assert_eq!(queue.len(), 0, "Should have popped pushed item");
|
||||
|
||||
assert!(queue.try_push(()).is_none(), "Failed to push item");
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
1,
|
||||
"Should have created one queue"
|
||||
);
|
||||
|
||||
drop(queue);
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
0,
|
||||
"Should have dropped queue"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn async_push_pop() {
|
||||
loom::model(|| {
|
||||
let queue = async_cpupool::tests::queue::bounded(1);
|
||||
|
||||
loom::future::block_on(async {
|
||||
queue.push(()).await;
|
||||
queue.pop().await;
|
||||
});
|
||||
|
||||
assert_eq!(queue.len(), 0, "Should have popped pushed item");
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
1,
|
||||
"Should have created one queue"
|
||||
);
|
||||
|
||||
drop(queue);
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
0,
|
||||
"Should have dropped queue"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn threaded_push_pop() {
|
||||
loom::model(|| {
|
||||
let queue = async_cpupool::tests::queue::bounded(1);
|
||||
|
||||
let q2 = queue.clone();
|
||||
let handle = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.pop().await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
assert!(queue.try_push(()).is_none(), "failed to push item");
|
||||
handle.join().unwrap();
|
||||
|
||||
assert_eq!(queue.len(), 0, "Should have popped pushed item");
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
1,
|
||||
"Should have created one queue"
|
||||
);
|
||||
|
||||
drop(queue);
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
0,
|
||||
"Should have dropped queue"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_threaded_push_pop() {
|
||||
loom::model(move || {
|
||||
let queue = async_cpupool::tests::queue::bounded(1);
|
||||
|
||||
let q2 = queue.clone();
|
||||
let h1 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.pop().await;
|
||||
q2.pop().await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
loom::future::block_on(async {
|
||||
queue.push(()).await;
|
||||
queue.push(()).await;
|
||||
});
|
||||
|
||||
h1.join().unwrap();
|
||||
|
||||
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
1,
|
||||
"Should have created one queue"
|
||||
);
|
||||
|
||||
drop(queue);
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
0,
|
||||
"Should have dropped queue"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_threaded_push_pop_2() {
|
||||
loom::model(|| {
|
||||
let queue = async_cpupool::tests::queue::bounded(1);
|
||||
|
||||
let q2 = queue.clone();
|
||||
let h1 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.push(()).await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
let q2 = queue.clone();
|
||||
let h2 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.push(()).await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
loom::future::block_on(async {
|
||||
queue.pop().await;
|
||||
queue.pop().await;
|
||||
});
|
||||
|
||||
h1.join().unwrap();
|
||||
h2.join().unwrap();
|
||||
|
||||
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
1,
|
||||
"Should have created one queue"
|
||||
);
|
||||
|
||||
drop(queue);
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
0,
|
||||
"Should have dropped queue"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_threaded_push_pop_3() {
|
||||
loom::model(|| {
|
||||
let queue = async_cpupool::tests::queue::bounded(1);
|
||||
|
||||
let q2 = queue.clone();
|
||||
let h1 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.push(()).await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
let q2 = queue.clone();
|
||||
let h2 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.push(()).await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
let q2 = queue.clone();
|
||||
let h3 = loom::thread::spawn(move || {
|
||||
loom::future::block_on(async {
|
||||
q2.pop().await;
|
||||
});
|
||||
|
||||
drop(q2);
|
||||
});
|
||||
|
||||
loom::future::block_on(async {
|
||||
queue.pop().await;
|
||||
});
|
||||
|
||||
h1.join().unwrap();
|
||||
h2.join().unwrap();
|
||||
h3.join().unwrap();
|
||||
|
||||
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
1,
|
||||
"Should have created one queue"
|
||||
);
|
||||
|
||||
drop(queue);
|
||||
|
||||
assert_eq!(
|
||||
async_cpupool::tests::queue::queue_count(),
|
||||
0,
|
||||
"Should have dropped queue"
|
||||
);
|
||||
});
|
||||
}
|
Loading…
Reference in a new issue