Split poll fd from IO fd

This commit is contained in:
Aode (lion) 2022-03-05 15:17:18 -06:00
parent c4601b38e9
commit e8f6bc46f7
3 changed files with 46 additions and 108 deletions

2
Cargo.lock generated
View file

@ -84,7 +84,7 @@ checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"
[[package]]
name = "polldance"
version = "0.1.0"
source = "git+https://git.asonix.dog/safe-async/polldance#44c92daf0e1877d351c2b16e8dff07f817c221bd"
source = "git+https://git.asonix.dog/safe-async/polldance#0496ed7c131432d1d0b8b1326d6e1446e9aeb385"
dependencies = [
"rustix",
]

150
src/io.rs
View file

@ -9,9 +9,10 @@ use std::{
};
pub use polldance::{
fd::{AsFd, OwnedFd},
io::{Error, Nonblocking, ReadBytes, Result},
net::SocketAddrAny,
AsFd, Readiness,
Readiness,
};
macro_rules! poll_nonblocking {
@ -34,18 +35,13 @@ macro_rules! poll_std {
}};
}
enum MaybeArc<T> {
Arc(Arc<T>),
Owned(T),
Intermediate,
}
pub struct Async<T: AsFd + 'static> {
io: MaybeArc<T>,
source: Arc<OwnedFd>,
io: T,
}
struct Ready<'a, T: AsFd + 'static> {
io: &'a Arc<T>,
struct Ready<'a> {
source: &'a Arc<OwnedFd>,
interests: Readiness,
}
@ -77,7 +73,7 @@ struct Connect {
impl<T: AsFd + 'static> Drop for Async<T> {
fn drop(&mut self) {
let _ = ReactorRef::with(|mut reactor| reactor.deregister(self.io.ensure_arc()));
let _ = ReactorRef::with(|mut reactor| reactor.deregister(&self.source));
}
}
@ -87,24 +83,23 @@ impl Async<TcpListener> {
io: Some(polldance::net::TcpListener::bind(socket_address).map(Arc::new)),
}
.await
.map(Async::new)
}
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(Async<TcpStream>, Option<SocketAddrAny>)>> {
let poll = self
.io
.as_ref()
.try_accept()
.map(|nonblock| nonblock.map(|(stream, addr)| (Async::new(stream), addr)));
poll_nonblocking!(poll);
match self.io.try_accept() {
Ok(Nonblocking::Ready((stream, addr))) => {
return Poll::Ready(Async::new(stream).map(|stream| (stream, addr)))
}
Ok(Nonblocking::WouldBlock) => {}
Err(e) => return Poll::Ready(Err(e)),
};
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
Arc::clone(&self.source),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
@ -125,20 +120,18 @@ impl Async<TcpStream> {
io: Some(polldance::net::TcpStream::connect(socket_address).map(Arc::new)),
}
.await
.map(Async::new)
}
}
impl<T: AsFd + 'static> Async<T> {
pub fn new(io: T) -> Self {
Async {
io: MaybeArc::Owned(io),
}
pub fn new(io: T) -> Result<Self> {
let source = Arc::new(polldance::fd::try_clone(&io)?);
Ok(Async { source, io })
}
pub async fn ready(&mut self, interests: Readiness) -> Result<Readiness> {
Ready {
io: self.io.ensure_arc(),
source: &self.source,
interests,
}
.await
@ -155,15 +148,11 @@ impl<T: AsFd + 'static> Async<T> {
where
T: std::io::Read,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Read::read(io, buf))
} else {
println!("Read: Failed to own");
}
poll_std!(std::io::Read::read(&mut self.io, buf));
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
Arc::clone(&self.source),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
@ -177,7 +166,7 @@ impl<T: AsFd + 'static> Async<T> {
where
T: std::io::Read,
{
polldance::io::try_read(self.io.as_ref(), buf)
polldance::io::try_read(&self.io, buf)
}
pub async fn read_exact(&mut self, bytes: &mut [u8]) -> Result<usize>
@ -207,15 +196,11 @@ impl<T: AsFd + 'static> Async<T> {
where
T: std::io::Write,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Write::write(io, buf))
} else {
println!("Write: Failed to own");
}
poll_std!(std::io::Write::write(&mut self.io, buf));
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
Arc::clone(&self.source),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
@ -229,7 +214,7 @@ impl<T: AsFd + 'static> Async<T> {
where
T: std::io::Write,
{
polldance::io::try_write(self.io.as_ref(), buf)
polldance::io::try_write(&self.io, buf)
}
pub async fn write_all(&mut self, bytes: &[u8]) -> Result<()>
@ -256,13 +241,11 @@ impl<T: AsFd + 'static> Async<T> {
where
T: std::io::Write,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Write::flush(io))
}
poll_std!(std::io::Write::flush(&mut self.io));
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
Arc::clone(&self.source),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
@ -285,20 +268,19 @@ where
) -> Poll<Result<()>> {
let this = self.get_mut();
if let Some(io) = this.io.ensure_owned() {
let res = match std::io::Read::read(io, buf.initialize_unfilled()) {
Ok(n) => {
buf.advance(n);
Ok(())
}
Err(e) => Err(e),
};
poll_std!(res)
}
let res = match std::io::Read::read(&mut this.io, buf.initialize_unfilled()) {
Ok(n) => {
buf.advance(n);
Ok(())
}
Err(e) => Err(e),
};
poll_std!(res);
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(this.io.ensure_arc()),
Arc::clone(&this.source),
cx.waker().clone(),
Readiness::read() | Readiness::hangup(),
);
@ -360,7 +342,7 @@ where
}
impl Future for Bind {
type Output = Result<TcpListener>;
type Output = Result<Async<TcpListener>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
@ -373,7 +355,7 @@ impl Future for Bind {
};
match builder.try_finish() {
Ok(Ok(listener)) => Poll::Ready(Ok(listener)),
Ok(Ok(listener)) => Poll::Ready(Async::new(listener)),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
@ -397,7 +379,7 @@ impl Future for Bind {
}
impl Future for Connect {
type Output = Result<TcpStream>;
type Output = Result<Async<TcpStream>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
@ -410,7 +392,7 @@ impl Future for Connect {
};
match builder.try_finish() {
Ok(Ok(stream)) => Poll::Ready(Ok(stream)),
Ok(Ok(stream)) => Poll::Ready(Async::new(stream)),
Ok(Err(builder)) => {
let builder = Arc::new(builder);
@ -484,63 +466,19 @@ impl<'a> Future for Accept<'a> {
}
}
impl<'a, T> Future for Ready<'a, T>
where
T: AsFd + 'static,
{
impl<'a> Future for Ready<'a> {
type Output = Result<Readiness>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let interests = self.interests | Readiness::hangup();
poll_nonblocking!(polldance::io::try_ready(self.io.as_ref(), interests));
poll_nonblocking!(polldance::io::try_ready(self.source.as_ref(), interests));
ReactorRef::with(|mut reactor| {
reactor.register(Arc::clone(self.io), cx.waker().clone(), interests);
reactor.register(Arc::clone(self.source), cx.waker().clone(), interests);
})
.unwrap();
Poll::Pending
}
}
impl<T> MaybeArc<T> {
fn ensure_arc(&mut self) -> &Arc<T> {
*self = match std::mem::replace(self, MaybeArc::Intermediate) {
MaybeArc::Owned(owned) => MaybeArc::Arc(Arc::new(owned)),
MaybeArc::Arc(arc) => MaybeArc::Arc(arc),
MaybeArc::Intermediate => unreachable!("Should never be intermediate"),
};
if let MaybeArc::Arc(arc) = self {
return arc;
}
unreachable!("We should always have an Arc by the end")
}
fn ensure_owned(&mut self) -> Option<&mut T> {
*self = match std::mem::replace(self, MaybeArc::Intermediate) {
MaybeArc::Arc(arc) => match Arc::try_unwrap(arc) {
Ok(owned) => MaybeArc::Owned(owned),
Err(arc) => MaybeArc::Arc(arc),
},
MaybeArc::Owned(owned) => MaybeArc::Owned(owned),
MaybeArc::Intermediate => unreachable!("Should never be intermediate"),
};
if let MaybeArc::Owned(owned) = self {
return Some(owned);
}
None
}
fn as_ref(&self) -> &T {
match self {
MaybeArc::Arc(arc) => arc.as_ref(),
MaybeArc::Owned(owned) => owned,
_ => unreachable!("Should never have intermediate state"),
}
}
}

View file

@ -1,4 +1,4 @@
use polldance::{AsFd, Key, NotifyToken, PollManager, Readiness};
use polldance::{fd::AsFd, Key, NotifyToken, PollManager, Readiness};
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},