Remove jitterbug

This commit is contained in:
Aode (lion) 2022-02-15 12:10:09 -06:00
parent e24c4fd05e
commit a6f1434548
9 changed files with 0 additions and 807 deletions

View file

@ -1,13 +0,0 @@
[package]
name = "jitterbug"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
polldance = { path = "../" }
rustix = "0.33.2"
[dev-dependencies]
async-join = { git = "https://git.asonix.dog/asonix/async-join" }

View file

@ -1,42 +0,0 @@
use async_join::join_all;
use jitterbug::Async;
async fn echo(port: u16) -> Result<(), jitterbug::Error> {
let listener = Async::bind([127, 0, 0, 1].into(), port).await?;
println!("bound listener");
loop {
let (stream, _addr) = listener.accept().await?;
println!("Accepted connection");
loop {
let mut buf = [0; 1024];
if let Err(e) = stream.read(&mut buf).await {
if e == rustix::io::Error::PIPE {
break;
}
return Err(e.into());
}
if let Err(e) = stream.write_all(&buf).await {
if e == rustix::io::Error::PIPE {
break;
}
return Err(e.into());
}
}
println!("Connection closed");
}
}
fn main() -> Result<(), jitterbug::Error> {
jitterbug::block_on(async move {
for res in join_all(vec![Box::pin(echo(4444)), Box::pin(echo(4445))]).await {
res?;
}
Ok(())
})?
}

View file

@ -1,45 +0,0 @@
use async_join::join_all;
use jitterbug::Async;
use std::net::{SocketAddr, SocketAddrV4, TcpStream};
async fn echo_to(port: u16) -> Result<(), Box<dyn std::error::Error>> {
let sockaddr = SocketAddr::V4(SocketAddrV4::new([127, 0, 0, 1].into(), port));
let stream = TcpStream::connect(sockaddr)?;
stream.set_nonblocking(true)?;
let stream = Async::new(stream);
println!("Connected");
loop {
let mut buf = [0; 1024];
if let Err(e) = stream.read(&mut buf).await {
if e == rustix::io::Error::PIPE {
break;
}
return Err(e.into());
}
if let Err(e) = stream.write_all(&buf).await {
if e == rustix::io::Error::PIPE {
break;
}
return Err(e.into());
}
}
println!("Connection closed");
Ok(())
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
jitterbug::block_on(async move {
for res in join_all(vec![Box::pin(echo_to(4444)), Box::pin(echo_to(4445))]).await {
res?;
}
Ok(())
})?
}

View file

@ -1,28 +0,0 @@
use async_join::join_all;
use jitterbug::Error;
use std::{future::Future, pin::Pin, time::Duration};
fn main() -> Result<(), Error> {
jitterbug::block_on(async move {
let f1 = Box::pin(async move {
jitterbug::time::sleep(Duration::from_secs(4)).await;
println!("waited 4");
}) as Pin<Box<dyn Future<Output = ()>>>;
let f2 = Box::pin(async move {
jitterbug::time::sleep(Duration::from_secs(2)).await;
println!("waited 2");
});
let f3 = Box::pin(async move {
let mut interval = jitterbug::time::interval(Duration::from_secs(1));
for count in 0..5 {
interval.tick().await;
println!("count {}", count);
}
});
join_all(vec![f1, f2, f3]).await;
})
}

View file

@ -1,257 +0,0 @@
use crate::reactor::ReactorRef;
use polldance::{
net::{TcpListener, TcpListenerBuilder, TcpStream, TcpStreamBuilder},
Readiness,
};
use rustix::{fd::AsFd, net::SocketAddrAny};
use std::{
future::Future,
net::IpAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
pub struct Async<T: AsFd + 'static> {
io: Arc<T>,
}
pub struct Read<'a, T: AsFd + 'static> {
inner: &'a Async<T>,
bytes: &'a mut [u8],
}
pub struct Write<'a, T: AsFd + 'static> {
inner: &'a Async<T>,
bytes: &'a [u8],
}
pub struct Accept<'a> {
inner: &'a Async<TcpListener>,
}
pub struct Bind {
io: Option<rustix::io::Result<Arc<TcpListenerBuilder>>>,
}
pub struct Connect {
io: Option<rustix::io::Result<Arc<TcpStreamBuilder>>>,
}
impl<T: AsFd + 'static> Drop for Async<T> {
fn drop(&mut self) {
let _ = ReactorRef::with(|mut reactor| reactor.deregister(&self.io));
}
}
impl Async<TcpListener> {
pub fn bind(ip: IpAddr, port: u16) -> Bind {
Bind {
io: Some(polldance::net::TcpListener::bind(ip, port).map(Arc::new)),
}
}
pub fn accept(&self) -> Accept<'_> {
Accept { inner: self }
}
}
impl Async<TcpStream> {
pub fn connect(ip: IpAddr, port: u16) -> Connect {
Connect {
io: Some(polldance::net::TcpStream::connect(ip, port).map(Arc::new)),
}
}
}
impl<T: AsFd + 'static> Async<T> {
pub fn new(io: T) -> Self {
Async { io: Arc::new(io) }
}
pub fn read<'a>(&'a self, bytes: &'a mut [u8]) -> Read<'a, T> {
Read { inner: self, bytes }
}
pub async fn read_exact(&self, bytes: &mut [u8]) -> rustix::io::Result<usize> {
let mut start = 0;
while start < bytes.len() {
let n = self.read(&mut bytes[start..]).await?;
if n == 0 {
break;
}
start += n;
}
Ok(start)
}
pub fn write<'a>(&'a self, bytes: &'a [u8]) -> Write<'a, T> {
Write { inner: self, bytes }
}
pub async fn write_all(&self, bytes: &[u8]) -> rustix::io::Result<()> {
let mut start = 0;
while start < bytes.len() {
start += self.write(&bytes[start..]).await?;
}
Ok(())
}
}
impl Future for Bind {
type Output = rustix::io::Result<Async<TcpListener>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match this.io.take().unwrap() {
Ok(builder) => {
let builder = match Arc::try_unwrap(builder) {
Ok(builder) => builder,
Err(_) => unreachable!("Should never hold the inner Arc more than once"),
};
match builder.try_finish() {
Ok(Ok(listener)) => Poll::Ready(Ok(Async::new(listener))),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&builder),
cx.waker().clone(),
Readiness::read(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl Future for Connect {
type Output = rustix::io::Result<Async<TcpStream>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match this.io.take().unwrap() {
Ok(builder) => {
let builder = match Arc::try_unwrap(builder) {
Ok(builder) => builder,
Err(_) => unreachable!("Should never hold the inner Arc more than once"),
};
match builder.try_finish() {
Ok(Ok(stream)) => Poll::Ready(Ok(Async::new(stream))),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&builder),
cx.waker().clone(),
Readiness::write(),
);
})
.unwrap();
this.io = Some(Ok(builder));
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a, T> Future for Read<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match rustix::io::read(&*this.inner.io, &mut this.bytes[0..]) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&this.inner.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a, T> Future for Write<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match rustix::io::write(&*self.inner.io, self.bytes) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&self.inner.io),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a> Future for Accept<'a> {
type Output = rustix::io::Result<(Async<TcpStream>, SocketAddrAny)>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.io.try_accept() {
Ok(Some((stream, addr))) => Poll::Ready(Ok((Async::new(stream), addr))),
Ok(None) => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&self.inner.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}

View file

@ -1,13 +0,0 @@
mod io;
mod reactor;
pub mod time;
mod timer;
use std::future::Future;
pub use io::Async;
pub use reactor::{Error, Reactor, ReactorRef};
pub fn block_on<F: Future>(future: F) -> Result<F::Output, Error> {
Reactor::new()?.block_on(future)
}

View file

@ -1,309 +0,0 @@
use polldance::{Key, PollManager, Readiness};
use rustix::fd::AsFd;
use std::{
collections::{BTreeMap, HashMap},
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, MutexGuard,
},
task::{Context, Poll, Wake, Waker},
time::{Duration, Instant},
};
thread_local! {
static REACTOR: Mutex<Option<Arc<Mutex<ReactorState>>>> = Mutex::new(None);
}
#[derive(Debug)]
pub enum Error {
IO(rustix::io::Error),
AlreadyInitialized,
Uninitialized,
}
pub struct Reactor {
state: Arc<Mutex<ReactorState>>,
}
pub struct ReactorRef<'a> {
state: MutexGuard<'a, ReactorState>,
}
pub struct RemoveFlag {
inner: Arc<AtomicBool>,
}
struct ReactorState {
now: Instant,
poller: PollManager,
io: HashMap<Key, Vec<(Readiness, Waker)>>,
timers: BTreeMap<Instant, Vec<(Arc<AtomicBool>, Waker)>>,
}
struct IoWaker {
woken: AtomicBool,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::IO(ref e) => std::fmt::Display::fmt(e, f),
Self::AlreadyInitialized => {
write!(f, "A Reactor has already been initialized on this thread")
}
Self::Uninitialized => write!(f, "No reactor has been initialized on this thread"),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
if let Self::IO(ref e) = self {
return Some(e);
}
None
}
}
impl ReactorRef<'_> {
pub fn with<F, T>(callback: F) -> Result<T, Error>
where
F: for<'a> FnOnce(ReactorRef<'a>) -> T,
T: 'static,
{
REACTOR.with(|reactor| {
let guard = reactor.lock().unwrap();
if let Some(state) = guard.as_ref() {
let res = callback(ReactorRef {
state: state.lock().unwrap(),
});
return Ok(res);
}
Err(Error::Uninitialized)
})
}
pub fn register<A: AsFd + 'static>(&mut self, io: Arc<A>, waker: Waker, interests: Readiness) {
self.state.register(io, waker, interests)
}
pub fn add_timer(&mut self, duration: Duration, waker: Waker) -> RemoveFlag {
self.state.add_timer(duration, waker)
}
pub fn deregister<A: AsFd + 'static>(&mut self, io: &Arc<A>) {
self.state.deregister(io)
}
pub fn update_interests<A: AsFd + 'static>(
&mut self,
io: &Arc<A>,
callback: impl Fn(Readiness) -> Readiness,
) {
self.state.update_interests(io, callback);
}
}
impl From<rustix::io::Error> for Error {
fn from(e: rustix::io::Error) -> Self {
Error::IO(e)
}
}
impl Drop for Reactor {
fn drop(&mut self) {
REACTOR.with(|reactor| {
reactor.lock().unwrap().take();
});
}
}
impl Wake for IoWaker {
fn wake(self: Arc<Self>) {
self.woken.store(true, Ordering::Release);
}
fn wake_by_ref(self: &Arc<Self>) {
self.woken.store(true, Ordering::Release);
}
}
impl Reactor {
pub fn new() -> Result<Self, Error> {
let state = ReactorState::init()?;
Ok(Self { state })
}
pub fn block_on<F: Future>(&self, future: F) -> Result<F::Output, Error> {
let mut pinned = Box::pin(future);
let io_waker = Arc::new(IoWaker {
woken: AtomicBool::new(true),
});
let waker = Arc::clone(&io_waker).into();
let mut context = Context::from_waker(&waker);
loop {
if io_waker.woken.swap(false, Ordering::AcqRel) {
if let Poll::Ready(output) = pinned.as_mut().poll(&mut context) {
return Ok(output);
}
}
self.state.lock().unwrap().poll()?;
}
}
}
impl Drop for RemoveFlag {
fn drop(&mut self) {
self.inner.store(true, Ordering::Release);
}
}
impl ReactorState {
fn init() -> Result<Arc<Mutex<Self>>, Error> {
REACTOR.with(|reactor| {
let mut guard = reactor.lock().unwrap();
if guard.is_some() {
Err(Error::AlreadyInitialized)
} else {
let this = Arc::new(Mutex::new(Self {
now: Instant::now(),
poller: PollManager::new()?,
io: HashMap::new(),
timers: BTreeMap::new(),
}));
*guard = Some(Arc::clone(&this));
Ok(this)
}
})
}
fn register<A: AsFd + 'static>(&mut self, io: Arc<A>, waker: Waker, interests: Readiness) {
let key = self.poller.register(io, interests);
let entry = self.io.entry(key).or_insert_with(Vec::new);
for (ntrst, wkr) in entry.iter_mut() {
if waker.will_wake(wkr) {
*ntrst = *ntrst | interests;
return;
}
}
entry.push((interests, waker));
}
fn deregister<A: AsFd + 'static>(&mut self, io: &Arc<A>) {
self.poller.deregister(io);
}
fn update_interests<A: AsFd + 'static>(
&mut self,
io: &Arc<A>,
callback: impl Fn(Readiness) -> Readiness,
) {
self.poller.update_interests(io, callback);
}
fn poll(&mut self) -> Result<(), Error> {
let now = self.now();
let first_wakeup = self.timers.keys().copied().next();
let timeout = first_wakeup.map(|instant| {
instant
.checked_duration_since(now)
.map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX))
.unwrap_or(0)
});
for (key, readiness) in self.poller.poll(timeout)? {
if let Some((key, vec)) = self.io.remove_entry(&key) {
let new_vec = vec
.into_iter()
.filter_map(|(interests, waker)| {
if interests.is_intersect(readiness) {
waker.wake();
return None;
}
Some((interests, waker))
})
.collect::<Vec<_>>();
let readiness = new_vec
.iter()
.fold(Readiness::empty(), |readiness, (current, _)| {
readiness | *current
});
if new_vec.is_empty() || readiness.is_empty() {
self.poller.deregister_by_key(key);
} else {
self.poller.update_interests_by_key(&key, |_| readiness);
self.io.insert(key, new_vec);
}
}
}
self.check_timers();
Ok(())
}
fn now(&mut self) -> Instant {
let now = Instant::now();
if now.duration_since(self.now) >= Duration::from_millis(1) {
self.now = now;
}
self.now
}
fn add_timer(&mut self, duration: Duration, waker: Waker) -> RemoveFlag {
let now = self.now();
let expires = now + duration;
let flag = Arc::new(AtomicBool::new(false));
self.timers
.entry(expires)
.or_insert_with(Vec::new)
.push((Arc::clone(&flag), waker));
RemoveFlag { inner: flag }
}
fn check_timers(&mut self) {
let now = self.now();
let expired = self
.timers
.keys()
.copied()
.take_while(|instant| *instant <= now)
.collect::<Vec<_>>();
for timer in expired {
if let Some(wakers) = self.timers.remove(&timer) {
for (flag, waker) in wakers {
if !flag.load(Ordering::Acquire) {
waker.wake();
}
}
}
}
}
}

View file

@ -1,48 +0,0 @@
use std::time::{Duration, Instant};
pub use crate::timer::Timer;
pub fn sleep(duration: Duration) -> Timer {
Timer::new(duration)
}
pub fn interval(duration: Duration) -> Interval {
Interval {
next: Instant::now(),
duration,
}
}
pub fn interval_at(instant: Instant, duration: Duration) -> Interval {
Interval {
next: instant,
duration,
}
}
pub struct Interval {
next: Instant,
duration: Duration,
}
impl Interval {
pub fn tick(&mut self) -> Timer {
let now = Instant::now();
while self.next + self.duration <= now {
self.next += self.duration;
}
let duration = if let Some(duration) = now.checked_duration_since(self.next) {
duration
} else if let Some(duration) = self.next.checked_duration_since(now) {
duration
} else {
Duration::from_secs(0)
};
self.next += self.duration;
Timer::new(duration)
}
}

View file

@ -1,52 +0,0 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use crate::{reactor::RemoveFlag, ReactorRef};
pub struct Timer {
created: Instant,
duration: Duration,
remove_flag: Option<RemoveFlag>,
}
impl Timer {
pub fn new(duration: Duration) -> Self {
Timer {
duration,
created: Instant::now(),
remove_flag: None,
}
}
}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let now = Instant::now();
let diff = now.checked_duration_since(self.created);
let duration = if let Some(diff) = diff {
if self.duration <= diff {
return Poll::Ready(());
}
self.duration.checked_sub(diff)
} else {
None
};
let duration = duration.unwrap_or(self.duration);
self.remove_flag = Some(
ReactorRef::with(|mut reactor| reactor.add_timer(duration, cx.waker().clone()))
.unwrap(),
);
Poll::Pending
}
}