Compare commits

..

No commits in common. "main" and "5536aaf1c26173f135ef939c0f8e6ff9de096abe" have entirely different histories.

11 changed files with 91 additions and 646 deletions

2
.gitignore vendored
View file

@ -1,4 +1,2 @@
/target
/.direnv
/.envrc
Cargo.lock

View file

@ -3,19 +3,11 @@ name = "jive"
version = "0.1.0"
edition = "2021"
[features]
default = []
tokio-io-compat = ["foxtrot/tokio-compat"]
futures-io-compat = ["foxtrot/futures-compat"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bachata = { git = "https://git.asonix.dog/safe-async/bachata" }
foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" }
jitterbug = { git = "https://git.asonix.dog/safe-async/jitterbug" }
select = { git = "https://git.asonix.dog/safe-async/select" }
[dev-dependencies]
read-write-buf = { git = "https://git.asonix.dog/safe-async/read-write-buf" }
httparse = "1.6.0"
read-write-buf = { git = "https://git.asonix.dog/asonix/read-write-buf" }

View file

@ -1,6 +1,6 @@
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fn main() -> Result<(), jive::task::JoinError> {
jive::block_on(async move {
println!("hewwo");
@ -18,5 +18,5 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}
Ok(())
})
})?
}

View file

@ -1,14 +1,11 @@
use jive::{
io::{Async, Nonblocking, Readiness},
net::TcpListener,
};
use jive::io::{Async, Nonblocking, ReadBytes, Readiness};
use read_write_buf::ReadWriteBuf;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fn main() -> Result<(), jive::task::JoinError> {
jive::block_on(async move {
jive::spawn(async move {
let mut listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)).await {
let listener = match Async::bind(([127, 0, 0, 1], 3456)).await {
Ok(listener) => listener,
Err(_) => return,
};
@ -16,7 +13,7 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Listening on port 3456");
loop {
let (mut stream, _addr) = match listener.accept().await {
let (stream, _addr) = match listener.accept().await {
Ok(tup) => tup,
Err(_) => return,
};
@ -37,15 +34,19 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if readiness.is_read() {
while let Some(buf) = ring_buf.for_reading() {
match stream.read_nonblocking(buf)? {
Nonblocking::Ready(n) if n > 0 => {
Nonblocking::Ready(ReadBytes::Read(n)) => {
let n: usize = n.into();
let should_break = n < buf.len();
ring_buf.advance_read(n);
if should_break {
break;
}
}
Nonblocking::Ready(_) if ring_buf.is_empty() => break 'l2,
Nonblocking::Ready(_) | Nonblocking::WouldBlock => break,
Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => {
break 'l2
}
Nonblocking::Ready(ReadBytes::EOF)
| Nonblocking::WouldBlock => break,
}
}
@ -78,6 +79,5 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
jive::time::sleep(Duration::from_secs(60 * 2)).await;
println!("Stopping");
Ok(())
})
}

View file

@ -1,195 +0,0 @@
use jive::{
io::{Async, Nonblocking, Readiness},
net::TcpListener,
};
use read_write_buf::ReadWriteBuf;
const CHUNKED_HEAD: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nTransfer-Encoding: chunked\r\n\r\n";
const EMPTY_HEAD: &[u8] = b"HTTP/1.1 204 No Content\r\n\r\n";
fn length_head(content_length: usize) -> Vec<u8> {
format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {}\r\n\r\n",
content_length
)
.into_bytes()
}
fn main() {
jive::block_on(async move {
let mut listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)).await {
Ok(listener) => listener,
Err(_) => return,
};
println!("Listening on port 3456");
loop {
let (mut stream, _addr) = match listener.accept().await {
Ok(tup) => tup,
Err(_) => return,
};
jive::spawn(async move {
println!("Accepted stream");
let mut req_bytes = vec![0u8; 512];
let mut headers = [httparse::Header {
name: "",
value: &[],
}; 16];
let mut req = httparse::Request::new(&mut headers);
let mut ring_buf = ReadWriteBuf::<1024>::new();
let mut interests = Readiness::read();
let mut total_read: usize = 0;
let mut body_start = 'l1: loop {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
return Ok(());
}
match stream.read_nonblocking(&mut req_bytes[total_read..])? {
Nonblocking::Ready(n) if n > 0 => {
total_read += n;
match req.parse(&req_bytes[0..n]) {
Ok(httparse::Status::Complete(body_start)) => break 'l1 body_start,
Ok(httparse::Status::Partial) => {
if total_read == req_bytes.len() {
req_bytes.extend([0u8; 512]);
}
}
Err(e) => {
println!("Error parsing request: {}", e);
return Ok(());
}
}
}
Nonblocking::Ready(_) => return Ok(()),
Nonblocking::WouldBlock => continue,
}
return Ok(());
};
println!("Parsed request {:?}", req);
let mut head_written = 0;
interests = Readiness::write();
if let Some(method) = req.method {
if method == "GET" || method == "DELETE" {
while head_written < EMPTY_HEAD.len() {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
return Ok(());
}
match stream.write_nonblocking(&EMPTY_HEAD[head_written..])? {
Nonblocking::Ready(n) => head_written += n,
Nonblocking::WouldBlock => {}
}
}
return Ok(());
}
} else {
return Ok(());
}
let head = if let Some(header) = req
.headers
.iter()
.find(|header| header.name == "Content-Length")
{
if let Ok(length) = String::from_utf8_lossy(header.value).parse() {
length_head(length)
} else {
return Ok(());
}
} else {
CHUNKED_HEAD.to_vec()
};
while head_written < head.len() {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
return Ok(());
}
match stream.write_nonblocking(&head[head_written..])? {
Nonblocking::Ready(n) => head_written += n,
Nonblocking::WouldBlock => {}
}
}
println!("Wrote response head");
while body_start < total_read {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
return Ok(());
}
match stream.write_nonblocking(&req_bytes[body_start..])? {
Nonblocking::Ready(n) => body_start += n,
Nonblocking::WouldBlock => {}
}
}
println!("Wrote start of body");
interests = Readiness::read();
'l2: loop {
let readiness = stream.ready(interests).await?;
if readiness.is_hangup() {
break;
}
if readiness.is_read() {
while let Some(buf) = ring_buf.for_reading() {
match stream.read_nonblocking(buf)? {
Nonblocking::Ready(n) if n > 0 => {
let should_break = n < buf.len();
ring_buf.advance_read(n);
if should_break {
break;
}
}
Nonblocking::Ready(_) if ring_buf.is_empty() => break 'l2,
Nonblocking::Ready(_) | Nonblocking::WouldBlock => break,
}
}
if !ring_buf.is_empty() {
interests = Readiness::read() | Readiness::write();
}
}
if readiness.is_write() {
while let Some(buf) = ring_buf.for_writing() {
match stream.write_nonblocking(buf)? {
Nonblocking::Ready(n) => {
ring_buf.advance_write(n);
if ring_buf.is_empty() {
interests = Readiness::read();
}
}
Nonblocking::WouldBlock => break,
}
}
}
}
println!("Stream closed");
Ok(()) as jive::io::Result<()>
});
}
})
}

View file

@ -1,38 +1,22 @@
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fn main() -> Result<(), jive::task::JoinError> {
jive::block_on(async {
println!("hewwo");
let handles = (0..10)
let handles = (1..=50)
.map(|i| {
jive::spawn(async move {
let handles = (1..=10)
.map(|j| {
jive::spawn_local(async move {
jive::time::sleep(Duration::from_secs(2)).await;
println!("{} slept", i * 10 + j);
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.await?;
}
println!("{i} joined");
Ok(()) as Result<_, jive::task::sync::JoinError>
jive::time::sleep(Duration::from_secs(2)).await;
println!("{} slept", i);
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.await??;
handle.await?;
}
println!("all joined");
Ok(())
})
})?
}

View file

@ -1,61 +0,0 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1692799911,
"narHash": "sha256-3eihraek4qL744EvQXsK1Ha6C3CR7nnT8X2qWap4RNk=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "f9e7cf818399d17d347f847525c5a5a8032e4e44",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1692734709,
"narHash": "sha256-SCFnyHCyYjwEmgUsHDDuU0TsbVMKeU1vwkR+r7uS2Rg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b85ed9dcbf187b909ef7964774f8847d554fab3b",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View file

@ -1,25 +0,0 @@
{
description = "jive";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs {
inherit system;
};
in
{
packages.default = pkgs.hello;
devShell = with pkgs; mkShell {
nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
};
});
}

View file

@ -1,14 +1,80 @@
#![feature(once_cell)]
use jitterbug::Executor;
use std::{
future::{pending, Future},
lazy::SyncOnceCell,
};
pub use foxtrot::{io, net, time};
pub mod sync {
pub use jitterbug::{oneshot, Dropped as OneshotError, Receiver, Sender};
}
pub mod runtime;
pub mod task;
pub mod task {
pub use jitterbug::{JoinError, JoinHandle};
use std::future::Future;
pub use task::{spawn, spawn_blocking, spawn_local, spawn_local_unsend};
pub fn spawn<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
super::Runtime::get_or_init().executor.spawn(future)
}
pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T {
runtime::Runtime::new().block_on(future)
pub fn spawn_blocking<T: Send + 'static>(
callback: impl FnOnce() -> T + Send + 'static,
) -> JoinHandle<T> {
super::Runtime::get_or_init()
.blocking
.spawn(async move { (callback)() })
}
}
pub use task::{spawn, spawn_blocking};
pub fn block_on<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> Result<T, jitterbug::JoinError> {
foxtrot::block_on(Runtime::get_or_init().executor.run_with(future)).unwrap()
}
static RUNTIME: SyncOnceCell<Runtime> = SyncOnceCell::new();
struct Runtime {
executor: Executor,
blocking: Executor,
}
impl Runtime {
fn get_or_init() -> &'static Self {
RUNTIME.get_or_init(Self::new)
}
fn new() -> Self {
let executor = Executor::new();
let blocking = Executor::new();
let base_threads = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
let blocking_threads = base_threads * 5;
for _ in 0..base_threads {
let executor = executor.clone();
std::thread::spawn(move || {
let _ = foxtrot::block_on(executor.into_runner());
});
}
for _ in 0..blocking_threads {
let blocking = blocking.clone();
std::thread::spawn(move || blocking.block_on(pending::<()>()));
}
Runtime { executor, blocking }
}
}

View file

@ -1,282 +0,0 @@
use jitterbug::Executor;
use std::{
cell::RefCell,
future::{pending, Future},
};
thread_local! {
static RUNTIME: RefCell<Option<RuntimeState>> = RefCell::new(None);
}
struct RuntimeState {
executor: Executor,
blocking: Executor,
}
pub struct Runtime {
runtime_handle: RuntimeHandle,
thread_handles: Vec<std::thread::JoinHandle<()>>,
}
#[derive(Clone)]
pub struct RuntimeHandle {
executor: Executor,
blocking: Executor,
}
pub struct RuntimeBuilder {
executor_count: usize,
blocking_count: usize,
}
struct RuntimeToken;
impl RuntimeBuilder {
pub fn new() -> Self {
let executor_count = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
Self {
executor_count,
blocking_count: executor_count * 5,
}
}
pub fn executor_count(mut self, count: usize) -> Self {
self.executor_count = count;
self
}
pub fn blocking_count(mut self, count: usize) -> Self {
self.blocking_count = count;
self
}
pub fn build(self) -> Runtime {
RuntimeState::create(self)
}
}
impl RuntimeHandle {
pub fn current() -> Self {
RUNTIME.with(|runtime| {
let runtime = runtime.borrow();
let runtime = runtime
.as_ref()
.expect("Must be called from within a Jive context");
RuntimeHandle {
executor: runtime.executor.clone(),
blocking: runtime.blocking.clone(),
}
})
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.executor.spawn(future)
}
pub fn spawn_local_unsend<T>(
&self,
future: impl Future<Output = T> + 'static,
) -> bachata::JoinHandle<T> {
bachata::spawn(future)
}
pub fn spawn_local<T: Send + 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> jitterbug::JoinHandle<T> {
let (tx, rx) = jitterbug::oneshot();
let (cancel_tx, cancel_rx) = jitterbug::oneshot::<()>();
bachata::spawn(async move {
let future = std::pin::pin!(future);
match select::select(cancel_rx, future).await {
select::Either::Left(_) => (), // canceled
select::Either::Right(item) => {
let _ = tx.send(item);
}
}
});
self.spawn(async move {
let out = rx.await.unwrap();
drop(cancel_tx);
out
})
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.blocking.spawn(async move { (callback)() })
}
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
let token = RuntimeState::install(&self.executor, &self.blocking);
let res = foxtrot::block_on(bachata::run_with(future)).unwrap();
drop(token);
res
}
pub fn stop(&self) {
self.executor.stop();
self.blocking.stop();
}
}
impl RuntimeState {
fn install(executor: &Executor, blocking: &Executor) -> RuntimeToken {
RUNTIME.with(|runtime| {
let prev = runtime.borrow_mut().replace(RuntimeState {
executor: executor.clone(),
blocking: blocking.clone(),
});
if prev.is_some() {
panic!("Runtime already present on this thread");
}
});
RuntimeToken
}
fn uninstall() -> Option<RuntimeState> {
RUNTIME.with(|runtime| runtime.borrow_mut().take())
}
fn create(builder: RuntimeBuilder) -> Runtime {
let executor = Executor::new();
let blocking = Executor::new();
let mut thread_handles = Vec::new();
for _ in 0..builder.executor_count {
let executor = executor.clone();
let blocking = blocking.clone();
thread_handles.push(
std::thread::Builder::new()
.name("jive:executor".into())
.spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = foxtrot::block_on(select::select(
bachata::run_with_cooperative(pending::<()>()),
executor.into_runner_cooperative(),
))
.unwrap();
drop(token);
})
.unwrap(),
);
}
for _ in 0..builder.blocking_count {
let executor = executor.clone();
let blocking = blocking.clone();
thread_handles.push(
std::thread::Builder::new()
.name("jive:blocking".into())
.spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = blocking.block_on(pending::<()>());
drop(token);
})
.unwrap(),
);
}
Runtime {
runtime_handle: RuntimeHandle { executor, blocking },
thread_handles,
}
}
}
impl Runtime {
pub fn new() -> Self {
RuntimeState::create(RuntimeBuilder::new())
}
pub fn handle(&self) -> &RuntimeHandle {
&self.runtime_handle
}
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.handle().block_on(future)
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn(future)
}
pub fn spawn_local_unsend<T: 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> bachata::JoinHandle<T> {
self.handle().spawn_local_unsend(future)
}
pub fn spawn_local<T: Send + 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn_local(future)
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn_blocking(callback)
}
pub fn stop(&self) {
self.handle().stop();
}
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
RuntimeState::uninstall();
self.stop();
for handle in self.thread_handles.drain(..) {
handle.join().expect("Joined jive thread");
}
}
}
impl Drop for RuntimeToken {
fn drop(&mut self) {
RuntimeState::uninstall();
}
}

View file

@ -1,32 +0,0 @@
use crate::runtime::RuntimeHandle;
use std::future::Future;
pub mod sync {
pub use jitterbug::{AbortHandle, JoinError, JoinHandle};
}
pub mod unsync {
pub use bachata::{JoinError, JoinHandle};
}
pub fn spawn<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> sync::JoinHandle<T> {
RuntimeHandle::current().spawn(future)
}
pub fn spawn_local_unsend<T>(future: impl Future<Output = T> + 'static) -> unsync::JoinHandle<T> {
RuntimeHandle::current().spawn_local_unsend(future)
}
pub fn spawn_local<T: Send + 'static>(
future: impl Future<Output = T> + 'static,
) -> sync::JoinHandle<T> {
RuntimeHandle::current().spawn_local(future)
}
pub fn spawn_blocking<T: Send + 'static>(
callback: impl FnOnce() -> T + Send + 'static,
) -> sync::JoinHandle<T> {
RuntimeHandle::current().spawn_blocking(callback)
}