Instrument queue for loom
All checks were successful
/ clippy (push) Successful in 16s
/ tests (push) Successful in 19s
/ check (armv7-unknown-linux-musleabihf) (push) Successful in 12s
/ check (x86_64-unknown-linux-musl) (push) Successful in 12s
/ check (aarch64-unknown-linux-musl) (push) Successful in 12s

This commit is contained in:
asonix 2024-04-17 12:46:13 -05:00
parent 0795399da5
commit b05eded986
4 changed files with 312 additions and 17 deletions

View file

@ -20,6 +20,8 @@
nativeBuildInputs = [ cargo cargo-outdated clippy rust-analyzer rustc rustfmt ];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
LOOM_MAX_PREEMPTIONS = "2";
};
});
}

View file

@ -22,6 +22,15 @@ use executor::block_on;
use queue::Queue;
use selector::select;
#[cfg(any(loom, test))]
#[doc(hidden)]
pub mod tests {
#[doc(hidden)]
pub mod queue {
pub use crate::queue::{bounded, queue_count, Queue};
}
}
/// Configuration builder for the CpuPool
#[derive(Debug)]
pub struct Config {

View file

@ -3,11 +3,36 @@ use std::collections::VecDeque;
use crate::notify::Notify;
use crate::sync::{Arc, Mutex};
pub(super) fn bounded<T>(capacity: usize) -> Queue<T> {
thread_local! {
#[cfg(any(loom, test))]
static QUEUE_COUNT: std::cell::RefCell<std::num::Wrapping<u64>> = std::cell::RefCell::new(std::num::Wrapping(0));
}
#[inline(always)]
fn increment_queue_count() {
#[cfg(any(loom, test))]
QUEUE_COUNT.with_borrow_mut(|v| *v += 1);
}
#[inline(always)]
fn decrement_queue_count() {
#[cfg(any(loom, test))]
QUEUE_COUNT.with_borrow_mut(|v| *v -= 1);
}
#[cfg(any(test, loom))]
#[doc(hidden)]
pub fn queue_count() -> u64 {
QUEUE_COUNT.with_borrow(|v| v.0)
}
#[doc(hidden)]
pub fn bounded<T>(capacity: usize) -> Queue<T> {
Queue::bounded(capacity)
}
pub(crate) struct Queue<T> {
#[doc(hidden)]
pub struct Queue<T> {
inner: Arc<QueueState<T>>,
capacity: usize,
}
@ -19,8 +44,11 @@ struct QueueState<T> {
}
impl<T> Queue<T> {
pub(super) fn bounded(capacity: usize) -> Self {
#[doc(hidden)]
pub fn bounded(capacity: usize) -> Self {
increment_queue_count();
metrics::counter!("async-cpupool.queue.created").increment(1);
Self {
inner: Arc::new(QueueState {
queue: Mutex::new(VecDeque::new()),
@ -31,7 +59,8 @@ impl<T> Queue<T> {
}
}
pub(super) fn len(&self) -> usize {
#[doc(hidden)]
pub fn len(&self) -> usize {
self.inner.queue.lock().expect("not poisoned").len()
}
@ -45,21 +74,23 @@ impl<T> Queue<T> {
}
}
pub(super) async fn push(&self, mut item: T) {
#[doc(hidden)]
pub async fn push(&self, mut item: T) {
loop {
let listener = self.inner.push_notify.listen().await;
if let Some(returned_item) = self.try_push(item) {
item = returned_item;
listener.await;
} else {
let Some(returned_item) = self.try_push(item) else {
return;
}
};
item = returned_item;
listener.await;
}
}
pub(super) fn try_push(&self, item: T) -> Option<T> {
#[doc(hidden)]
pub fn try_push(&self, item: T) -> Option<T> {
match self.try_push_impl(item) {
Some(item) => Some(item),
None => {
@ -81,13 +112,12 @@ impl<T> Queue<T> {
}
}
pub(super) async fn pop(&self) -> T {
#[doc(hidden)]
pub async fn pop(&self) -> T {
loop {
let listener = self.inner.pop_notify.listen().await;
if let Some(item) = self.try_pop() {
self.inner.push_notify.notify_one();
metrics::counter!("async-cpupool.queue.popped").increment(1);
return item;
}
@ -95,7 +125,16 @@ impl<T> Queue<T> {
}
}
fn try_pop(&self) -> Option<T> {
#[doc(hidden)]
pub fn try_pop(&self) -> Option<T> {
let item = self.try_pop_impl()?;
self.inner.push_notify.notify_one();
metrics::counter!("async-cpupool.queue.popped").increment(1);
Some(item)
}
fn try_pop_impl(&self) -> Option<T> {
self.inner.queue.lock().expect("not poisoned").pop_front()
}
}
@ -109,8 +148,9 @@ impl<T> Clone for Queue<T> {
}
}
impl<T> Drop for Queue<T> {
impl<T> Drop for QueueState<T> {
fn drop(&mut self) {
decrement_queue_count();
metrics::counter!("async-cpupool.queue.dropped").increment(1);
}
}

244
tests/queue.rs Normal file
View file

@ -0,0 +1,244 @@
#![cfg(loom)]
#[test]
fn push_pop() {
loom::model(|| {
let queue = async_cpupool::tests::queue::bounded(1);
assert!(queue.try_push(()).is_none(), "Failed to push item");
assert!(queue.try_push(()).is_some(), "Shouldn't have pushed item");
assert!(queue.try_pop().is_some(), "Failed to pop item");
assert!(queue.try_pop().is_none(), "Shoudln't have popped item");
assert_eq!(queue.len(), 0, "Should have popped pushed item");
assert!(queue.try_push(()).is_none(), "Failed to push item");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
});
}
#[test]
fn async_push_pop() {
loom::model(|| {
let queue = async_cpupool::tests::queue::bounded(1);
loom::future::block_on(async {
queue.push(()).await;
queue.pop().await;
});
assert_eq!(queue.len(), 0, "Should have popped pushed item");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
});
}
#[test]
fn threaded_push_pop() {
loom::model(|| {
let queue = async_cpupool::tests::queue::bounded(1);
let q2 = queue.clone();
let handle = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.pop().await;
});
drop(q2);
});
assert!(queue.try_push(()).is_none(), "failed to push item");
handle.join().unwrap();
assert_eq!(queue.len(), 0, "Should have popped pushed item");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
});
}
#[test]
fn multiple_threaded_push_pop() {
loom::model(move || {
let queue = async_cpupool::tests::queue::bounded(1);
let q2 = queue.clone();
let h1 = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.pop().await;
q2.pop().await;
});
drop(q2);
});
loom::future::block_on(async {
queue.push(()).await;
queue.push(()).await;
});
h1.join().unwrap();
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
});
}
#[test]
fn multiple_threaded_push_pop_2() {
loom::model(|| {
let queue = async_cpupool::tests::queue::bounded(1);
let q2 = queue.clone();
let h1 = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.push(()).await;
});
drop(q2);
});
let q2 = queue.clone();
let h2 = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.push(()).await;
});
drop(q2);
});
loom::future::block_on(async {
queue.pop().await;
queue.pop().await;
});
h1.join().unwrap();
h2.join().unwrap();
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
});
}
#[test]
fn multiple_threaded_push_pop_3() {
loom::model(|| {
let queue = async_cpupool::tests::queue::bounded(1);
let q2 = queue.clone();
let h1 = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.push(()).await;
});
drop(q2);
});
let q2 = queue.clone();
let h2 = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.push(()).await;
});
drop(q2);
});
let q2 = queue.clone();
let h3 = loom::thread::spawn(move || {
loom::future::block_on(async {
q2.pop().await;
});
drop(q2);
});
loom::future::block_on(async {
queue.pop().await;
});
h1.join().unwrap();
h2.join().unwrap();
h3.join().unwrap();
assert_eq!(queue.len(), 0, "Should have popped both pushed items");
assert_eq!(
async_cpupool::tests::queue::queue_count(),
1,
"Should have created one queue"
);
drop(queue);
assert_eq!(
async_cpupool::tests::queue::queue_count(),
0,
"Should have dropped queue"
);
});
}