Add .ready() future, hide manual futures, simplify io methods

This commit is contained in:
Aode (lion) 2022-02-16 11:11:55 -05:00
parent 49236c251a
commit e7ce722285
3 changed files with 137 additions and 81 deletions

6
Cargo.lock generated
View file

@ -51,9 +51,9 @@ dependencies = [
[[package]]
name = "io-lifetimes"
version = "0.5.1"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "768dbad422f45f69c8f5ce59c0802e2681aa3e751c5db8217901607bb2bc24dd"
checksum = "ec58677acfea8a15352d42fc87d11d63596ade9239e0a7c9352914417515dbe6"
[[package]]
name = "libc"
@ -70,7 +70,7 @@ checksum = "5bdc16c6ce4c85d9b46b4e66f2a814be5b3f034dbd5131c268a24ca26d970db8"
[[package]]
name = "polldance"
version = "0.1.0"
source = "git+https://git.asonix.dog/safe-async/polldance#b17e24103b3ca66deeb86a3aa9aa55c0583c0dd2"
source = "git+https://git.asonix.dog/safe-async/polldance#ae196a2ecf1c92036b07ff9f5288bc135fa7f6e9"
dependencies = [
"rustix",
]

206
src/io.rs
View file

@ -1,8 +1,5 @@
use crate::reactor::ReactorRef;
use polldance::{
net::{TcpListener, TcpListenerBuilder, TcpStream, TcpStreamBuilder},
Readiness,
};
use polldance::net::{TcpListener, TcpListenerBuilder, TcpStream, TcpStreamBuilder};
use rustix::{fd::AsFd, net::SocketAddrAny};
use std::{
future::Future,
@ -12,29 +9,49 @@ use std::{
task::{Context, Poll},
};
pub use polldance::{
io::{Nonblocking, ReadBytes},
Readiness,
};
macro_rules! poll_nonblocking {
($expr:expr) => {{
match $expr {
Ok(Nonblocking::Ready(val)) => return Poll::Ready(Ok(val)),
Err(e) => return Poll::Ready(Err(e)),
Ok(Nonblocking::WouldBlock) => {}
}
}};
}
pub struct Async<T: AsFd + 'static> {
io: Arc<T>,
}
pub struct Read<'a, T: AsFd + 'static> {
inner: &'a Async<T>,
struct Ready<'a, T: AsFd + 'static> {
io: &'a Arc<T>,
interests: Readiness,
}
struct Read<'a, T: AsFd + 'static> {
io: &'a Arc<T>,
bytes: &'a mut [u8],
}
pub struct Write<'a, T: AsFd + 'static> {
inner: &'a Async<T>,
struct Write<'a, T: AsFd + 'static> {
io: &'a Arc<T>,
bytes: &'a [u8],
}
pub struct Accept<'a> {
inner: &'a Async<TcpListener>,
struct Accept<'a> {
io: &'a Arc<TcpListener>,
}
pub struct Bind {
struct Bind {
io: Option<rustix::io::Result<Arc<TcpListenerBuilder>>>,
}
pub struct Connect {
struct Connect {
io: Option<rustix::io::Result<Arc<TcpStreamBuilder>>>,
}
@ -45,22 +62,28 @@ impl<T: AsFd + 'static> Drop for Async<T> {
}
impl Async<TcpListener> {
pub fn bind(ip: IpAddr, port: u16) -> Bind {
pub async fn bind(ip: IpAddr, port: u16) -> rustix::io::Result<Async<TcpListener>> {
Bind {
io: Some(polldance::net::TcpListener::bind(ip, port).map(Arc::new)),
}
.await
.map(Async::new)
}
pub fn accept(&self) -> Accept<'_> {
Accept { inner: self }
pub async fn accept(&self) -> rustix::io::Result<(Async<TcpStream>, SocketAddrAny)> {
let (stream, addr) = Accept { io: &self.io }.await?;
Ok((Async::new(stream), addr))
}
}
impl Async<TcpStream> {
pub fn connect(ip: IpAddr, port: u16) -> Connect {
pub async fn connect(ip: IpAddr, port: u16) -> rustix::io::Result<Async<TcpStream>> {
Connect {
io: Some(polldance::net::TcpStream::connect(ip, port).map(Arc::new)),
}
.await
.map(Async::new)
}
}
@ -69,28 +92,49 @@ impl<T: AsFd + 'static> Async<T> {
Async { io: Arc::new(io) }
}
pub fn read<'a>(&'a self, bytes: &'a mut [u8]) -> Read<'a, T> {
Read { inner: self, bytes }
pub async fn ready(&self, interests: Readiness) -> rustix::io::Result<Readiness> {
Ready {
io: &self.io,
interests,
}
.await
}
pub async fn read(&self, bytes: &mut [u8]) -> rustix::io::Result<ReadBytes> {
Read {
io: &self.io,
bytes,
}
.await
}
pub fn read_nonblocking(&self, buf: &mut [u8]) -> rustix::io::Result<Nonblocking<ReadBytes>> {
polldance::io::try_read(self.io.as_ref(), buf)
}
pub async fn read_exact(&self, bytes: &mut [u8]) -> rustix::io::Result<usize> {
let mut start = 0;
while start < bytes.len() {
let n = self.read(&mut bytes[start..]).await?;
if n == 0 {
break;
match self.read(&mut bytes[start..]).await? {
ReadBytes::EOF => break,
ReadBytes::Read(n) => start += usize::from(n),
}
start += n;
}
Ok(start)
}
pub fn write<'a>(&'a self, bytes: &'a [u8]) -> Write<'a, T> {
Write { inner: self, bytes }
pub async fn write(&self, bytes: &[u8]) -> rustix::io::Result<usize> {
Write {
io: &self.io,
bytes,
}
.await
}
pub fn write_nonblocking(&self, buf: &[u8]) -> rustix::io::Result<Nonblocking<usize>> {
polldance::io::try_write(self.io.as_ref(), buf)
}
pub async fn write_all(&self, bytes: &[u8]) -> rustix::io::Result<()> {
@ -105,7 +149,7 @@ impl<T: AsFd + 'static> Async<T> {
}
impl Future for Bind {
type Output = rustix::io::Result<Async<TcpListener>>;
type Output = rustix::io::Result<TcpListener>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
@ -118,7 +162,7 @@ impl Future for Bind {
};
match builder.try_finish() {
Ok(Ok(listener)) => Poll::Ready(Ok(Async::new(listener))),
Ok(Ok(listener)) => Poll::Ready(Ok(listener)),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
@ -142,7 +186,7 @@ impl Future for Bind {
}
impl Future for Connect {
type Output = rustix::io::Result<Async<TcpStream>>;
type Output = rustix::io::Result<TcpStream>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
@ -155,7 +199,7 @@ impl Future for Connect {
};
match builder.try_finish() {
Ok(Ok(stream)) => Poll::Ready(Ok(Async::new(stream))),
Ok(Ok(stream)) => Poll::Ready(Ok(stream)),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
@ -183,27 +227,23 @@ impl<'a, T> Future for Read<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<usize>;
type Output = rustix::io::Result<ReadBytes>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
match rustix::io::read(&*this.inner.io, &mut this.bytes[0..]) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&this.inner.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
poll_nonblocking!(polldance::io::try_read(this.io.as_ref(), this.bytes));
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(this.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
}
@ -214,44 +254,56 @@ where
type Output = rustix::io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match rustix::io::write(&*self.inner.io, self.bytes) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e == rustix::io::Error::WOULDBLOCK || e == rustix::io::Error::AGAIN => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&self.inner.io),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
poll_nonblocking!(polldance::io::try_write(self.io.as_ref(), self.bytes));
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
}
impl<'a> Future for Accept<'a> {
type Output = rustix::io::Result<(Async<TcpStream>, SocketAddrAny)>;
type Output = rustix::io::Result<(TcpStream, SocketAddrAny)>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.io.try_accept() {
Ok(Some((stream, addr))) => Poll::Ready(Ok((Async::new(stream), addr))),
Ok(None) => {
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(&self.inner.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
poll_nonblocking!(self.io.try_accept());
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
}
impl<'a, T> Future for Ready<'a, T>
where
T: AsFd + 'static,
{
type Output = rustix::io::Result<Readiness>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let interests = self.interests | Readiness::hangup();
poll_nonblocking!(polldance::io::try_ready(self.io.as_ref(), interests));
ReactorRef::with(|mut reactor| {
reactor.register(Arc::clone(self.io), cx.waker().clone(), interests);
})
.unwrap();
Poll::Pending
}
}

View file

@ -1,4 +1,4 @@
mod io;
pub mod io;
mod reactor;
pub mod time;
mod timer;
@ -11,3 +11,7 @@ pub use reactor::{Error, Reactor, ReactorRef};
pub fn block_on<F: Future>(future: F) -> Result<F::Output, Error> {
Reactor::new()?.block_on(future)
}
pub mod net {
pub use polldance::net::{TcpListener, TcpStream};
}