commit ac89bc9a9148b029e134c1889baec61d510126f8 Author: Aode (lion) Date: Thu Feb 17 13:39:47 2022 -0500 Basic channel diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a0e64df --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "mpsc" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c879844 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,116 @@ +use std::{ + collections::VecDeque, + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread::Thread, +}; + +pub fn channel() -> (Sender, Receiver) { + let state = Arc::new(Mutex::new(State { + items: VecDeque::new(), + wake: None, + })); + + ( + Sender { + state: Arc::clone(&state), + }, + Receiver { state }, + ) +} + +pub struct Sender { + state: Arc>>, +} + +pub struct Receiver { + state: Arc>>, +} + +struct State { + items: VecDeque, + wake: Option, +} + +enum WakerKind { + Waker(Waker), + Thread(Thread), +} + +struct Receive<'a, T> { + state: &'a Arc>>, +} + +impl Sender { + pub fn send(&self, item: T) { + let mut guard = self.state.lock().unwrap(); + guard.items.push_back(item); + match &guard.wake { + Some(WakerKind::Waker(ref waker)) => waker.wake_by_ref(), + Some(WakerKind::Thread(ref thread)) => thread.unpark(), + None => {} + } + } +} + +impl Receiver { + pub async fn recv(&mut self) -> Option { + Receive { state: &self.state }.await + } + + pub fn recv_blocking(&mut self) -> Option { + loop { + { + let mut guard = self.state.lock().unwrap(); + + if let Some(item) = guard.items.pop_front() { + guard.wake.take(); + return Some(item); + } + + if Arc::strong_count(&self.state) == 1 { + return None; + } + + guard.wake = Some(WakerKind::Thread(std::thread::current())); + } + + std::thread::park(); + } + } + + pub fn try_recv(&mut self) -> Option { + self.state.lock().unwrap().items.pop_front() + } +} + +impl<'a, T> Future for Receive<'a, T> { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut guard = self.state.lock().unwrap(); + + if let Some(item) = guard.items.pop_front() { + guard.wake.take(); + return Poll::Ready(Some(item)); + } + + if Arc::strong_count(self.state) == 1 { + return Poll::Ready(None); + } + + guard.wake = Some(WakerKind::Waker(cx.waker().clone())); + + Poll::Pending + } +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + } + } +}