async-cpupool/src/spsc.rs

65 lines
1.5 KiB
Rust

use crate::{
drop_notifier::{DropListener, DropNotifier},
executor::block_on,
queue::Queue,
selector::{select, Either},
Canceled,
};
/// Builds a connected [`Sender`]/[`Receiver`] pair.
///
/// Both halves share one queue created by `crate::queue::bounded(1)`. Two
/// drop-notifier pairs are cross-wired: each half stores its own notifier
/// together with the listener that belongs to the other half's notifier.
pub(super) fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = crate::queue::bounded(1);
    let (tx_notifier, tx_listener) = crate::drop_notifier::notifier();
    let (rx_notifier, rx_listener) = crate::drop_notifier::notifier();

    let tx = Sender {
        queue: shared.clone(),
        send_notifier: tx_notifier,
        recv_listener: rx_listener,
    };
    let rx = Receiver {
        queue: shared,
        recv_notifier: rx_notifier,
        send_listener: tx_listener,
    };

    (tx, rx)
}
/// Sending half of the channel returned by [`channel`].
///
/// Both `send` and `blocking_send` take `self` by value, so a `Sender` can
/// deliver at most one item.
pub(super) struct Sender<T> {
// Queue handle; `channel` gives the `Receiver` the handle this one was cloned from.
queue: Queue<T>,
// Never read in this file, hence the `allow`. Presumably it is held so that
// dropping the `Sender` is what the receiver's `send_listener` observes —
// TODO confirm against `drop_notifier`.
#[allow(unused)]
send_notifier: DropNotifier,
// Listener paired with the receiver's `recv_notifier`; `send` races the
// queue push against `recv_listener.listen()`.
recv_listener: DropListener,
}
/// Receiving half of the channel returned by [`channel`].
///
/// `recv` takes `self` by value, so a `Receiver` can yield at most one item.
pub(super) struct Receiver<T> {
// Queue handle; `channel` gives the `Sender` a clone of this same handle.
queue: Queue<T>,
// Never read in this file, hence the `allow`. Presumably it is held so that
// dropping the `Receiver` is what the sender's `recv_listener` observes —
// TODO confirm against `drop_notifier`.
#[allow(unused)]
recv_notifier: DropNotifier,
// Listener paired with the sender's `send_notifier`; `recv` races the
// queue pop against `send_listener.listen()`.
send_listener: DropListener,
}
impl<T> Sender<T> {
    /// Consumes the sender and tries to push `item` onto the shared queue.
    ///
    /// The push is raced, via `select`, against `recv_listener.listen()`.
    ///
    /// # Errors
    ///
    /// Returns `Err(Canceled)` when `select` reports the listener (right)
    /// branch instead of the push (left) branch.
    pub(super) async fn send(self, item: T) -> Result<(), Canceled> {
        let push = self.queue.push(item);
        let receiver_signal = self.recv_listener.listen();

        match select(push, receiver_signal).await {
            Either::Right(()) => Err(Canceled),
            Either::Left(()) => Ok(()),
        }
    }

    /// Synchronous counterpart of [`Sender::send`]: runs the same future to
    /// completion with `block_on` and returns its result unchanged.
    pub(super) fn blocking_send(self, item: T) -> Result<(), Canceled> {
        let pending = self.send(item);
        block_on(pending)
    }
}
impl<T> Receiver<T> {
    /// Consumes the receiver and waits for an item from the shared queue.
    ///
    /// The pop is raced, via `select`, against `send_listener.listen()`.
    ///
    /// # Errors
    ///
    /// Returns `Err(Canceled)` when `select` reports the listener (right)
    /// branch instead of the pop (left) branch.
    pub(super) async fn recv(self) -> Result<T, Canceled> {
        let pop = self.queue.pop();
        let sender_signal = self.send_listener.listen();

        // NOTE(review): `Sender::send` takes `self` by value, so the sender is
        // dropped right after a successful push. This presumably relies on
        // `select` favouring the left (pop) branch when both are ready,
        // otherwise a queued item could be reported as `Canceled` — confirm
        // against `selector::select`.
        match select(pop, sender_signal).await {
            Either::Right(()) => Err(Canceled),
            Either::Left(item) => Ok(item),
        }
    }
}