Compare commits
No commits in common. "main" and "5536aaf1c26173f135ef939c0f8e6ff9de096abe" have entirely different histories.
main
...
5536aaf1c2
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,4 +1,2 @@
|
|||
/target
|
||||
/.direnv
|
||||
/.envrc
|
||||
Cargo.lock
|
||||
|
|
10
Cargo.toml
10
Cargo.toml
|
@ -3,19 +3,11 @@ name = "jive"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
tokio-io-compat = ["foxtrot/tokio-compat"]
|
||||
futures-io-compat = ["foxtrot/futures-compat"]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
bachata = { git = "https://git.asonix.dog/safe-async/bachata" }
|
||||
foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" }
|
||||
jitterbug = { git = "https://git.asonix.dog/safe-async/jitterbug" }
|
||||
select = { git = "https://git.asonix.dog/safe-async/select" }
|
||||
|
||||
[dev-dependencies]
|
||||
read-write-buf = { git = "https://git.asonix.dog/safe-async/read-write-buf" }
|
||||
httparse = "1.6.0"
|
||||
read-write-buf = { git = "https://git.asonix.dog/asonix/read-write-buf" }
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::time::Duration;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
fn main() -> Result<(), jive::task::JoinError> {
|
||||
jive::block_on(async move {
|
||||
println!("hewwo");
|
||||
|
||||
|
@ -18,5 +18,5 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})?
|
||||
}
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
use jive::{
|
||||
io::{Async, Nonblocking, Readiness},
|
||||
net::TcpListener,
|
||||
};
|
||||
use jive::io::{Async, Nonblocking, ReadBytes, Readiness};
|
||||
use read_write_buf::ReadWriteBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
fn main() -> Result<(), jive::task::JoinError> {
|
||||
jive::block_on(async move {
|
||||
jive::spawn(async move {
|
||||
let mut listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)).await {
|
||||
let listener = match Async::bind(([127, 0, 0, 1], 3456)).await {
|
||||
Ok(listener) => listener,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
@ -16,7 +13,7 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
println!("Listening on port 3456");
|
||||
|
||||
loop {
|
||||
let (mut stream, _addr) = match listener.accept().await {
|
||||
let (stream, _addr) = match listener.accept().await {
|
||||
Ok(tup) => tup,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
@ -37,15 +34,19 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
if readiness.is_read() {
|
||||
while let Some(buf) = ring_buf.for_reading() {
|
||||
match stream.read_nonblocking(buf)? {
|
||||
Nonblocking::Ready(n) if n > 0 => {
|
||||
Nonblocking::Ready(ReadBytes::Read(n)) => {
|
||||
let n: usize = n.into();
|
||||
let should_break = n < buf.len();
|
||||
ring_buf.advance_read(n);
|
||||
if should_break {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Nonblocking::Ready(_) if ring_buf.is_empty() => break 'l2,
|
||||
Nonblocking::Ready(_) | Nonblocking::WouldBlock => break,
|
||||
Nonblocking::Ready(ReadBytes::EOF) if ring_buf.is_empty() => {
|
||||
break 'l2
|
||||
}
|
||||
Nonblocking::Ready(ReadBytes::EOF)
|
||||
| Nonblocking::WouldBlock => break,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,6 +79,5 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
|
||||
jive::time::sleep(Duration::from_secs(60 * 2)).await;
|
||||
println!("Stopping");
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,195 +0,0 @@
|
|||
use jive::{
|
||||
io::{Async, Nonblocking, Readiness},
|
||||
net::TcpListener,
|
||||
};
|
||||
use read_write_buf::ReadWriteBuf;
|
||||
|
||||
const CHUNKED_HEAD: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nTransfer-Encoding: chunked\r\n\r\n";
|
||||
const EMPTY_HEAD: &[u8] = b"HTTP/1.1 204 No Content\r\n\r\n";
|
||||
|
||||
fn length_head(content_length: usize) -> Vec<u8> {
|
||||
format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: {}\r\n\r\n",
|
||||
content_length
|
||||
)
|
||||
.into_bytes()
|
||||
}
|
||||
|
||||
fn main() {
|
||||
jive::block_on(async move {
|
||||
let mut listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)).await {
|
||||
Ok(listener) => listener,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
println!("Listening on port 3456");
|
||||
|
||||
loop {
|
||||
let (mut stream, _addr) = match listener.accept().await {
|
||||
Ok(tup) => tup,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
jive::spawn(async move {
|
||||
println!("Accepted stream");
|
||||
|
||||
let mut req_bytes = vec![0u8; 512];
|
||||
let mut headers = [httparse::Header {
|
||||
name: "",
|
||||
value: &[],
|
||||
}; 16];
|
||||
let mut req = httparse::Request::new(&mut headers);
|
||||
let mut ring_buf = ReadWriteBuf::<1024>::new();
|
||||
let mut interests = Readiness::read();
|
||||
let mut total_read: usize = 0;
|
||||
|
||||
let mut body_start = 'l1: loop {
|
||||
let readiness = stream.ready(interests).await?;
|
||||
|
||||
if readiness.is_hangup() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match stream.read_nonblocking(&mut req_bytes[total_read..])? {
|
||||
Nonblocking::Ready(n) if n > 0 => {
|
||||
total_read += n;
|
||||
match req.parse(&req_bytes[0..n]) {
|
||||
Ok(httparse::Status::Complete(body_start)) => break 'l1 body_start,
|
||||
Ok(httparse::Status::Partial) => {
|
||||
if total_read == req_bytes.len() {
|
||||
req_bytes.extend([0u8; 512]);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error parsing request: {}", e);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
Nonblocking::Ready(_) => return Ok(()),
|
||||
Nonblocking::WouldBlock => continue,
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
println!("Parsed request {:?}", req);
|
||||
|
||||
let mut head_written = 0;
|
||||
interests = Readiness::write();
|
||||
|
||||
if let Some(method) = req.method {
|
||||
if method == "GET" || method == "DELETE" {
|
||||
while head_written < EMPTY_HEAD.len() {
|
||||
let readiness = stream.ready(interests).await?;
|
||||
|
||||
if readiness.is_hangup() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match stream.write_nonblocking(&EMPTY_HEAD[head_written..])? {
|
||||
Nonblocking::Ready(n) => head_written += n,
|
||||
Nonblocking::WouldBlock => {}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let head = if let Some(header) = req
|
||||
.headers
|
||||
.iter()
|
||||
.find(|header| header.name == "Content-Length")
|
||||
{
|
||||
if let Ok(length) = String::from_utf8_lossy(header.value).parse() {
|
||||
length_head(length)
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
CHUNKED_HEAD.to_vec()
|
||||
};
|
||||
|
||||
while head_written < head.len() {
|
||||
let readiness = stream.ready(interests).await?;
|
||||
|
||||
if readiness.is_hangup() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match stream.write_nonblocking(&head[head_written..])? {
|
||||
Nonblocking::Ready(n) => head_written += n,
|
||||
Nonblocking::WouldBlock => {}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Wrote response head");
|
||||
|
||||
while body_start < total_read {
|
||||
let readiness = stream.ready(interests).await?;
|
||||
|
||||
if readiness.is_hangup() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match stream.write_nonblocking(&req_bytes[body_start..])? {
|
||||
Nonblocking::Ready(n) => body_start += n,
|
||||
Nonblocking::WouldBlock => {}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Wrote start of body");
|
||||
|
||||
interests = Readiness::read();
|
||||
'l2: loop {
|
||||
let readiness = stream.ready(interests).await?;
|
||||
|
||||
if readiness.is_hangup() {
|
||||
break;
|
||||
}
|
||||
|
||||
if readiness.is_read() {
|
||||
while let Some(buf) = ring_buf.for_reading() {
|
||||
match stream.read_nonblocking(buf)? {
|
||||
Nonblocking::Ready(n) if n > 0 => {
|
||||
let should_break = n < buf.len();
|
||||
ring_buf.advance_read(n);
|
||||
if should_break {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Nonblocking::Ready(_) if ring_buf.is_empty() => break 'l2,
|
||||
Nonblocking::Ready(_) | Nonblocking::WouldBlock => break,
|
||||
}
|
||||
}
|
||||
|
||||
if !ring_buf.is_empty() {
|
||||
interests = Readiness::read() | Readiness::write();
|
||||
}
|
||||
}
|
||||
|
||||
if readiness.is_write() {
|
||||
while let Some(buf) = ring_buf.for_writing() {
|
||||
match stream.write_nonblocking(buf)? {
|
||||
Nonblocking::Ready(n) => {
|
||||
ring_buf.advance_write(n);
|
||||
if ring_buf.is_empty() {
|
||||
interests = Readiness::read();
|
||||
}
|
||||
}
|
||||
Nonblocking::WouldBlock => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Stream closed");
|
||||
|
||||
Ok(()) as jive::io::Result<()>
|
||||
});
|
||||
}
|
||||
})
|
||||
}
|
|
@ -1,38 +1,22 @@
|
|||
use std::time::Duration;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
fn main() -> Result<(), jive::task::JoinError> {
|
||||
jive::block_on(async {
|
||||
println!("hewwo");
|
||||
|
||||
let handles = (0..10)
|
||||
let handles = (1..=50)
|
||||
.map(|i| {
|
||||
jive::spawn(async move {
|
||||
let handles = (1..=10)
|
||||
.map(|j| {
|
||||
jive::spawn_local(async move {
|
||||
jive::time::sleep(Duration::from_secs(2)).await;
|
||||
println!("{} slept", i * 10 + j);
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for handle in handles {
|
||||
handle.await?;
|
||||
}
|
||||
|
||||
println!("{i} joined");
|
||||
|
||||
Ok(()) as Result<_, jive::task::sync::JoinError>
|
||||
jive::time::sleep(Duration::from_secs(2)).await;
|
||||
println!("{} slept", i);
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for handle in handles {
|
||||
handle.await??;
|
||||
handle.await?;
|
||||
}
|
||||
|
||||
println!("all joined");
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})?
|
||||
}
|
||||
|
|
61
flake.lock
61
flake.lock
|
@ -1,61 +0,0 @@
|
|||
{
|
||||
"nodes": {
|
||||
"flake-utils": {
|
||||
"inputs": {
|
||||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1692799911,
|
||||
"narHash": "sha256-3eihraek4qL744EvQXsK1Ha6C3CR7nnT8X2qWap4RNk=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "f9e7cf818399d17d347f847525c5a5a8032e4e44",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1692734709,
|
||||
"narHash": "sha256-SCFnyHCyYjwEmgUsHDDuU0TsbVMKeU1vwkR+r7uS2Rg=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "b85ed9dcbf187b909ef7964774f8847d554fab3b",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixos-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"systems": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"type": "github"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
25
flake.nix
25
flake.nix
|
@ -1,25 +0,0 @@
|
|||
{
|
||||
description = "jive";
|
||||
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
|
||||
flake-utils.url = "github:numtide/flake-utils";
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, flake-utils }:
|
||||
flake-utils.lib.eachDefaultSystem (system:
|
||||
let
|
||||
pkgs = import nixpkgs {
|
||||
inherit system;
|
||||
};
|
||||
in
|
||||
{
|
||||
packages.default = pkgs.hello;
|
||||
|
||||
devShell = with pkgs; mkShell {
|
||||
nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
|
||||
|
||||
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
|
||||
};
|
||||
});
|
||||
}
|
76
src/lib.rs
76
src/lib.rs
|
@ -1,14 +1,80 @@
|
|||
#![feature(once_cell)]
|
||||
|
||||
use jitterbug::Executor;
|
||||
use std::{
|
||||
future::{pending, Future},
|
||||
lazy::SyncOnceCell,
|
||||
};
|
||||
|
||||
pub use foxtrot::{io, net, time};
|
||||
|
||||
pub mod sync {
|
||||
pub use jitterbug::{oneshot, Dropped as OneshotError, Receiver, Sender};
|
||||
}
|
||||
|
||||
pub mod runtime;
|
||||
pub mod task;
|
||||
pub mod task {
|
||||
pub use jitterbug::{JoinError, JoinHandle};
|
||||
use std::future::Future;
|
||||
|
||||
pub use task::{spawn, spawn_blocking, spawn_local, spawn_local_unsend};
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
super::Runtime::get_or_init().executor.spawn(future)
|
||||
}
|
||||
|
||||
pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T {
|
||||
runtime::Runtime::new().block_on(future)
|
||||
pub fn spawn_blocking<T: Send + 'static>(
|
||||
callback: impl FnOnce() -> T + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
super::Runtime::get_or_init()
|
||||
.blocking
|
||||
.spawn(async move { (callback)() })
|
||||
}
|
||||
}
|
||||
|
||||
pub use task::{spawn, spawn_blocking};
|
||||
|
||||
pub fn block_on<T: Send + 'static>(
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> Result<T, jitterbug::JoinError> {
|
||||
foxtrot::block_on(Runtime::get_or_init().executor.run_with(future)).unwrap()
|
||||
}
|
||||
|
||||
static RUNTIME: SyncOnceCell<Runtime> = SyncOnceCell::new();
|
||||
|
||||
struct Runtime {
|
||||
executor: Executor,
|
||||
blocking: Executor,
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
fn get_or_init() -> &'static Self {
|
||||
RUNTIME.get_or_init(Self::new)
|
||||
}
|
||||
|
||||
fn new() -> Self {
|
||||
let executor = Executor::new();
|
||||
let blocking = Executor::new();
|
||||
|
||||
let base_threads = std::thread::available_parallelism()
|
||||
.map(usize::from)
|
||||
.unwrap_or(1);
|
||||
|
||||
let blocking_threads = base_threads * 5;
|
||||
|
||||
for _ in 0..base_threads {
|
||||
let executor = executor.clone();
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let _ = foxtrot::block_on(executor.into_runner());
|
||||
});
|
||||
}
|
||||
|
||||
for _ in 0..blocking_threads {
|
||||
let blocking = blocking.clone();
|
||||
|
||||
std::thread::spawn(move || blocking.block_on(pending::<()>()));
|
||||
}
|
||||
|
||||
Runtime { executor, blocking }
|
||||
}
|
||||
}
|
||||
|
|
282
src/runtime.rs
282
src/runtime.rs
|
@ -1,282 +0,0 @@
|
|||
use jitterbug::Executor;
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
future::{pending, Future},
|
||||
};
|
||||
|
||||
thread_local! {
|
||||
static RUNTIME: RefCell<Option<RuntimeState>> = RefCell::new(None);
|
||||
}
|
||||
|
||||
struct RuntimeState {
|
||||
executor: Executor,
|
||||
blocking: Executor,
|
||||
}
|
||||
|
||||
pub struct Runtime {
|
||||
runtime_handle: RuntimeHandle,
|
||||
thread_handles: Vec<std::thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RuntimeHandle {
|
||||
executor: Executor,
|
||||
blocking: Executor,
|
||||
}
|
||||
|
||||
pub struct RuntimeBuilder {
|
||||
executor_count: usize,
|
||||
blocking_count: usize,
|
||||
}
|
||||
|
||||
struct RuntimeToken;
|
||||
|
||||
impl RuntimeBuilder {
|
||||
pub fn new() -> Self {
|
||||
let executor_count = std::thread::available_parallelism()
|
||||
.map(usize::from)
|
||||
.unwrap_or(1);
|
||||
|
||||
Self {
|
||||
executor_count,
|
||||
blocking_count: executor_count * 5,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn executor_count(mut self, count: usize) -> Self {
|
||||
self.executor_count = count;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn blocking_count(mut self, count: usize) -> Self {
|
||||
self.blocking_count = count;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> Runtime {
|
||||
RuntimeState::create(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl RuntimeHandle {
|
||||
pub fn current() -> Self {
|
||||
RUNTIME.with(|runtime| {
|
||||
let runtime = runtime.borrow();
|
||||
let runtime = runtime
|
||||
.as_ref()
|
||||
.expect("Must be called from within a Jive context");
|
||||
|
||||
RuntimeHandle {
|
||||
executor: runtime.executor.clone(),
|
||||
blocking: runtime.blocking.clone(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> jitterbug::JoinHandle<T> {
|
||||
self.executor.spawn(future)
|
||||
}
|
||||
|
||||
pub fn spawn_local_unsend<T>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + 'static,
|
||||
) -> bachata::JoinHandle<T> {
|
||||
bachata::spawn(future)
|
||||
}
|
||||
|
||||
pub fn spawn_local<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + 'static,
|
||||
) -> jitterbug::JoinHandle<T> {
|
||||
let (tx, rx) = jitterbug::oneshot();
|
||||
let (cancel_tx, cancel_rx) = jitterbug::oneshot::<()>();
|
||||
|
||||
bachata::spawn(async move {
|
||||
let future = std::pin::pin!(future);
|
||||
|
||||
match select::select(cancel_rx, future).await {
|
||||
select::Either::Left(_) => (), // canceled
|
||||
select::Either::Right(item) => {
|
||||
let _ = tx.send(item);
|
||||
}
|
||||
}
|
||||
});
|
||||
self.spawn(async move {
|
||||
let out = rx.await.unwrap();
|
||||
drop(cancel_tx);
|
||||
out
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spawn_blocking<T: Send + 'static>(
|
||||
&self,
|
||||
callback: impl FnOnce() -> T + Send + 'static,
|
||||
) -> jitterbug::JoinHandle<T> {
|
||||
self.blocking.spawn(async move { (callback)() })
|
||||
}
|
||||
|
||||
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
|
||||
let token = RuntimeState::install(&self.executor, &self.blocking);
|
||||
|
||||
let res = foxtrot::block_on(bachata::run_with(future)).unwrap();
|
||||
|
||||
drop(token);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.executor.stop();
|
||||
self.blocking.stop();
|
||||
}
|
||||
}
|
||||
|
||||
impl RuntimeState {
|
||||
fn install(executor: &Executor, blocking: &Executor) -> RuntimeToken {
|
||||
RUNTIME.with(|runtime| {
|
||||
let prev = runtime.borrow_mut().replace(RuntimeState {
|
||||
executor: executor.clone(),
|
||||
blocking: blocking.clone(),
|
||||
});
|
||||
if prev.is_some() {
|
||||
panic!("Runtime already present on this thread");
|
||||
}
|
||||
});
|
||||
|
||||
RuntimeToken
|
||||
}
|
||||
|
||||
fn uninstall() -> Option<RuntimeState> {
|
||||
RUNTIME.with(|runtime| runtime.borrow_mut().take())
|
||||
}
|
||||
|
||||
fn create(builder: RuntimeBuilder) -> Runtime {
|
||||
let executor = Executor::new();
|
||||
let blocking = Executor::new();
|
||||
|
||||
let mut thread_handles = Vec::new();
|
||||
for _ in 0..builder.executor_count {
|
||||
let executor = executor.clone();
|
||||
let blocking = blocking.clone();
|
||||
|
||||
thread_handles.push(
|
||||
std::thread::Builder::new()
|
||||
.name("jive:executor".into())
|
||||
.spawn(move || {
|
||||
let token = RuntimeState::install(&executor, &blocking);
|
||||
|
||||
let _ = foxtrot::block_on(select::select(
|
||||
bachata::run_with_cooperative(pending::<()>()),
|
||||
executor.into_runner_cooperative(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
drop(token);
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
for _ in 0..builder.blocking_count {
|
||||
let executor = executor.clone();
|
||||
let blocking = blocking.clone();
|
||||
|
||||
thread_handles.push(
|
||||
std::thread::Builder::new()
|
||||
.name("jive:blocking".into())
|
||||
.spawn(move || {
|
||||
let token = RuntimeState::install(&executor, &blocking);
|
||||
|
||||
let _ = blocking.block_on(pending::<()>());
|
||||
|
||||
drop(token);
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
Runtime {
|
||||
runtime_handle: RuntimeHandle { executor, blocking },
|
||||
thread_handles,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
pub fn new() -> Self {
|
||||
RuntimeState::create(RuntimeBuilder::new())
|
||||
}
|
||||
|
||||
pub fn handle(&self) -> &RuntimeHandle {
|
||||
&self.runtime_handle
|
||||
}
|
||||
|
||||
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
|
||||
self.handle().block_on(future)
|
||||
}
|
||||
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> jitterbug::JoinHandle<T> {
|
||||
self.handle().spawn(future)
|
||||
}
|
||||
|
||||
pub fn spawn_local_unsend<T: 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + 'static,
|
||||
) -> bachata::JoinHandle<T> {
|
||||
self.handle().spawn_local_unsend(future)
|
||||
}
|
||||
|
||||
pub fn spawn_local<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + 'static,
|
||||
) -> jitterbug::JoinHandle<T> {
|
||||
self.handle().spawn_local(future)
|
||||
}
|
||||
|
||||
pub fn spawn_blocking<T: Send + 'static>(
|
||||
&self,
|
||||
callback: impl FnOnce() -> T + Send + 'static,
|
||||
) -> jitterbug::JoinHandle<T> {
|
||||
self.handle().spawn_blocking(callback)
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.handle().stop();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RuntimeBuilder {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Runtime {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Runtime {
|
||||
fn drop(&mut self) {
|
||||
RuntimeState::uninstall();
|
||||
|
||||
self.stop();
|
||||
|
||||
for handle in self.thread_handles.drain(..) {
|
||||
handle.join().expect("Joined jive thread");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RuntimeToken {
|
||||
fn drop(&mut self) {
|
||||
RuntimeState::uninstall();
|
||||
}
|
||||
}
|
32
src/task.rs
32
src/task.rs
|
@ -1,32 +0,0 @@
|
|||
use crate::runtime::RuntimeHandle;
|
||||
use std::future::Future;
|
||||
|
||||
pub mod sync {
|
||||
pub use jitterbug::{AbortHandle, JoinError, JoinHandle};
|
||||
}
|
||||
|
||||
pub mod unsync {
|
||||
pub use bachata::{JoinError, JoinHandle};
|
||||
}
|
||||
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> sync::JoinHandle<T> {
|
||||
RuntimeHandle::current().spawn(future)
|
||||
}
|
||||
|
||||
pub fn spawn_local_unsend<T>(future: impl Future<Output = T> + 'static) -> unsync::JoinHandle<T> {
|
||||
RuntimeHandle::current().spawn_local_unsend(future)
|
||||
}
|
||||
|
||||
pub fn spawn_local<T: Send + 'static>(
|
||||
future: impl Future<Output = T> + 'static,
|
||||
) -> sync::JoinHandle<T> {
|
||||
RuntimeHandle::current().spawn_local(future)
|
||||
}
|
||||
|
||||
pub fn spawn_blocking<T: Send + 'static>(
|
||||
callback: impl FnOnce() -> T + Send + 'static,
|
||||
) -> sync::JoinHandle<T> {
|
||||
RuntimeHandle::current().spawn_blocking(callback)
|
||||
}
|
Loading…
Reference in a new issue