167 lines
3.7 KiB
Rust
167 lines
3.7 KiB
Rust
use std::collections::VecDeque;
|
|
|
|
use crate::notify::Notify;
|
|
use crate::sync::{Arc, Mutex};
|
|
|
|
thread_local! {
    // Per-thread tally of queue states, present only when building with
    // `cfg(any(loom, test))`. It goes up in `Queue::bounded` and down when a
    // `QueueState` is dropped. `Wrapping` makes the arithmetic wrap around
    // rather than overflow, e.g. if a state is dropped on a thread whose own
    // tally is still zero.
    #[cfg(any(loom, test))]
    static QUEUE_COUNT: std::cell::RefCell<std::num::Wrapping<u64>> =
        std::cell::RefCell::new(std::num::Wrapping(0_u64));
}
|
|
|
|
/// Adds one (with wrap-around) to the calling thread's `QUEUE_COUNT`.
///
/// The statement in the body is compiled only under `cfg(any(loom, test))`;
/// in every other build this function is empty.
#[inline(always)]
fn increment_queue_count() {
    #[cfg(any(loom, test))]
    QUEUE_COUNT.with(|count| *count.borrow_mut() += 1);
}
|
|
|
|
/// Subtracts one (with wrap-around) from the calling thread's `QUEUE_COUNT`.
///
/// The statement in the body is compiled only under `cfg(any(loom, test))`;
/// in every other build this function is empty.
#[inline(always)]
fn decrement_queue_count() {
    #[cfg(any(loom, test))]
    QUEUE_COUNT.with(|count| *count.borrow_mut() -= 1);
}
|
|
|
|
/// Returns the current value of the calling thread's `QUEUE_COUNT`.
///
/// Only available under `cfg(any(test, loom))`. The counter is thread-local,
/// so the value reflects increments and decrements performed on this thread
/// only.
#[cfg(any(test, loom))]
#[doc(hidden)]
pub fn queue_count() -> u64 {
    QUEUE_COUNT.with(|count| {
        // `Wrapping<u64>` is `Copy`, so copy it out of the cell and unwrap it.
        let std::num::Wrapping(value) = *count.borrow();
        value
    })
}
|
|
|
|
#[doc(hidden)]
|
|
pub fn bounded<T>(capacity: usize) -> Queue<T> {
|
|
Queue::bounded(capacity)
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
pub struct Queue<T> {
|
|
inner: Arc<QueueState<T>>,
|
|
capacity: usize,
|
|
}
|
|
|
|
struct QueueState<T> {
|
|
queue: Mutex<VecDeque<T>>,
|
|
push_notify: Notify,
|
|
pop_notify: Notify,
|
|
}
|
|
|
|
impl<T> Queue<T> {
|
|
#[doc(hidden)]
|
|
pub fn bounded(capacity: usize) -> Self {
|
|
increment_queue_count();
|
|
metrics::counter!("async-cpupool.queue.created").increment(1);
|
|
|
|
Self {
|
|
inner: Arc::new(QueueState {
|
|
queue: Mutex::new(VecDeque::new()),
|
|
push_notify: Notify::new(),
|
|
pop_notify: Notify::new(),
|
|
}),
|
|
capacity,
|
|
}
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
pub fn len(&self) -> usize {
|
|
self.inner.queue.lock().expect("not poisoned").len()
|
|
}
|
|
|
|
pub(super) fn is_full_or(&self) -> Result<(), usize> {
|
|
let len = self.len();
|
|
|
|
if len >= self.capacity {
|
|
Ok(())
|
|
} else {
|
|
Err(len)
|
|
}
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
pub async fn push(&self, mut item: T) {
|
|
let Some(returned_item) = self.try_push(item) else {
|
|
return;
|
|
};
|
|
|
|
item = returned_item;
|
|
|
|
loop {
|
|
let listener = self.inner.push_notify.listen().await;
|
|
|
|
let Some(returned_item) = self.try_push(item) else {
|
|
return;
|
|
};
|
|
|
|
item = returned_item;
|
|
|
|
listener.await;
|
|
}
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
pub fn try_push(&self, item: T) -> Option<T> {
|
|
match self.try_push_impl(item) {
|
|
Some(item) => Some(item),
|
|
None => {
|
|
self.inner.pop_notify.notify_one();
|
|
metrics::counter!("async-cpupool.queue.pushed").increment(1);
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
fn try_push_impl(&self, item: T) -> Option<T> {
|
|
let mut guard = self.inner.queue.lock().expect("not poisoned");
|
|
|
|
if self.capacity <= guard.len() {
|
|
Some(item)
|
|
} else {
|
|
guard.push_back(item);
|
|
None
|
|
}
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
pub async fn pop(&self) -> T {
|
|
if let Some(item) = self.try_pop() {
|
|
return item;
|
|
}
|
|
|
|
loop {
|
|
let listener = self.inner.pop_notify.listen().await;
|
|
|
|
if let Some(item) = self.try_pop() {
|
|
return item;
|
|
}
|
|
|
|
listener.await;
|
|
}
|
|
}
|
|
|
|
#[doc(hidden)]
|
|
pub fn try_pop(&self) -> Option<T> {
|
|
let item = self.try_pop_impl()?;
|
|
self.inner.push_notify.notify_one();
|
|
metrics::counter!("async-cpupool.queue.popped").increment(1);
|
|
|
|
Some(item)
|
|
}
|
|
|
|
fn try_pop_impl(&self) -> Option<T> {
|
|
self.inner.queue.lock().expect("not poisoned").pop_front()
|
|
}
|
|
}
|
|
|
|
impl<T> Clone for Queue<T> {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
inner: Arc::clone(&self.inner),
|
|
capacity: self.capacity,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> Drop for QueueState<T> {
|
|
fn drop(&mut self) {
|
|
decrement_queue_count();
|
|
metrics::counter!("async-cpupool.queue.dropped").increment(1);
|
|
}
|
|
}
|