Put futures compat behind flag

This commit is contained in:
Aode (lion) 2022-03-03 20:45:51 -06:00
parent 97088874f7
commit 8ac154cec4
2 changed files with 68 additions and 41 deletions

View file

@ -4,10 +4,13 @@ version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
futures-compat = ["futures-io"]
[dependencies]
polldance = { git = "https://git.asonix.dog/safe-async/polldance" }
futures-io = "0.3.21"
futures-io = { version = "0.3.21", optional = true }
[dev-dependencies]
read-write-buf = { git = "https://git.asonix.dog/safe-async/read-write-buf" }

104
src/io.rs
View file

@ -59,6 +59,10 @@ struct Write<'a, T: AsFd + 'static> {
bytes: &'a [u8],
}
struct Flush<'a, T: AsFd + 'static> {
io: &'a mut Async<T>,
}
struct Accept<'a> {
io: &'a Arc<TcpListener>,
}
@ -148,46 +152,6 @@ impl<T: AsFd + 'static> Async<T> {
Poll::Pending
}
fn futures_poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>
where
T: std::io::Write,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Write::write(io, buf))
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
fn futures_poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>
where
T: std::io::Write,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Write::flush(io))
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
pub fn read_nonblocking(&self, buf: &mut [u8]) -> Result<Nonblocking<ReadBytes>>
where
T: std::io::Read,
@ -218,6 +182,26 @@ impl<T: AsFd + 'static> Async<T> {
Write { io: self, bytes }.await
}
fn futures_poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>
where
T: std::io::Write,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Write::write(io, buf))
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
pub fn write_nonblocking(&self, buf: &[u8]) -> Result<Nonblocking<usize>>
where
T: std::io::Write,
@ -237,8 +221,36 @@ impl<T: AsFd + 'static> Async<T> {
Ok(())
}
pub async fn flush(&mut self) -> Result<()>
where
T: std::io::Write + Unpin,
{
Flush { io: self }.await
}
fn futures_poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>
where
T: std::io::Write,
{
if let Some(io) = self.io.ensure_owned() {
poll_std!(std::io::Write::flush(io))
}
ReactorRef::with(|mut reactor| {
reactor.register(
Arc::clone(self.io.ensure_arc()),
cx.waker().clone(),
Readiness::write() | Readiness::hangup(),
);
})
.unwrap();
Poll::Pending
}
}
#[cfg(feature = "futures_compat")]
impl<T> futures_io::AsyncRead for Async<T>
where
T: std::io::Read + AsFd + Unpin,
@ -254,6 +266,7 @@ where
}
}
#[cfg(feature = "futures_compat")]
impl<'a, T> futures_io::AsyncWrite for Async<T>
where
T: std::io::Write + AsFd + Unpin,
@ -383,6 +396,17 @@ where
}
}
impl<'a, T> Future for Flush<'a, T>
where
T: std::io::Write + AsFd + Unpin + 'static,
{
type Output = Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().io.futures_poll_flush(cx)
}
}
impl<'a> Future for Accept<'a> {
type Output = Result<(TcpStream, Option<SocketAddrAny>)>;