Add ring buffer to echo example
This commit is contained in:
parent
53ef4552f6
commit
4afb537985
12
Cargo.lock
generated
12
Cargo.lock
generated
|
@ -15,9 +15,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
|||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.0.72"
|
||||
version = "1.0.73"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee"
|
||||
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
|
||||
|
||||
[[package]]
|
||||
name = "errno"
|
||||
|
@ -46,6 +46,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"async-join",
|
||||
"polldance",
|
||||
"read-write-buf",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -69,11 +70,16 @@ checksum = "5bdc16c6ce4c85d9b46b4e66f2a814be5b3f034dbd5131c268a24ca26d970db8"
|
|||
[[package]]
|
||||
name = "polldance"
|
||||
version = "0.1.0"
|
||||
source = "git+https://git.asonix.dog/safe-async/polldance#fc3bdc92c53625487af776b7edd1a1b7f4bc587b"
|
||||
source = "git+https://git.asonix.dog/safe-async/polldance#28909cce76e42d5c6310db780fb254f08e9627ef"
|
||||
dependencies = [
|
||||
"rustix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "read-write-buf"
|
||||
version = "0.1.0"
|
||||
source = "git+https://git.asonix.dog/asonix/read-write-buf#c3a7fc23ed51f12d3cd961f2be5a45585c196db5"
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.33.2"
|
||||
|
|
|
@ -9,4 +9,5 @@ edition = "2021"
|
|||
polldance = { git = "https://git.asonix.dog/safe-async/polldance" }
|
||||
|
||||
[dev-dependencies]
|
||||
read-write-buf = { git = "https://git.asonix.dog/asonix/read-write-buf" }
|
||||
async-join = { git = "https://git.asonix.dog/safe-async/async-join" }
|
||||
|
|
|
@ -3,6 +3,7 @@ use foxtrot::{
|
|||
io::{Nonblocking, ReadBytes, Readiness},
|
||||
Async,
|
||||
};
|
||||
use read_write_buf::ReadWriteBuf;
|
||||
|
||||
async fn echo(port: u16) -> Result<(), foxtrot::Error> {
|
||||
let listener = Async::bind([127, 0, 0, 1].into(), port).await?;
|
||||
|
@ -12,9 +13,8 @@ async fn echo(port: u16) -> Result<(), foxtrot::Error> {
|
|||
let (stream, _addr) = listener.accept().await?;
|
||||
println!("Accepted connection");
|
||||
|
||||
let mut bytes = vec![];
|
||||
let mut interests = Readiness::read();
|
||||
let mut buf = [0; 1024];
|
||||
let mut ring_buf = ReadWriteBuf::<10>::new();
|
||||
|
||||
'l2: loop {
|
||||
let readiness = stream.ready(interests).await?;
|
||||
|
@ -24,34 +24,37 @@ async fn echo(port: u16) -> Result<(), foxtrot::Error> {
|
|||
}
|
||||
|
||||
if readiness.is_read() {
|
||||
loop {
|
||||
match stream.read_nonblocking(&mut buf)? {
|
||||
while let Some(buf) = ring_buf.for_reading() {
|
||||
match stream.read_nonblocking(buf)? {
|
||||
Nonblocking::Ready(ReadBytes::Read(n)) => {
|
||||
let n: usize = n.into();
|
||||
bytes.extend_from_slice(&buf[0..n]);
|
||||
if n < buf.len() {
|
||||
let should_break = n < buf.len();
|
||||
ring_buf.advance_read(n);
|
||||
if should_break {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Nonblocking::Ready(ReadBytes::EOF) if bytes.is_empty() => break 'l2,
|
||||
Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => break 'l2,
|
||||
Nonblocking::Ready(ReadBytes::EOF) | Nonblocking::WouldBlock => break,
|
||||
}
|
||||
}
|
||||
|
||||
if !bytes.is_empty() {
|
||||
if !ring_buf.is_empty() {
|
||||
interests = Readiness::read() | Readiness::write();
|
||||
}
|
||||
}
|
||||
|
||||
if readiness.is_write() {
|
||||
match stream.write_nonblocking(&bytes)? {
|
||||
Nonblocking::Ready(n) => {
|
||||
bytes = bytes.split_off(n);
|
||||
if bytes.is_empty() {
|
||||
interests = Readiness::read();
|
||||
while let Some(buf) = ring_buf.for_writing() {
|
||||
match stream.write_nonblocking(buf)? {
|
||||
Nonblocking::Ready(n) => {
|
||||
ring_buf.advance_write(n);
|
||||
if ring_buf.is_empty() {
|
||||
interests = Readiness::read();
|
||||
}
|
||||
}
|
||||
Nonblocking::WouldBlock => {}
|
||||
}
|
||||
Nonblocking::WouldBlock => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue