From 5220c16bd1e39130c43270dc7ce19565b5a328ce Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Thu, 24 Feb 2022 18:24:12 -0600 Subject: [PATCH] It works on stable now - Replace static SyncOnceCell with thread-locals, install & uninstall through block_on - Break runtime into own module --- src/lib.rs | 72 +----------------- src/runtime.rs | 197 +++++++++++++++++++++++++++++++++++++++++++++++++ src/task.rs | 14 ++++ 3 files changed, 215 insertions(+), 68 deletions(-) create mode 100644 src/runtime.rs create mode 100644 src/task.rs diff --git a/src/lib.rs b/src/lib.rs index 992245c..a482150 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,80 +1,16 @@ -#![feature(once_cell)] - -use jitterbug::Executor; -use std::{ - future::{pending, Future}, - lazy::SyncOnceCell, -}; - pub use foxtrot::{io, net, time}; pub mod sync { pub use jitterbug::{oneshot, Dropped as OneshotError, Receiver, Sender}; } -pub mod task { - pub use jitterbug::{JoinError, JoinHandle}; - use std::future::Future; - - pub fn spawn( - future: impl Future + Send + 'static, - ) -> JoinHandle { - super::Runtime::get_or_init().executor.spawn(future) - } - - pub fn spawn_blocking( - callback: impl FnOnce() -> T + Send + 'static, - ) -> JoinHandle { - super::Runtime::get_or_init() - .blocking - .spawn(async move { (callback)() }) - } -} +pub mod runtime; +pub mod task; pub use task::{spawn, spawn_blocking}; pub fn block_on( - future: impl Future + Send + 'static, + future: impl std::future::Future + Send + 'static, ) -> Result { - foxtrot::block_on(Runtime::get_or_init().executor.run_with(future)).unwrap() -} - -static RUNTIME: SyncOnceCell = SyncOnceCell::new(); - -struct Runtime { - executor: Executor, - blocking: Executor, -} - -impl Runtime { - fn get_or_init() -> &'static Self { - RUNTIME.get_or_init(Self::new) - } - - fn new() -> Self { - let executor = Executor::new(); - let blocking = Executor::new(); - - let base_threads = std::thread::available_parallelism() - .map(usize::from) - .unwrap_or(1); - - let blocking_threads = base_threads * 5; - - for _ in 0..base_threads { - let executor = executor.clone(); - - std::thread::spawn(move || { - let _ = foxtrot::block_on(executor.into_runner()); - }); - } - - for _ in 0..blocking_threads { - let blocking = blocking.clone(); - - std::thread::spawn(move || blocking.block_on(pending::<()>())); - } - - Runtime { executor, blocking } - } + runtime::Runtime::new().block_on(future) } diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 0000000..d3bb618 --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,197 @@ +use jitterbug::Executor; +use std::{ + cell::RefCell, + future::{pending, Future}, +}; + +thread_local! { + static RUNTIME: RefCell> = RefCell::new(None); +} + +struct RuntimeState { + executor: Executor, + blocking: Executor, +} + +pub struct Runtime { + runtime_handle: RuntimeHandle, + thread_handles: Vec>, +} + +#[derive(Clone)] +pub struct RuntimeHandle { + executor: Executor, + blocking: Executor, +} + +struct RuntimeToken; + +impl RuntimeHandle { + pub fn current() -> Self { + RUNTIME.with(|runtime| { + let runtime = runtime.borrow(); + let runtime = runtime + .as_ref() + .expect("Must be called from within a Jive context"); + + RuntimeHandle { + executor: runtime.executor.clone(), + blocking: runtime.blocking.clone(), + } + }) + } + + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> jitterbug::JoinHandle { + self.executor.spawn(future) + } + + pub fn spawn_blocking( + &self, + callback: impl FnOnce() -> T + Send + 'static, + ) -> jitterbug::JoinHandle { + self.blocking.spawn(async move { (callback)() }) + } + + pub fn block_on( + &self, + future: impl Future + Send + 'static, + ) -> Result { + let token = RuntimeState::install(&self.executor, &self.blocking); + + let res = foxtrot::block_on(self.executor.run_with(future)).unwrap(); + + drop(token); + + res + } + + pub fn stop(&self) { + self.executor.stop(); + self.blocking.stop(); + } +} + +impl RuntimeState { + fn install(executor: &Executor, blocking: &Executor) -> RuntimeToken { + RUNTIME.with(|runtime| { + let prev = runtime.borrow_mut().replace(RuntimeState { + executor: executor.clone(), + blocking: blocking.clone(), + }); + if prev.is_some() { + panic!("Runtime already present on this thread"); + } + }); + + RuntimeToken + } + + fn uninstall() -> Option { + RUNTIME.with(|runtime| runtime.borrow_mut().take()) + } + + fn create() -> Runtime { + let executor = Executor::new(); + let blocking = Executor::new(); + + let base_threads = std::thread::available_parallelism() + .map(usize::from) + .unwrap_or(1); + + let blocking_threads = base_threads * 5; + + let mut thread_handles = Vec::new(); + for _ in 0..base_threads { + let executor = executor.clone(); + let blocking = blocking.clone(); + + thread_handles.push(std::thread::spawn(move || { + let token = RuntimeState::install(&executor, &blocking); + + let _ = foxtrot::block_on(executor.into_runner()).unwrap(); + + drop(token); + })); + } + + for _ in 0..blocking_threads { + let executor = executor.clone(); + let blocking = blocking.clone(); + + thread_handles.push(std::thread::spawn(move || { + let token = RuntimeState::install(&executor, &blocking); + + let _ = blocking.block_on(pending::<()>()); + + drop(token); + })); + } + + Runtime { + runtime_handle: RuntimeHandle { executor, blocking }, + thread_handles, + } + } +} + +impl Runtime { + pub fn new() -> Self { + RuntimeState::create() + } + + pub fn handle(&self) -> &RuntimeHandle { + &self.runtime_handle + } + + pub fn block_on( + &self, + future: impl Future + Send + 'static, + ) -> Result { + self.handle().block_on(future) + } + + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> jitterbug::JoinHandle { + self.handle().spawn(future) + } + + pub fn spawn_blocking( + &self, + callback: impl FnOnce() -> T + Send + 'static, + ) -> jitterbug::JoinHandle { + self.handle().spawn_blocking(callback) + } + + pub fn stop(&self) { + self.handle().stop(); + } +} + +impl Default for Runtime { + fn default() -> Self { + Self::new() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + RuntimeState::uninstall(); + + self.stop(); + + for handle in self.thread_handles.drain(..) { + handle.join().expect("Joined jive thread"); + } + } +} + +impl Drop for RuntimeToken { + fn drop(&mut self) { + RuntimeState::uninstall(); + } +} diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..ca7b9ef --- /dev/null +++ b/src/task.rs @@ -0,0 +1,14 @@ +use crate::runtime::RuntimeHandle; +use std::future::Future; + +pub use jitterbug::{JoinError, JoinHandle}; + +pub fn spawn(future: impl Future + Send + 'static) -> JoinHandle { + RuntimeHandle::current().spawn(future) +} + +pub fn spawn_blocking( + callback: impl FnOnce() -> T + Send + 'static, +) -> JoinHandle { + RuntimeHandle::current().spawn_blocking(callback) +}