jitterbug/examples/io.rs
2022-03-07 14:28:16 -06:00

99 lines
3.4 KiB
Rust

use foxtrot::{
io::{Nonblocking, Readiness},
net::TcpListener,
Async,
};
use jitterbug::Executor;
use read_write_buf::ReadWriteBuf;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let executor = Executor::new();
for _ in 0..4 {
let execu2r = executor.clone();
std::thread::spawn(move || {
let _ = foxtrot::block_on(execu2r.into_runner());
});
}
let execu2r = executor.clone();
foxtrot::block_on(executor.run_with(async move {
let executor = execu2r.clone();
executor.spawn(async move {
let mut listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)).await {
Ok(listener) => listener,
Err(_) => return,
};
println!("Listening on port 3456");
loop {
let (mut stream, _addr) = match listener.accept().await {
Ok(tup) => tup,
Err(_) => return,
};
execu2r.spawn(async move {
println!("Accepted stream");
let mut ring_buf = ReadWriteBuf::<10>::new();
let mut interests = Readiness::read();
'l2: loop {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
break;
}
if readiness.is_read() {
while let Some(buf) = ring_buf.for_reading() {
match stream.read_nonblocking(buf)? {
Nonblocking::Ready(n) if n > 0 => {
let should_break = n < buf.len();
ring_buf.advance_read(n);
if should_break {
break;
}
}
Nonblocking::Ready(_) if ring_buf.is_empty() => break 'l2,
Nonblocking::Ready(_) | Nonblocking::WouldBlock => break,
}
}
if !ring_buf.is_empty() {
interests = Readiness::read() | Readiness::write();
}
}
if readiness.is_write() {
while let Some(buf) = ring_buf.for_writing() {
match stream.write_nonblocking(buf)? {
Nonblocking::Ready(n) => {
ring_buf.advance_write(n);
if ring_buf.is_empty() {
interests = Readiness::read();
}
}
Nonblocking::WouldBlock => break,
}
}
}
}
println!("Stream closed");
Ok(()) as foxtrot::io::Result<()>
});
}
});
foxtrot::time::sleep(Duration::from_secs(60 * 2)).await;
executor.stop();
println!("Stopping");
}))??;
Ok(())
}