113 lines
3.8 KiB
Rust
113 lines
3.8 KiB
Rust
use polldance::{
|
|
io::{Nonblocking, ReadBytes},
|
|
net::TcpStream,
|
|
PollManager, Readiness,
|
|
};
|
|
use read_write_buf::ReadWriteBuf;
|
|
use std::sync::Arc;
|
|
|
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
let mut poller = PollManager::new()?;
|
|
|
|
let mut builder = Arc::new(TcpStream::connect(([127, 0, 0, 1], 4444))?);
|
|
let mut builder_key = poller.register(Arc::clone(&builder), Readiness::write());
|
|
|
|
let stream = 'l: loop {
|
|
for (key, readiness) in poller.poll(None)? {
|
|
if builder_key.is_key(key) && readiness.is_write() {
|
|
poller.deregister(&builder);
|
|
|
|
let bldr = match Arc::try_unwrap(builder) {
|
|
Ok(bldr) => bldr,
|
|
Err(_) => unreachable!("Only one Arc clone now"),
|
|
};
|
|
|
|
println!("Trying connect");
|
|
match bldr.try_finish()? {
|
|
Ok(stream) => break 'l Arc::new(stream),
|
|
Err(bldr) => {
|
|
builder = Arc::new(bldr);
|
|
|
|
builder_key = poller.register(Arc::clone(&builder), Readiness::write());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
println!("Connected");
|
|
|
|
let mut notify_token = poller.notifier();
|
|
let handle = std::thread::spawn(move || {
|
|
loop {
|
|
std::thread::sleep(std::time::Duration::from_secs(5));
|
|
match notify_token.notify() {
|
|
Ok(c) => notify_token = c,
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
println!("broken");
|
|
});
|
|
|
|
let stream_key = poller.register(Arc::clone(&stream), Readiness::read() | Readiness::hangup());
|
|
let mut ring_buf = ReadWriteBuf::<10>::new();
|
|
|
|
'l2: loop {
|
|
for (key, readiness) in poller.poll(None)? {
|
|
if stream_key.is_key(key) {
|
|
if readiness.is_hangup() {
|
|
break 'l2;
|
|
}
|
|
|
|
if readiness.is_read() {
|
|
while let Some(buf) = ring_buf.for_reading() {
|
|
match stream.try_read(buf)? {
|
|
Nonblocking::Ready(ReadBytes::Read(n)) => {
|
|
let n: usize = n.into();
|
|
let should_break = n < buf.len();
|
|
ring_buf.advance_read(n);
|
|
if should_break {
|
|
break;
|
|
}
|
|
}
|
|
Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => break 'l2,
|
|
Nonblocking::Ready(ReadBytes::EOF) | Nonblocking::WouldBlock => break,
|
|
}
|
|
}
|
|
|
|
if !ring_buf.is_empty() {
|
|
poller
|
|
.update_interests(&stream, |interests| Readiness::write() | interests);
|
|
}
|
|
}
|
|
|
|
if readiness.is_write() {
|
|
while let Some(buf) = ring_buf.for_writing() {
|
|
match stream.try_write(buf)? {
|
|
Nonblocking::Ready(n) => {
|
|
ring_buf.advance_write(n);
|
|
if ring_buf.is_empty() {
|
|
poller.update_interests(&stream, |_| {
|
|
Readiness::read() | Readiness::hangup()
|
|
});
|
|
}
|
|
}
|
|
Nonblocking::WouldBlock => break,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
println!("Woken");
|
|
}
|
|
|
|
poller.deregister(&stream);
|
|
drop(poller);
|
|
|
|
println!("Disconnected");
|
|
|
|
handle.join().unwrap();
|
|
|
|
Ok(())
|
|
}
|