use foxtrot::{ io::{Nonblocking, ReadBytes, Readiness}, Async, }; use jitterbug::Executor; use read_write_buf::ReadWriteBuf; use std::time::Duration; fn main() -> Result<(), Box> { let executor = Executor::new(); for _ in 0..4 { let execu2r = executor.clone(); std::thread::spawn(move || { let _ = foxtrot::block_on(execu2r.into_runner()); }); } let runner = executor.clone().into_runner(); let execu2r = executor.clone(); executor.spawn(async move { let listener = match Async::bind([127, 0, 0, 1].into(), 3456).await { Ok(listener) => listener, Err(_) => return, }; println!("Listening on port 3456"); loop { let (stream, _addr) = match listener.accept().await { Ok(tup) => tup, Err(_) => return, }; execu2r.spawn(async move { println!("Accepted stream"); let mut ring_buf = ReadWriteBuf::<10>::new(); let mut interests = Readiness::read(); 'l2: loop { let readiness = stream.ready(interests).await?; if readiness.is_hangup() { break; } if readiness.is_read() { while let Some(buf) = ring_buf.for_reading() { match stream.read_nonblocking(buf)? { Nonblocking::Ready(ReadBytes::Read(n)) => { let n: usize = n.into(); let should_break = n < buf.len(); ring_buf.advance_read(n); if should_break { break; } } Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => { break 'l2 } Nonblocking::Ready(ReadBytes::EOF) | Nonblocking::WouldBlock => { break } } } if !ring_buf.is_empty() { interests = Readiness::read() | Readiness::write(); } } if readiness.is_write() { while let Some(buf) = ring_buf.for_writing() { match stream.write_nonblocking(buf)? { Nonblocking::Ready(n) => { ring_buf.advance_write(n); if ring_buf.is_empty() { interests = Readiness::read(); } } Nonblocking::WouldBlock => break, } } } } println!("Stream closed"); Ok(()) as foxtrot::io::Result<()> }); } }); let execu2r = executor.clone(); executor.spawn(async move { foxtrot::time::sleep(Duration::from_secs(60 * 2)).await; execu2r.stop(); println!("Stopping"); }); foxtrot::block_on(runner)?; Ok(()) }