notify/examples/mpsc.rs
2022-03-05 16:57:44 -06:00

152 lines
3.1 KiB
Rust

use notify::{Listener, Notify};
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::Duration,
};
struct State<T> {
items: Mutex<VecDeque<T>>,
rx_dropped: AtomicBool,
}
struct Sender<T> {
state: Option<Arc<State<T>>>,
notify: Notify,
}
struct Receiver<T> {
state: Arc<State<T>>,
listener: Listener,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
jive::block_on(async move {
let (tx, mut rx) = mpsc();
jive::spawn(async move {
let mut interval = jive::time::interval(Duration::from_secs(1));
for i in 0..5 {
interval.tick().await;
let _ = tx.send(i);
}
});
while let Some(val) = rx.recv().await {
println!("Got {}", val);
}
});
let (tx, rx) = mpsc();
std::thread::spawn(move || {
for i in -5..0 {
std::thread::sleep(Duration::from_secs(1));
let _ = tx.send(i);
}
});
while let Some(val) = rx.recv_blocking() {
println!("Got {}", val);
}
Ok(())
}
fn mpsc<T>() -> (Sender<T>, Receiver<T>) {
let state = Arc::new(State {
items: Mutex::new(VecDeque::new()),
rx_dropped: AtomicBool::new(false),
});
let notify = Notify::new();
let listener = notify.listener();
(
Sender {
state: Some(Arc::clone(&state)),
notify,
},
Receiver { state, listener },
)
}
impl<T> Sender<T> {
pub fn send(&self, item: T) -> Result<(), T> {
if self.is_closed() {
return Err(item);
}
self.state().items.lock().unwrap().push_back(item);
self.notify.notify_one();
Ok(())
}
pub fn is_closed(&self) -> bool {
self.state().rx_dropped.load(Ordering::Acquire)
}
fn state(&self) -> &State<T> {
self.state.as_deref().unwrap()
}
}
impl<T> Receiver<T> {
pub async fn recv(&mut self) -> Option<T> {
loop {
self.listener.consume_all_notifications();
if let Some(item) = self.try_recv() {
return Some(item);
}
if self.is_closed() {
return None;
}
self.listener.listen().await;
}
}
pub fn recv_blocking(&self) -> Option<T> {
loop {
self.listener.consume_all_notifications();
if let Some(item) = self.try_recv() {
return Some(item);
}
if self.is_closed() {
return None;
}
self.listener.listen_blocking();
}
}
pub fn is_closed(&self) -> bool {
Arc::strong_count(&self.state) == 1
}
pub fn try_recv(&self) -> Option<T> {
self.state.items.lock().unwrap().pop_front()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.state.take();
self.notify.notify_one();
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.state.rx_dropped.store(true, Ordering::Release);
}
}