Implement oneshot and mpsc as examples

This commit is contained in:
Aode (Lion) 2022-02-24 20:37:54 -06:00
parent 03e28bff03
commit b80be5d7f9
2 changed files with 215 additions and 0 deletions

111
examples/mpsc.rs Normal file
View file

@ -0,0 +1,111 @@
use notify::{Listener, Notify};
use std::{time::Duration, sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}}, collections::VecDeque};
struct State<T> {
items: Mutex<VecDeque<T>>,
rx_dropped: AtomicBool,
}
struct Sender<T> {
state: Arc<State<T>>,
notify: Notify,
}
struct Receiver<T> {
state: Arc<State<T>>,
listener: Listener,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
jive::block_on(async move {
let (tx, mut rx) = mpsc();
jive::spawn(async move {
let mut interval = jive::time::interval(Duration::from_secs(1));
for i in 0..5 {
interval.tick().await;
let _ = tx.send(i);
}
});
while let Some(val) = rx.recv().await {
println!("Got {}", val);
}
})?;
let (tx, rx) = mpsc();
std::thread::spawn(move || {
for i in -5..0 {
std::thread::sleep(Duration::from_secs(1));
let _ = tx.send(i);
}
});
while let Some(val) = rx.recv_blocking() {
println!("Got {}", val);
}
Ok(())
}
fn mpsc<T>() -> (Sender<T>, Receiver<T>) {
let state = Arc::new(State {
items: Mutex::new(VecDeque::new()),
rx_dropped: AtomicBool::new(false),
});
let notify = Notify::new();
let listener = notify.listener();
(Sender { state: Arc::clone(&state), notify }, Receiver { state, listener })
}
impl<T> Sender<T> {
pub fn send(&self, item: T) -> Result<(), T> {
if self.state.rx_dropped.load(Ordering::Acquire) {
return Err(item);
}
self.state.items.lock().unwrap().push_back(item);
self.notify.notify_one();
Ok(())
}
}
impl<T> Receiver<T> {
pub async fn recv(&mut self) -> Option<T> {
loop {
if let Some(item) = self.state.items.lock().unwrap().pop_front() {
return Some(item);
}
if Arc::strong_count(&self.state) == 1 {
return None;
}
self.listener.listen().await;
}
}
pub fn recv_blocking(&self) -> Option<T> {
loop {
if let Some(item) = self.state.items.lock().unwrap().pop_front() {
return Some(item);
}
if Arc::strong_count(&self.state) == 1 {
return None;
}
self.listener.listen_blocking();
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.state.rx_dropped.store(true, Ordering::Release);
}
}

104
examples/oneshot.rs Normal file
View file

@ -0,0 +1,104 @@
use notify::{Listener, Notify};
use std::{time::Duration, sync::{Arc, Mutex}};
struct Sender<T> {
item: Arc<Mutex<Option<Result<T, Dropped>>>>,
notify: Notify,
}
struct Receiver<T> {
item: Arc<Mutex<Option<Result<T, Dropped>>>>,
listener: Listener,
}
struct Dropped;
fn main() -> Result<(), Box<dyn std::error::Error>> {
jive::block_on(async move {
let (tx, rx) = oneshot();
jive::spawn(async move {
jive::time::sleep(Duration::from_secs(1)).await;
let _ = tx.send(5);
});
if let Ok(val) = rx.recv().await {
println!("Got {}", val);
}
})?;
let (tx, rx) = oneshot();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
let _ = tx.send(-5);
});
if let Ok(val) = rx.recv_blocking() {
println!("Got {}", val);
}
Ok(())
}
fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
let item = Arc::new(Mutex::new(None));
let notify = Notify::new();
let listener = notify.listener();
(Sender { item: Arc::clone(&item), notify }, Receiver { item, listener })
}
impl<T> Sender<T> {
pub fn send(self, item: T) -> Result<(), T> {
let mut guard = self.item.lock().unwrap();
if let Some(Err(Dropped)) = guard.take() {
return Err(item);
}
*guard = Some(Ok(item));
self.notify.notify_one();
Ok(())
}
}
impl<T> Receiver<T> {
pub async fn recv(mut self) -> Result<T, Dropped> {
loop {
if let Some(res) = self.item.lock().unwrap().take() {
return res;
}
self.listener.listen().await;
}
}
pub fn recv_blocking(self) -> Result<T, Dropped> {
loop {
if let Some(res) = self.item.lock().unwrap().take() {
return res;
}
self.listener.listen_blocking();
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut guard = self.item.lock().unwrap();
if guard.is_none() {
*guard = Some(Err(Dropped));
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut guard = self.item.lock().unwrap();
if guard.is_none() {
*guard = Some(Err(Dropped));
}
}
}