From 930bf80b21429caa51e5096c38292d2aff197afb Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Wed, 16 Feb 2022 18:32:02 -0500 Subject: [PATCH] Initial combined runtime --- .gitignore | 2 ++ Cargo.toml | 13 ++++++++ examples/echo.rs | 83 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 178 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 examples/echo.rs create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..8bc77e4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "jive" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" } +jitterbug = { git = "https://git.asonix.dog/safe-async/jitterbug" } + +[dev-dependencies] +read-write-buf = { git = "https://git.asonix.dog/asonix/read-write-buf" } diff --git a/examples/echo.rs b/examples/echo.rs new file mode 100644 index 0000000..961b3aa --- /dev/null +++ b/examples/echo.rs @@ -0,0 +1,83 @@ +use jive::io::{Async, Nonblocking, ReadBytes, Readiness}; +use read_write_buf::ReadWriteBuf; +use std::time::Duration; + +fn main() -> Result<(), jive::task::JoinError> { + jive::block_on(async move { + jive::spawn(async move { + let listener = match Async::bind(([127, 0, 0, 1], 3456)).await { + Ok(listener) => listener, + Err(_) => return, + }; + + println!("Listening on port 3456"); + + loop { + let (stream, _addr) = match listener.accept().await { + Ok(tup) => tup, + Err(_) => return, + }; + + jive::spawn(async move { + println!("Accepted stream"); + + let mut ring_buf = ReadWriteBuf::<10>::new(); + let mut interests = Readiness::read(); + + 'l2: loop { + let readiness = stream.ready(interests).await?; + + if readiness.is_hangup() { + break; + } + + if readiness.is_read() { + while let Some(buf) = ring_buf.for_reading() { + match stream.read_nonblocking(buf)? { + Nonblocking::Ready(ReadBytes::Read(n)) => { + let n: usize = n.into(); + let should_break = n < buf.len(); + ring_buf.advance_read(n); + if should_break { + break; + } + } + Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => { + break 'l2 + } + Nonblocking::Ready(ReadBytes::EOF) + | Nonblocking::WouldBlock => break, + } + } + + if !ring_buf.is_empty() { + interests = Readiness::read() | Readiness::write(); + } + } + + if readiness.is_write() { + while let Some(buf) = ring_buf.for_writing() { + match stream.write_nonblocking(buf)? { + Nonblocking::Ready(n) => { + ring_buf.advance_write(n); + if ring_buf.is_empty() { + interests = Readiness::read(); + } + } + Nonblocking::WouldBlock => break, + } + } + } + } + + println!("Stream closed"); + + Ok(()) as jive::io::Result<()> + }); + } + }); + + jive::time::sleep(Duration::from_secs(60 * 2)).await; + println!("Stopping"); + }) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..992245c --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,80 @@ +#![feature(once_cell)] + +use jitterbug::Executor; +use std::{ + future::{pending, Future}, + lazy::SyncOnceCell, +}; + +pub use foxtrot::{io, net, time}; + +pub mod sync { + pub use jitterbug::{oneshot, Dropped as OneshotError, Receiver, Sender}; +} + +pub mod task { + pub use jitterbug::{JoinError, JoinHandle}; + use std::future::Future; + + pub fn spawn( + future: impl Future + Send + 'static, + ) -> JoinHandle { + super::Runtime::get_or_init().executor.spawn(future) + } + + pub fn spawn_blocking( + callback: impl FnOnce() -> T + Send + 'static, + ) -> JoinHandle { + super::Runtime::get_or_init() + .blocking + .spawn(async move { (callback)() }) + } +} + +pub use task::{spawn, spawn_blocking}; + +pub fn block_on( + future: impl Future + Send + 'static, +) -> Result { + foxtrot::block_on(Runtime::get_or_init().executor.run_with(future)).unwrap() +} + +static RUNTIME: SyncOnceCell = SyncOnceCell::new(); + +struct Runtime { + executor: Executor, + blocking: Executor, +} + +impl Runtime { + fn get_or_init() -> &'static Self { + RUNTIME.get_or_init(Self::new) + } + + fn new() -> Self { + let executor = Executor::new(); + let blocking = Executor::new(); + + let base_threads = std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(1); + + let blocking_threads = base_threads * 5; + + for _ in 0..base_threads { + let executor = executor.clone(); + + std::thread::spawn(move || { + let _ = foxtrot::block_on(executor.into_runner()); + }); + } + + for _ in 0..blocking_threads { + let blocking = blocking.clone(); + + std::thread::spawn(move || blocking.block_on(pending::<()>())); + } + + Runtime { executor, blocking } + } +}