Initial combined runtime

This commit is contained in:
Aode (lion) 2022-02-16 18:32:02 -05:00
commit 930bf80b21
4 changed files with 178 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

13
Cargo.toml Normal file
View file

@ -0,0 +1,13 @@
[package]
name = "jive"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" }
jitterbug = { git = "https://git.asonix.dog/safe-async/jitterbug" }
[dev-dependencies]
read-write-buf = { git = "https://git.asonix.dog/asonix/read-write-buf" }

83
examples/echo.rs Normal file
View file

@ -0,0 +1,83 @@
use jive::io::{Async, Nonblocking, ReadBytes, Readiness};
use read_write_buf::ReadWriteBuf;
use std::time::Duration;
fn main() -> Result<(), jive::task::JoinError> {
jive::block_on(async move {
jive::spawn(async move {
let listener = match Async::bind(([127, 0, 0, 1], 3456)).await {
Ok(listener) => listener,
Err(_) => return,
};
println!("Listening on port 3456");
loop {
let (stream, _addr) = match listener.accept().await {
Ok(tup) => tup,
Err(_) => return,
};
jive::spawn(async move {
println!("Accepted stream");
let mut ring_buf = ReadWriteBuf::<10>::new();
let mut interests = Readiness::read();
'l2: loop {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
break;
}
if readiness.is_read() {
while let Some(buf) = ring_buf.for_reading() {
match stream.read_nonblocking(buf)? {
Nonblocking::Ready(ReadBytes::Read(n)) => {
let n: usize = n.into();
let should_break = n < buf.len();
ring_buf.advance_read(n);
if should_break {
break;
}
}
Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => {
break 'l2
}
Nonblocking::Ready(ReadBytes::EOF)
| Nonblocking::WouldBlock => break,
}
}
if !ring_buf.is_empty() {
interests = Readiness::read() | Readiness::write();
}
}
if readiness.is_write() {
while let Some(buf) = ring_buf.for_writing() {
match stream.write_nonblocking(buf)? {
Nonblocking::Ready(n) => {
ring_buf.advance_write(n);
if ring_buf.is_empty() {
interests = Readiness::read();
}
}
Nonblocking::WouldBlock => break,
}
}
}
}
println!("Stream closed");
Ok(()) as jive::io::Result<()>
});
}
});
jive::time::sleep(Duration::from_secs(60 * 2)).await;
println!("Stopping");
})
}

80
src/lib.rs Normal file
View file

@ -0,0 +1,80 @@
#![feature(once_cell)]
use jitterbug::Executor;
use std::{
future::{pending, Future},
lazy::SyncOnceCell,
};
pub use foxtrot::{io, net, time};
pub mod sync {
pub use jitterbug::{oneshot, Dropped as OneshotError, Receiver, Sender};
}
pub mod task {
pub use jitterbug::{JoinError, JoinHandle};
use std::future::Future;
pub fn spawn<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
super::Runtime::get_or_init().executor.spawn(future)
}
pub fn spawn_blocking<T: Send + 'static>(
callback: impl FnOnce() -> T + Send + 'static,
) -> JoinHandle<T> {
super::Runtime::get_or_init()
.blocking
.spawn(async move { (callback)() })
}
}
pub use task::{spawn, spawn_blocking};
pub fn block_on<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> Result<T, jitterbug::JoinError> {
foxtrot::block_on(Runtime::get_or_init().executor.run_with(future)).unwrap()
}
static RUNTIME: SyncOnceCell<Runtime> = SyncOnceCell::new();
struct Runtime {
executor: Executor,
blocking: Executor,
}
impl Runtime {
fn get_or_init() -> &'static Self {
RUNTIME.get_or_init(Self::new)
}
fn new() -> Self {
let executor = Executor::new();
let blocking = Executor::new();
let base_threads = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
let blocking_threads = base_threads * 5;
for _ in 0..base_threads {
let executor = executor.clone();
std::thread::spawn(move || {
let _ = foxtrot::block_on(executor.into_runner());
});
}
for _ in 0..blocking_threads {
let blocking = blocking.clone();
std::thread::spawn(move || blocking.block_on(pending::<()>()));
}
Runtime { executor, blocking }
}
}