use jive::{ io::{Async, Nonblocking, Readiness}, net::TcpListener, }; use read_write_buf::ReadWriteBuf; const CHUNKED_HEAD: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nTransfer-Encoding: chunked\r\n\r\n"; const EMPTY_HEAD: &[u8] = b"HTTP/1.1 204 No Content\r\n\r\n"; fn length_head(content_length: usize) -> Vec { format!( "HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {}\r\n\r\n", content_length ) .into_bytes() } fn main() { jive::block_on(async move { let mut listener = match Async::::bind(([127, 0, 0, 1], 3456)).await { Ok(listener) => listener, Err(_) => return, }; println!("Listening on port 3456"); loop { let (mut stream, _addr) = match listener.accept().await { Ok(tup) => tup, Err(_) => return, }; jive::spawn(async move { println!("Accepted stream"); let mut req_bytes = vec![0u8; 512]; let mut headers = [httparse::Header { name: "", value: &[], }; 16]; let mut req = httparse::Request::new(&mut headers); let mut ring_buf = ReadWriteBuf::<1024>::new(); let mut interests = Readiness::read(); let mut total_read: usize = 0; let mut body_start = 'l1: loop { let readiness = stream.ready(interests).await?; if readiness.is_hangup() { return Ok(()); } match stream.read_nonblocking(&mut req_bytes[total_read..])? { Nonblocking::Ready(n) if n > 0 => { total_read += n; match req.parse(&req_bytes[0..n]) { Ok(httparse::Status::Complete(body_start)) => break 'l1 body_start, Ok(httparse::Status::Partial) => { if total_read == req_bytes.len() { req_bytes.extend([0u8; 512]); } } Err(e) => { println!("Error parsing request: {}", e); return Ok(()); } } } Nonblocking::Ready(_) => return Ok(()), Nonblocking::WouldBlock => continue, } return Ok(()); }; println!("Parsed request {:?}", req); let mut head_written = 0; interests = Readiness::write(); if let Some(method) = req.method { if method == "GET" || method == "DELETE" { while head_written < EMPTY_HEAD.len() { let readiness = stream.ready(interests).await?; if readiness.is_hangup() { return Ok(()); } match stream.write_nonblocking(&EMPTY_HEAD[head_written..])? { Nonblocking::Ready(n) => head_written += n, Nonblocking::WouldBlock => {} } } return Ok(()); } } else { return Ok(()); } let head = if let Some(header) = req .headers .iter() .find(|header| header.name == "Content-Length") { if let Ok(length) = String::from_utf8_lossy(header.value).parse() { length_head(length) } else { return Ok(()); } } else { CHUNKED_HEAD.to_vec() }; while head_written < head.len() { let readiness = stream.ready(interests).await?; if readiness.is_hangup() { return Ok(()); } match stream.write_nonblocking(&head[head_written..])? { Nonblocking::Ready(n) => head_written += n, Nonblocking::WouldBlock => {} } } println!("Wrote response head"); while body_start < total_read { let readiness = stream.ready(interests).await?; if readiness.is_hangup() { return Ok(()); } match stream.write_nonblocking(&req_bytes[body_start..])? { Nonblocking::Ready(n) => body_start += n, Nonblocking::WouldBlock => {} } } println!("Wrote start of body"); interests = Readiness::read(); 'l2: loop { let readiness = stream.ready(interests).await?; if readiness.is_hangup() { break; } if readiness.is_read() { while let Some(buf) = ring_buf.for_reading() { match stream.read_nonblocking(buf)? { Nonblocking::Ready(n) if n > 0 => { let should_break = n < buf.len(); ring_buf.advance_read(n); if should_break { break; } } Nonblocking::Ready(_) if ring_buf.is_empty() => break 'l2, Nonblocking::Ready(_) | Nonblocking::WouldBlock => break, } } if !ring_buf.is_empty() { interests = Readiness::read() | Readiness::write(); } } if readiness.is_write() { while let Some(buf) = ring_buf.for_writing() { match stream.write_nonblocking(buf)? { Nonblocking::Ready(n) => { ring_buf.advance_write(n); if ring_buf.is_empty() { interests = Readiness::read(); } } Nonblocking::WouldBlock => break, } } } } println!("Stream closed"); Ok(()) as jive::io::Result<()> }); } }) }