From 722c2d0f4c9e3210311a05f54d3d52b83f0a48b2 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Wed, 16 Feb 2022 21:21:32 -0500 Subject: [PATCH] Add a kinda ok http server --- Cargo.toml | 1 + examples/kinda_ok_http.rs | 198 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 examples/kinda_ok_http.rs diff --git a/Cargo.toml b/Cargo.toml index 8bc77e4..a91d1b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ jitterbug = { git = "https://git.asonix.dog/safe-async/jitterbug" } [dev-dependencies] read-write-buf = { git = "https://git.asonix.dog/asonix/read-write-buf" } +httparse = "1.6.0" diff --git a/examples/kinda_ok_http.rs b/examples/kinda_ok_http.rs new file mode 100644 index 0000000..87a29e2 --- /dev/null +++ b/examples/kinda_ok_http.rs @@ -0,0 +1,198 @@ +use jive::io::{Async, Nonblocking, ReadBytes, Readiness}; +use read_write_buf::ReadWriteBuf; + +const CHUNKED_HEAD: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nTransfer-Encoding: chunked\r\n\r\n"; +const EMPTY_HEAD: &[u8] = b"HTTP/1.1 204 No Content\r\n\r\n"; + +fn length_head(content_length: usize) -> Vec { + format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {}\r\n\r\n", + content_length + ) + .into_bytes() +} + +fn main() -> Result<(), jive::task::JoinError> { + jive::block_on(async move { + let listener = match Async::bind(([127, 0, 0, 1], 3456)).await { + Ok(listener) => listener, + Err(_) => return, + }; + + println!("Listening on port 3456"); + + loop { + let (stream, _addr) = match listener.accept().await { + Ok(tup) => tup, + Err(_) => return, + }; + + jive::spawn(async move { + println!("Accepted stream"); + + let mut req_bytes = vec![0u8; 512]; + let mut headers = [httparse::Header { + name: "", + value: &[], + }; 16]; + let mut req = httparse::Request::new(&mut headers); + let mut ring_buf = ReadWriteBuf::<1024>::new(); + let mut interests = Readiness::read(); + let mut total_read: usize = 0; + + let mut body_start = 'l1: loop { + let readiness = stream.ready(interests).await?; + + if readiness.is_hangup() { + return Ok(()); + } + + match stream.read_nonblocking(&mut req_bytes[total_read..])? { + Nonblocking::Ready(ReadBytes::Read(n)) => { + let n = usize::from(n); + total_read += n; + match req.parse(&req_bytes[0..n]) { + Ok(httparse::Status::Complete(body_start)) => break 'l1 body_start, + Ok(httparse::Status::Partial) => { + if total_read == req_bytes.len() { + req_bytes.extend([0u8; 512]); + } + } + Err(e) => { + println!("Error parsing request: {}", e); + return Ok(()); + } + } + } + Nonblocking::Ready(ReadBytes::EOF) => return Ok(()), + Nonblocking::WouldBlock => continue, + } + + return Ok(()); + }; + + println!("Parsed request {:?}", req); + + let mut head_written = 0; + interests = Readiness::write(); + + if let Some(method) = req.method { + if method == "GET" || method == "DELETE" { + while head_written < EMPTY_HEAD.len() { + let readiness = stream.ready(interests).await?; + + if readiness.is_hangup() { + return Ok(()); + } + + match stream.write_nonblocking(&EMPTY_HEAD[head_written..])? { + Nonblocking::Ready(n) => head_written += n, + Nonblocking::WouldBlock => {} + } + } + + return Ok(()); + } + } else { + return Ok(()); + } + + let head = if let Some(header) = req + .headers + .iter() + .find(|header| header.name == "Content-Length") + { + if let Ok(length) = String::from_utf8_lossy(header.value).parse() { + length_head(length) + } else { + return Ok(()); + } + } else { + CHUNKED_HEAD.to_vec() + }; + + while head_written < head.len() { + let readiness = stream.ready(interests).await?; + + if readiness.is_hangup() { + return Ok(()); + } + + match stream.write_nonblocking(&head[head_written..])? { + Nonblocking::Ready(n) => head_written += n, + Nonblocking::WouldBlock => {} + } + } + + println!("Wrote response head"); + + while body_start < total_read { + let readiness = stream.ready(interests).await?; + + if readiness.is_hangup() { + return Ok(()); + } + + match stream.write_nonblocking(&req_bytes[body_start..])? { + Nonblocking::Ready(n) => body_start += n, + Nonblocking::WouldBlock => {} + } + } + + println!("Wrote start of body"); + + interests = Readiness::read(); + 'l2: loop { + let readiness = stream.ready(interests).await?; + + if readiness.is_hangup() { + break; + } + + if readiness.is_read() { + while let Some(buf) = ring_buf.for_reading() { + match stream.read_nonblocking(buf)? { + Nonblocking::Ready(ReadBytes::Read(n)) => { + let n: usize = n.into(); + let should_break = n < buf.len(); + ring_buf.advance_read(n); + if should_break { + break; + } + } + Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => { + break 'l2 + } + Nonblocking::Ready(ReadBytes::EOF) | Nonblocking::WouldBlock => { + break + } + } + } + + if !ring_buf.is_empty() { + interests = Readiness::read() | Readiness::write(); + } + } + + if readiness.is_write() { + while let Some(buf) = ring_buf.for_writing() { + match stream.write_nonblocking(buf)? { + Nonblocking::Ready(n) => { + ring_buf.advance_write(n); + if ring_buf.is_empty() { + interests = Readiness::read(); + } + } + Nonblocking::WouldBlock => break, + } + } + } + } + + println!("Stream closed"); + + Ok(()) as jive::io::Result<()> + }); + } + }) +}