It works on stable now

- Replace static SyncOnceCell with thread-locals, install & uninstall through block_on
- Break runtime into own module
This commit is contained in:
Aode (Lion) 2022-02-24 18:24:12 -06:00
parent 17a4125e78
commit 5220c16bd1
3 changed files with 215 additions and 68 deletions

View file

@ -1,80 +1,16 @@
#![feature(once_cell)]
use jitterbug::Executor;
use std::{
future::{pending, Future},
lazy::SyncOnceCell,
};
pub use foxtrot::{io, net, time};
pub mod sync {
pub use jitterbug::{oneshot, Dropped as OneshotError, Receiver, Sender};
}
pub mod task {
pub use jitterbug::{JoinError, JoinHandle};
use std::future::Future;
pub fn spawn<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
super::Runtime::get_or_init().executor.spawn(future)
}
pub fn spawn_blocking<T: Send + 'static>(
callback: impl FnOnce() -> T + Send + 'static,
) -> JoinHandle<T> {
super::Runtime::get_or_init()
.blocking
.spawn(async move { (callback)() })
}
}
pub mod runtime;
pub mod task;
pub use task::{spawn, spawn_blocking};
pub fn block_on<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
future: impl std::future::Future<Output = T> + Send + 'static,
) -> Result<T, jitterbug::JoinError> {
foxtrot::block_on(Runtime::get_or_init().executor.run_with(future)).unwrap()
}
static RUNTIME: SyncOnceCell<Runtime> = SyncOnceCell::new();
struct Runtime {
executor: Executor,
blocking: Executor,
}
impl Runtime {
fn get_or_init() -> &'static Self {
RUNTIME.get_or_init(Self::new)
}
fn new() -> Self {
let executor = Executor::new();
let blocking = Executor::new();
let base_threads = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
let blocking_threads = base_threads * 5;
for _ in 0..base_threads {
let executor = executor.clone();
std::thread::spawn(move || {
let _ = foxtrot::block_on(executor.into_runner());
});
}
for _ in 0..blocking_threads {
let blocking = blocking.clone();
std::thread::spawn(move || blocking.block_on(pending::<()>()));
}
Runtime { executor, blocking }
}
runtime::Runtime::new().block_on(future)
}

197
src/runtime.rs Normal file
View file

@ -0,0 +1,197 @@
use jitterbug::Executor;
use std::{
cell::RefCell,
future::{pending, Future},
};
thread_local! {
static RUNTIME: RefCell<Option<RuntimeState>> = RefCell::new(None);
}
struct RuntimeState {
executor: Executor,
blocking: Executor,
}
pub struct Runtime {
runtime_handle: RuntimeHandle,
thread_handles: Vec<std::thread::JoinHandle<()>>,
}
#[derive(Clone)]
pub struct RuntimeHandle {
executor: Executor,
blocking: Executor,
}
struct RuntimeToken;
impl RuntimeHandle {
pub fn current() -> Self {
RUNTIME.with(|runtime| {
let runtime = runtime.borrow();
let runtime = runtime
.as_ref()
.expect("Must be called from within a Jive context");
RuntimeHandle {
executor: runtime.executor.clone(),
blocking: runtime.blocking.clone(),
}
})
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.executor.spawn(future)
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.blocking.spawn(async move { (callback)() })
}
pub fn block_on<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> Result<T, jitterbug::JoinError> {
let token = RuntimeState::install(&self.executor, &self.blocking);
let res = foxtrot::block_on(self.executor.run_with(future)).unwrap();
drop(token);
res
}
pub fn stop(&self) {
self.executor.stop();
self.blocking.stop();
}
}
impl RuntimeState {
fn install(executor: &Executor, blocking: &Executor) -> RuntimeToken {
RUNTIME.with(|runtime| {
let prev = runtime.borrow_mut().replace(RuntimeState {
executor: executor.clone(),
blocking: blocking.clone(),
});
if prev.is_some() {
panic!("Runtime already present on this thread");
}
});
RuntimeToken
}
fn uninstall() -> Option<RuntimeState> {
RUNTIME.with(|runtime| runtime.borrow_mut().take())
}
fn create() -> Runtime {
let executor = Executor::new();
let blocking = Executor::new();
let base_threads = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
let blocking_threads = base_threads * 5;
let mut thread_handles = Vec::new();
for _ in 0..base_threads {
let executor = executor.clone();
let blocking = blocking.clone();
thread_handles.push(std::thread::spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = foxtrot::block_on(executor.into_runner()).unwrap();
drop(token);
}));
}
for _ in 0..blocking_threads {
let executor = executor.clone();
let blocking = blocking.clone();
thread_handles.push(std::thread::spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = blocking.block_on(pending::<()>());
drop(token);
}));
}
Runtime {
runtime_handle: RuntimeHandle { executor, blocking },
thread_handles,
}
}
}
impl Runtime {
pub fn new() -> Self {
RuntimeState::create()
}
pub fn handle(&self) -> &RuntimeHandle {
&self.runtime_handle
}
pub fn block_on<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> Result<T, jitterbug::JoinError> {
self.handle().block_on(future)
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn(future)
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn_blocking(callback)
}
pub fn stop(&self) {
self.handle().stop();
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
RuntimeState::uninstall();
self.stop();
for handle in self.thread_handles.drain(..) {
handle.join().expect("Joined jive thread");
}
}
}
impl Drop for RuntimeToken {
fn drop(&mut self) {
RuntimeState::uninstall();
}
}

14
src/task.rs Normal file
View file

@ -0,0 +1,14 @@
use crate::runtime::RuntimeHandle;
use std::future::Future;
pub use jitterbug::{JoinError, JoinHandle};
pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
RuntimeHandle::current().spawn(future)
}
pub fn spawn_blocking<T: Send + 'static>(
callback: impl FnOnce() -> T + Send + 'static,
) -> JoinHandle<T> {
RuntimeHandle::current().spawn_blocking(callback)
}