Add Timer, time helpers, delay example

This commit is contained in:
Aode (lion) 2022-02-14 19:19:13 -06:00
parent ef1336c675
commit f8279e2c7b
6 changed files with 327 additions and 173 deletions

View file

@ -1,5 +1,5 @@
use async_join::join_all;
use jitterbug::{Async, Reactor};
use jitterbug::Async;
use std::net::{SocketAddr, SocketAddrV4, TcpStream};
async fn echo_to(port: u16) -> Result<(), Box<dyn std::error::Error>> {
@ -35,9 +35,7 @@ async fn echo_to(port: u16) -> Result<(), Box<dyn std::error::Error>> {
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let reactor = Reactor::new()?;
reactor.block_on(async move {
jitterbug::block_on(async move {
for res in join_all(vec![Box::pin(echo_to(4444)), Box::pin(echo_to(4445))]).await {
res?;
}

View file

@ -0,0 +1,28 @@
use async_join::join_all;
use jitterbug::Error;
use std::{future::Future, pin::Pin, time::Duration};
fn main() -> Result<(), Error> {
jitterbug::block_on(async move {
let f1 = Box::pin(async move {
jitterbug::time::sleep(Duration::from_secs(4)).await;
println!("waited 4");
}) as Pin<Box<dyn Future<Output = ()>>>;
let f2 = Box::pin(async move {
jitterbug::time::sleep(Duration::from_secs(2)).await;
println!("waited 2");
});
let f3 = Box::pin(async move {
let mut interval = jitterbug::time::interval(Duration::from_secs(1));
for count in 0..5 {
interval.tick().await;
println!("count {}", count);
}
});
join_all(vec![f1, f2, f3]).await;
})
}

177
jitterbug/src/io.rs Normal file
View file

@ -0,0 +1,177 @@
use crate::reactor::ReactorRef;
use polldance::{
net::{TcpStream, TcpStreamBuilder},
Readiness,
};
use rustix::fd::AsFd;
use std::{
future::Future,
net::IpAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
pub struct Async<T: AsFd + 'static> {
io: Arc<T>,
}
pub struct Read<'a, T: AsFd + 'static> {
inner: &'a Async<T>,
bytes: &'a mut [u8],
}
pub struct Write<'a, T: AsFd + 'static> {
inner: &'a Async<T>,
bytes: &'a [u8],
}
pub struct Connect {
io: Option<rustix::io::Result<Arc<TcpStreamBuilder>>>,
}
impl<T: AsFd + 'static> Drop for Async<T> {
fn drop(&mut self) {
let _ = ReactorRef::with(|mut reactor| reactor.deregister(&self.io));
}
}
impl Async<TcpStream> {
pub fn connect(ip: IpAddr, port: u16) -> Connect {
Connect {
io: Some(polldance::net::TcpStream::connect(ip, port).map(Arc::new)),
}
}
}
impl<T: AsFd + 'static> Async<T> {
pub fn new(io: T) -> Self {
Async { io: Arc::new(io) }
}
pub fn read<'a>(&'a self, bytes: &'a mut [u8]) -> Read<'a, T> {
Read { inner: self, bytes }
}
pub async fn read_exact(&self, bytes: &mut [u8]) -> rustix::io::Result<usize> {
let mut start = 0;
while start < bytes.len() {
let n = self.read(&mut bytes[start..]).await?;
if n == 0 {
break;
}
start += n;
}
Ok(start)
}
pub fn write<'a>(&'a self, bytes: &'a [u8]) -> Write<'a, T> {
Write { inner: self, bytes }
}
pub async fn write_all(&self, bytes: &[u8]) -> rustix::io::Result<()> {
let mut start = 0;
while start < bytes.len() {
start += self.write(&bytes[start..]).await?;
}
Ok(())
}
}
impl Future for Connect {
type Output = rustix::io::Result<Async<TcpStream>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match this.io.take().unwrap() {
Ok(builder) => {
let builder = match Arc::try_unwrap(builder) {
Ok(builder) => builder,
Err(_) => unreachable!("Should never hold the inner Arc more than once"),
};
match builder.try_finish() {
Ok(Ok(stream)) => Poll::Ready(Ok(Async::new(stream))),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&builder),
cx.waker().clone(),
Readiness::write(),
);
})
.unwrap();
this.io = Some(Ok(builder));
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a, T> Future for Read<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match rustix::io::read(&*this.inner.io, &mut this.bytes[0..]) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&this.inner.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a, T> Future for Write<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match rustix::io::write(&*self.inner.io, self.bytes) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&self.inner.io),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}

View file

@ -1,174 +1,13 @@
use polldance::{
net::{TcpStream, TcpStreamBuilder},
Readiness,
};
use rustix::fd::AsFd;
use std::{
future::Future,
net::IpAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
mod io;
mod reactor;
pub mod time;
mod timer;
use std::future::Future;
pub use io::Async;
pub use reactor::{Error, Reactor, ReactorRef};
pub struct Async<T> {
io: Arc<T>,
}
pub struct Read<'a, T> {
inner: &'a Async<T>,
bytes: &'a mut [u8],
}
pub struct Write<'a, T> {
inner: &'a Async<T>,
bytes: &'a [u8],
}
pub struct Connect {
io: Option<rustix::io::Result<Arc<TcpStreamBuilder>>>,
}
impl Async<TcpStream> {
pub fn connect(ip: IpAddr, port: u16) -> Connect {
Connect {
io: Some(polldance::net::TcpStream::connect(ip, port).map(Arc::new)),
}
}
}
impl<T: AsFd + 'static> Async<T> {
pub fn new(io: T) -> Self {
Async { io: Arc::new(io) }
}
pub fn read<'a>(&'a self, bytes: &'a mut [u8]) -> Read<'a, T> {
Read { inner: self, bytes }
}
pub async fn read_exact(&self, bytes: &mut [u8]) -> rustix::io::Result<usize> {
let mut start = 0;
while start < bytes.len() {
let n = self.read(&mut bytes[start..]).await?;
if n == 0 {
break;
}
start += n;
}
Ok(start)
}
pub fn write<'a>(&'a self, bytes: &'a [u8]) -> Write<'a, T> {
Write { inner: self, bytes }
}
pub async fn write_all(&self, bytes: &[u8]) -> rustix::io::Result<()> {
let mut start = 0;
while start < bytes.len() {
start += self.write(&bytes[start..]).await?;
}
Ok(())
}
}
impl Future for Connect {
type Output = rustix::io::Result<Async<TcpStream>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match this.io.take().unwrap() {
Ok(builder) => {
let builder = match Arc::try_unwrap(builder) {
Ok(builder) => builder,
Err(_) => unreachable!("Should never hold the inner Arc more than once"),
};
match builder.try_finish() {
Ok(Ok(stream)) => Poll::Ready(Ok(Async::new(stream))),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&builder),
cx.waker().clone(),
Readiness::write(),
);
})
.unwrap();
this.io = Some(Ok(builder));
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a, T> Future for Read<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match rustix::io::read(&*this.inner.io, &mut this.bytes[0..]) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&this.inner.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
impl<'a, T> Future for Write<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match rustix::io::write(&*self.inner.io, self.bytes) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&self.inner.io),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
pub fn block_on<F: Future>(future: F) -> Result<F::Output, Error> {
Reactor::new()?.block_on(future)
}

48
jitterbug/src/time.rs Normal file
View file

@ -0,0 +1,48 @@
use std::time::{Duration, Instant};
pub use crate::timer::Timer;
pub fn sleep(duration: Duration) -> Timer {
Timer::new(duration)
}
pub fn interval(duration: Duration) -> Interval {
Interval {
next: Instant::now(),
duration,
}
}
pub fn interval_at(instant: Instant, duration: Duration) -> Interval {
Interval {
next: instant,
duration,
}
}
pub struct Interval {
next: Instant,
duration: Duration,
}
impl Interval {
pub fn tick(&mut self) -> Timer {
let now = Instant::now();
while self.next + self.duration <= now {
self.next += self.duration;
}
let duration = if let Some(duration) = now.checked_duration_since(self.next) {
duration
} else if let Some(duration) = self.next.checked_duration_since(now) {
duration
} else {
Duration::from_secs(0)
};
self.next += self.duration;
Timer::new(duration)
}
}

64
jitterbug/src/timer.rs Normal file
View file

@ -0,0 +1,64 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll, Waker},
time::{Duration, Instant},
};
use crate::ReactorRef;
pub struct Timer {
created: Instant,
duration: Duration,
waker: Option<Waker>,
}
impl Timer {
pub fn new(duration: Duration) -> Self {
Timer {
duration,
created: Instant::now(),
waker: None,
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
if let Some(waker) = self.waker.take() {
let _ = ReactorRef::with(move |mut reactor| {
reactor.remove_timer(waker.clone());
});
}
}
}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let now = Instant::now();
let diff = now.checked_duration_since(self.created);
let duration = if let Some(diff) = diff {
if self.duration <= diff {
return Poll::Ready(());
}
self.duration.checked_sub(diff)
} else {
None
};
let duration = duration.unwrap_or(self.duration);
self.as_mut().waker = Some(cx.waker().clone());
ReactorRef::with(|mut reactor| {
reactor.add_timer(duration, cx.waker().clone());
})
.unwrap();
Poll::Pending
}
}