Break out of polldance repo
This commit is contained in:
commit
6eb7cc35cb
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
Cargo.lock
|
112
Cargo.lock
generated
Normal file
112
Cargo.lock
generated
Normal file
|
@ -0,0 +1,112 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "async-join"
|
||||
version = "0.1.0"
|
||||
source = "git+https://git.asonix.dog/asonix/async-join#67c7725dba6c44261e8ee581aef090276d88cbc2"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.72"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
|
||||
|
||||
[[package]]
|
||||
name = "errno"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1"
|
||||
dependencies = [
|
||||
"errno-dragonfly",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "errno-dragonfly"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foxtrot"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-join",
|
||||
"polldance",
|
||||
"rustix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-lifetimes"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "768dbad422f45f69c8f5ce59c0802e2681aa3e751c5db8217901607bb2bc24dd"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.118"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06e509672465a0504304aa87f9f176f2b2b716ed8fb105ebe5c02dc6dce96a94"
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.0.40"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5bdc16c6ce4c85d9b46b4e66f2a814be5b3f034dbd5131c268a24ca26d970db8"
|
||||
|
||||
[[package]]
|
||||
name = "polldance"
|
||||
version = "0.1.0"
|
||||
source = "git+https://git.asonix.dog/asonix/polldance#b17e24103b3ca66deeb86a3aa9aa55c0583c0dd2"
|
||||
dependencies = [
|
||||
"rustix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.33.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db890a96e64911e67fa84e58ce061a40a6a65c231e5fad20b190933f8991a27c"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"errno",
|
||||
"io-lifetimes",
|
||||
"libc",
|
||||
"linux-raw-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
|
||||
dependencies = [
|
||||
"winapi-i686-pc-windows-gnu",
|
||||
"winapi-x86_64-pc-windows-gnu",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-i686-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
13
Cargo.toml
Normal file
13
Cargo.toml
Normal file
|
@ -0,0 +1,13 @@
|
|||
[package]
|
||||
name = "foxtrot"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
polldance = { git = "https://git.asonix.dog/asonix/polldance" }
|
||||
rustix = "0.33.2"
|
||||
|
||||
[dev-dependencies]
|
||||
async-join = { git = "https://git.asonix.dog/asonix/async-join" }
|
42
examples/async-echo-server.rs
Normal file
42
examples/async-echo-server.rs
Normal file
|
@ -0,0 +1,42 @@
|
|||
use async_join::join_all;
|
||||
use foxtrot::Async;
|
||||
|
||||
async fn echo(port: u16) -> Result<(), foxtrot::Error> {
|
||||
let listener = Async::bind([127, 0, 0, 1].into(), port).await?;
|
||||
println!("bound listener");
|
||||
|
||||
loop {
|
||||
let (stream, _addr) = listener.accept().await?;
|
||||
println!("Accepted connection");
|
||||
|
||||
loop {
|
||||
let mut buf = [0; 1024];
|
||||
if let Err(e) = stream.read(&mut buf).await {
|
||||
if e == rustix::io::Error::PIPE {
|
||||
break;
|
||||
}
|
||||
|
||||
return Err(e.into());
|
||||
}
|
||||
if let Err(e) = stream.write_all(&buf).await {
|
||||
if e == rustix::io::Error::PIPE {
|
||||
break;
|
||||
}
|
||||
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
println!("Connection closed");
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<(), foxtrot::Error> {
|
||||
foxtrot::block_on(async move {
|
||||
for res in join_all(vec![Box::pin(echo(4444)), Box::pin(echo(4445))]).await {
|
||||
res?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?
|
||||
}
|
45
examples/async-echo.rs
Normal file
45
examples/async-echo.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
use async_join::join_all;
|
||||
use foxtrot::Async;
|
||||
use std::net::{SocketAddr, SocketAddrV4, TcpStream};
|
||||
|
||||
async fn echo_to(port: u16) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let sockaddr = SocketAddr::V4(SocketAddrV4::new([127, 0, 0, 1].into(), port));
|
||||
|
||||
let stream = TcpStream::connect(sockaddr)?;
|
||||
stream.set_nonblocking(true)?;
|
||||
let stream = Async::new(stream);
|
||||
|
||||
println!("Connected");
|
||||
|
||||
loop {
|
||||
let mut buf = [0; 1024];
|
||||
if let Err(e) = stream.read(&mut buf).await {
|
||||
if e == rustix::io::Error::PIPE {
|
||||
break;
|
||||
}
|
||||
|
||||
return Err(e.into());
|
||||
}
|
||||
if let Err(e) = stream.write_all(&buf).await {
|
||||
if e == rustix::io::Error::PIPE {
|
||||
break;
|
||||
}
|
||||
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
println!("Connection closed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
foxtrot::block_on(async move {
|
||||
for res in join_all(vec![Box::pin(echo_to(4444)), Box::pin(echo_to(4445))]).await {
|
||||
res?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?
|
||||
}
|
28
examples/delay.rs
Normal file
28
examples/delay.rs
Normal file
|
@ -0,0 +1,28 @@
|
|||
use async_join::join_all;
|
||||
use foxtrot::Error;
|
||||
use std::{future::Future, pin::Pin, time::Duration};
|
||||
|
||||
fn main() -> Result<(), Error> {
|
||||
foxtrot::block_on(async move {
|
||||
let f1 = Box::pin(async move {
|
||||
foxtrot::time::sleep(Duration::from_secs(4)).await;
|
||||
println!("waited 4");
|
||||
}) as Pin<Box<dyn Future<Output = ()>>>;
|
||||
|
||||
let f2 = Box::pin(async move {
|
||||
foxtrot::time::sleep(Duration::from_secs(2)).await;
|
||||
println!("waited 2");
|
||||
});
|
||||
|
||||
let f3 = Box::pin(async move {
|
||||
let mut interval = foxtrot::time::interval(Duration::from_secs(1));
|
||||
|
||||
for count in 0..5 {
|
||||
interval.tick().await;
|
||||
println!("count {}", count);
|
||||
}
|
||||
});
|
||||
|
||||
join_all(vec![f1, f2, f3]).await;
|
||||
})
|
||||
}
|
257
src/io.rs
Normal file
257
src/io.rs
Normal file
|
@ -0,0 +1,257 @@
|
|||
use crate::reactor::ReactorRef;
|
||||
use polldance::{
|
||||
net::{TcpListener, TcpListenerBuilder, TcpStream, TcpStreamBuilder},
|
||||
Readiness,
|
||||
};
|
||||
use rustix::{fd::AsFd, net::SocketAddrAny};
|
||||
use std::{
|
||||
future::Future,
|
||||
net::IpAddr,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub struct Async<T: AsFd + 'static> {
|
||||
io: Arc<T>,
|
||||
}
|
||||
|
||||
pub struct Read<'a, T: AsFd + 'static> {
|
||||
inner: &'a Async<T>,
|
||||
bytes: &'a mut [u8],
|
||||
}
|
||||
|
||||
pub struct Write<'a, T: AsFd + 'static> {
|
||||
inner: &'a Async<T>,
|
||||
bytes: &'a [u8],
|
||||
}
|
||||
|
||||
pub struct Accept<'a> {
|
||||
inner: &'a Async<TcpListener>,
|
||||
}
|
||||
|
||||
pub struct Bind {
|
||||
io: Option<rustix::io::Result<Arc<TcpListenerBuilder>>>,
|
||||
}
|
||||
|
||||
pub struct Connect {
|
||||
io: Option<rustix::io::Result<Arc<TcpStreamBuilder>>>,
|
||||
}
|
||||
|
||||
impl<T: AsFd + 'static> Drop for Async<T> {
|
||||
fn drop(&mut self) {
|
||||
let _ = ReactorRef::with(|mut reactor| reactor.deregister(&self.io));
|
||||
}
|
||||
}
|
||||
|
||||
impl Async<TcpListener> {
|
||||
pub fn bind(ip: IpAddr, port: u16) -> Bind {
|
||||
Bind {
|
||||
io: Some(polldance::net::TcpListener::bind(ip, port).map(Arc::new)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn accept(&self) -> Accept<'_> {
|
||||
Accept { inner: self }
|
||||
}
|
||||
}
|
||||
|
||||
impl Async<TcpStream> {
|
||||
pub fn connect(ip: IpAddr, port: u16) -> Connect {
|
||||
Connect {
|
||||
io: Some(polldance::net::TcpStream::connect(ip, port).map(Arc::new)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsFd + 'static> Async<T> {
|
||||
pub fn new(io: T) -> Self {
|
||||
Async { io: Arc::new(io) }
|
||||
}
|
||||
|
||||
pub fn read<'a>(&'a self, bytes: &'a mut [u8]) -> Read<'a, T> {
|
||||
Read { inner: self, bytes }
|
||||
}
|
||||
|
||||
pub async fn read_exact(&self, bytes: &mut [u8]) -> rustix::io::Result<usize> {
|
||||
let mut start = 0;
|
||||
|
||||
while start < bytes.len() {
|
||||
let n = self.read(&mut bytes[start..]).await?;
|
||||
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
start += n;
|
||||
}
|
||||
|
||||
Ok(start)
|
||||
}
|
||||
|
||||
pub fn write<'a>(&'a self, bytes: &'a [u8]) -> Write<'a, T> {
|
||||
Write { inner: self, bytes }
|
||||
}
|
||||
|
||||
pub async fn write_all(&self, bytes: &[u8]) -> rustix::io::Result<()> {
|
||||
let mut start = 0;
|
||||
|
||||
while start < bytes.len() {
|
||||
start += self.write(&bytes[start..]).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Bind {
|
||||
type Output = rustix::io::Result<Async<TcpListener>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut();
|
||||
|
||||
match this.io.take().unwrap() {
|
||||
Ok(builder) => {
|
||||
let builder = match Arc::try_unwrap(builder) {
|
||||
Ok(builder) => builder,
|
||||
Err(_) => unreachable!("Should never hold the inner Arc more than once"),
|
||||
};
|
||||
|
||||
match builder.try_finish() {
|
||||
Ok(Ok(listener)) => Poll::Ready(Ok(Async::new(listener))),
|
||||
Ok(Err(builder)) => {
|
||||
let builder = Arc::new(builder);
|
||||
|
||||
ReactorRef::with(|mut reactor| {
|
||||
reactor.register(
|
||||
Arc::clone(&builder),
|
||||
cx.waker().clone(),
|
||||
Readiness::read(),
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Connect {
|
||||
type Output = rustix::io::Result<Async<TcpStream>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut();
|
||||
|
||||
match this.io.take().unwrap() {
|
||||
Ok(builder) => {
|
||||
let builder = match Arc::try_unwrap(builder) {
|
||||
Ok(builder) => builder,
|
||||
Err(_) => unreachable!("Should never hold the inner Arc more than once"),
|
||||
};
|
||||
|
||||
match builder.try_finish() {
|
||||
Ok(Ok(stream)) => Poll::Ready(Ok(Async::new(stream))),
|
||||
Ok(Err(builder)) => {
|
||||
let builder = Arc::new(builder);
|
||||
|
||||
ReactorRef::with(|mut reactor| {
|
||||
reactor.register(
|
||||
Arc::clone(&builder),
|
||||
cx.waker().clone(),
|
||||
Readiness::write(),
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
this.io = Some(Ok(builder));
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Future for Read<'a, T>
|
||||
where
|
||||
T: AsFd + 'static,
|
||||
{
|
||||
type Output = rustix::io::Result<usize>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.as_mut();
|
||||
|
||||
match rustix::io::read(&*this.inner.io, &mut this.bytes[0..]) {
|
||||
Ok(n) => Poll::Ready(Ok(n)),
|
||||
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
|
||||
ReactorRef::with(|mut reactor| {
|
||||
reactor.register(
|
||||
Arc::clone(&this.inner.io),
|
||||
cx.waker().clone(),
|
||||
Readiness::read() | Readiness::hangup(),
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Future for Write<'a, T>
|
||||
where
|
||||
T: AsFd + 'static,
|
||||
{
|
||||
type Output = rustix::io::Result<usize>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match rustix::io::write(&*self.inner.io, self.bytes) {
|
||||
Ok(n) => Poll::Ready(Ok(n)),
|
||||
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
|
||||
ReactorRef::with(|mut reactor| {
|
||||
reactor.register(
|
||||
Arc::clone(&self.inner.io),
|
||||
cx.waker().clone(),
|
||||
Readiness::write() | Readiness::hangup(),
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Future for Accept<'a> {
|
||||
type Output = rustix::io::Result<(Async<TcpStream>, SocketAddrAny)>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.inner.io.try_accept() {
|
||||
Ok(Some((stream, addr))) => Poll::Ready(Ok((Async::new(stream), addr))),
|
||||
Ok(None) => {
|
||||
ReactorRef::with(|mut reactor| {
|
||||
reactor.register(
|
||||
Arc::clone(&self.inner.io),
|
||||
cx.waker().clone(),
|
||||
Readiness::read() | Readiness::hangup(),
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
13
src/lib.rs
Normal file
13
src/lib.rs
Normal file
|
@ -0,0 +1,13 @@
|
|||
mod io;
|
||||
mod reactor;
|
||||
pub mod time;
|
||||
mod timer;
|
||||
|
||||
use std::future::Future;
|
||||
|
||||
pub use io::Async;
|
||||
pub use reactor::{Error, Reactor, ReactorRef};
|
||||
|
||||
pub fn block_on<F: Future>(future: F) -> Result<F::Output, Error> {
|
||||
Reactor::new()?.block_on(future)
|
||||
}
|
309
src/reactor.rs
Normal file
309
src/reactor.rs
Normal file
|
@ -0,0 +1,309 @@
|
|||
use polldance::{Key, PollManager, Readiness};
|
||||
use rustix::fd::AsFd;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
future::Future,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Mutex, MutexGuard,
|
||||
},
|
||||
task::{Context, Poll, Wake, Waker},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
thread_local! {
|
||||
static REACTOR: Mutex<Option<Arc<Mutex<ReactorState>>>> = Mutex::new(None);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
IO(rustix::io::Error),
|
||||
AlreadyInitialized,
|
||||
Uninitialized,
|
||||
}
|
||||
|
||||
pub struct Reactor {
|
||||
state: Arc<Mutex<ReactorState>>,
|
||||
}
|
||||
|
||||
pub struct ReactorRef<'a> {
|
||||
state: MutexGuard<'a, ReactorState>,
|
||||
}
|
||||
|
||||
pub struct RemoveFlag {
|
||||
inner: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
struct ReactorState {
|
||||
now: Instant,
|
||||
poller: PollManager,
|
||||
io: HashMap<Key, Vec<(Readiness, Waker)>>,
|
||||
timers: BTreeMap<Instant, Vec<(Arc<AtomicBool>, Waker)>>,
|
||||
}
|
||||
|
||||
struct IoWaker {
|
||||
woken: AtomicBool,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::IO(ref e) => std::fmt::Display::fmt(e, f),
|
||||
Self::AlreadyInitialized => {
|
||||
write!(f, "A Reactor has already been initialized on this thread")
|
||||
}
|
||||
Self::Uninitialized => write!(f, "No reactor has been initialized on this thread"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
if let Self::IO(ref e) = self {
|
||||
return Some(e);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl ReactorRef<'_> {
|
||||
pub fn with<F, T>(callback: F) -> Result<T, Error>
|
||||
where
|
||||
F: for<'a> FnOnce(ReactorRef<'a>) -> T,
|
||||
T: 'static,
|
||||
{
|
||||
REACTOR.with(|reactor| {
|
||||
let guard = reactor.lock().unwrap();
|
||||
|
||||
if let Some(state) = guard.as_ref() {
|
||||
let res = callback(ReactorRef {
|
||||
state: state.lock().unwrap(),
|
||||
});
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
|
||||
Err(Error::Uninitialized)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn register<A: AsFd + 'static>(&mut self, io: Arc<A>, waker: Waker, interests: Readiness) {
|
||||
self.state.register(io, waker, interests)
|
||||
}
|
||||
|
||||
pub fn add_timer(&mut self, duration: Duration, waker: Waker) -> RemoveFlag {
|
||||
self.state.add_timer(duration, waker)
|
||||
}
|
||||
|
||||
pub fn deregister<A: AsFd + 'static>(&mut self, io: &Arc<A>) {
|
||||
self.state.deregister(io)
|
||||
}
|
||||
|
||||
pub fn update_interests<A: AsFd + 'static>(
|
||||
&mut self,
|
||||
io: &Arc<A>,
|
||||
callback: impl Fn(Readiness) -> Readiness,
|
||||
) {
|
||||
self.state.update_interests(io, callback);
|
||||
}
|
||||
}
|
||||
|
||||
impl From<rustix::io::Error> for Error {
|
||||
fn from(e: rustix::io::Error) -> Self {
|
||||
Error::IO(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Reactor {
|
||||
fn drop(&mut self) {
|
||||
REACTOR.with(|reactor| {
|
||||
reactor.lock().unwrap().take();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Wake for IoWaker {
|
||||
fn wake(self: Arc<Self>) {
|
||||
self.woken.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
fn wake_by_ref(self: &Arc<Self>) {
|
||||
self.woken.store(true, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl Reactor {
|
||||
pub fn new() -> Result<Self, Error> {
|
||||
let state = ReactorState::init()?;
|
||||
|
||||
Ok(Self { state })
|
||||
}
|
||||
|
||||
pub fn block_on<F: Future>(&self, future: F) -> Result<F::Output, Error> {
|
||||
let mut pinned = Box::pin(future);
|
||||
|
||||
let io_waker = Arc::new(IoWaker {
|
||||
woken: AtomicBool::new(true),
|
||||
});
|
||||
let waker = Arc::clone(&io_waker).into();
|
||||
let mut context = Context::from_waker(&waker);
|
||||
|
||||
loop {
|
||||
if io_waker.woken.swap(false, Ordering::AcqRel) {
|
||||
if let Poll::Ready(output) = pinned.as_mut().poll(&mut context) {
|
||||
return Ok(output);
|
||||
}
|
||||
}
|
||||
|
||||
self.state.lock().unwrap().poll()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RemoveFlag {
|
||||
fn drop(&mut self) {
|
||||
self.inner.store(true, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
impl ReactorState {
|
||||
fn init() -> Result<Arc<Mutex<Self>>, Error> {
|
||||
REACTOR.with(|reactor| {
|
||||
let mut guard = reactor.lock().unwrap();
|
||||
|
||||
if guard.is_some() {
|
||||
Err(Error::AlreadyInitialized)
|
||||
} else {
|
||||
let this = Arc::new(Mutex::new(Self {
|
||||
now: Instant::now(),
|
||||
poller: PollManager::new()?,
|
||||
io: HashMap::new(),
|
||||
timers: BTreeMap::new(),
|
||||
}));
|
||||
|
||||
*guard = Some(Arc::clone(&this));
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn register<A: AsFd + 'static>(&mut self, io: Arc<A>, waker: Waker, interests: Readiness) {
|
||||
let key = self.poller.register(io, interests);
|
||||
let entry = self.io.entry(key).or_insert_with(Vec::new);
|
||||
|
||||
for (ntrst, wkr) in entry.iter_mut() {
|
||||
if waker.will_wake(wkr) {
|
||||
*ntrst = *ntrst | interests;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
entry.push((interests, waker));
|
||||
}
|
||||
|
||||
fn deregister<A: AsFd + 'static>(&mut self, io: &Arc<A>) {
|
||||
self.poller.deregister(io);
|
||||
}
|
||||
|
||||
fn update_interests<A: AsFd + 'static>(
|
||||
&mut self,
|
||||
io: &Arc<A>,
|
||||
callback: impl Fn(Readiness) -> Readiness,
|
||||
) {
|
||||
self.poller.update_interests(io, callback);
|
||||
}
|
||||
|
||||
fn poll(&mut self) -> Result<(), Error> {
|
||||
let now = self.now();
|
||||
|
||||
let first_wakeup = self.timers.keys().copied().next();
|
||||
|
||||
let timeout = first_wakeup.map(|instant| {
|
||||
instant
|
||||
.checked_duration_since(now)
|
||||
.map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX))
|
||||
.unwrap_or(0)
|
||||
});
|
||||
|
||||
for (key, readiness) in self.poller.poll(timeout)? {
|
||||
if let Some((key, vec)) = self.io.remove_entry(&key) {
|
||||
let new_vec = vec
|
||||
.into_iter()
|
||||
.filter_map(|(interests, waker)| {
|
||||
if interests.is_intersect(readiness) {
|
||||
waker.wake();
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((interests, waker))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let readiness = new_vec
|
||||
.iter()
|
||||
.fold(Readiness::empty(), |readiness, (current, _)| {
|
||||
readiness | *current
|
||||
});
|
||||
|
||||
if new_vec.is_empty() || readiness.is_empty() {
|
||||
self.poller.deregister_by_key(key);
|
||||
} else {
|
||||
self.poller.update_interests_by_key(&key, |_| readiness);
|
||||
self.io.insert(key, new_vec);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.check_timers();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn now(&mut self) -> Instant {
|
||||
let now = Instant::now();
|
||||
|
||||
if now.duration_since(self.now) >= Duration::from_millis(1) {
|
||||
self.now = now;
|
||||
}
|
||||
|
||||
self.now
|
||||
}
|
||||
|
||||
fn add_timer(&mut self, duration: Duration, waker: Waker) -> RemoveFlag {
|
||||
let now = self.now();
|
||||
|
||||
let expires = now + duration;
|
||||
|
||||
let flag = Arc::new(AtomicBool::new(false));
|
||||
|
||||
self.timers
|
||||
.entry(expires)
|
||||
.or_insert_with(Vec::new)
|
||||
.push((Arc::clone(&flag), waker));
|
||||
|
||||
RemoveFlag { inner: flag }
|
||||
}
|
||||
|
||||
fn check_timers(&mut self) {
|
||||
let now = self.now();
|
||||
|
||||
let expired = self
|
||||
.timers
|
||||
.keys()
|
||||
.copied()
|
||||
.take_while(|instant| *instant <= now)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for timer in expired {
|
||||
if let Some(wakers) = self.timers.remove(&timer) {
|
||||
for (flag, waker) in wakers {
|
||||
if !flag.load(Ordering::Acquire) {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
48
src/time.rs
Normal file
48
src/time.rs
Normal file
|
@ -0,0 +1,48 @@
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
pub use crate::timer::Timer;
|
||||
|
||||
pub fn sleep(duration: Duration) -> Timer {
|
||||
Timer::new(duration)
|
||||
}
|
||||
|
||||
pub fn interval(duration: Duration) -> Interval {
|
||||
Interval {
|
||||
next: Instant::now(),
|
||||
duration,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn interval_at(instant: Instant, duration: Duration) -> Interval {
|
||||
Interval {
|
||||
next: instant,
|
||||
duration,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Interval {
|
||||
next: Instant,
|
||||
duration: Duration,
|
||||
}
|
||||
|
||||
impl Interval {
|
||||
pub fn tick(&mut self) -> Timer {
|
||||
let now = Instant::now();
|
||||
|
||||
while self.next + self.duration <= now {
|
||||
self.next += self.duration;
|
||||
}
|
||||
|
||||
let duration = if let Some(duration) = now.checked_duration_since(self.next) {
|
||||
duration
|
||||
} else if let Some(duration) = self.next.checked_duration_since(now) {
|
||||
duration
|
||||
} else {
|
||||
Duration::from_secs(0)
|
||||
};
|
||||
|
||||
self.next += self.duration;
|
||||
|
||||
Timer::new(duration)
|
||||
}
|
||||
}
|
52
src/timer.rs
Normal file
52
src/timer.rs
Normal file
|
@ -0,0 +1,52 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::{reactor::RemoveFlag, ReactorRef};
|
||||
|
||||
pub struct Timer {
|
||||
created: Instant,
|
||||
duration: Duration,
|
||||
remove_flag: Option<RemoveFlag>,
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
pub fn new(duration: Duration) -> Self {
|
||||
Timer {
|
||||
duration,
|
||||
created: Instant::now(),
|
||||
remove_flag: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Timer {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let now = Instant::now();
|
||||
|
||||
let diff = now.checked_duration_since(self.created);
|
||||
|
||||
let duration = if let Some(diff) = diff {
|
||||
if self.duration <= diff {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
self.duration.checked_sub(diff)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let duration = duration.unwrap_or(self.duration);
|
||||
|
||||
self.remove_flag = Some(
|
||||
ReactorRef::with(|mut reactor| reactor.add_timer(duration, cx.waker().clone()))
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue