240 lines
5.7 KiB
Rust
240 lines
5.7 KiB
Rust
use jitterbug::Executor;
|
|
use std::{
|
|
cell::RefCell,
|
|
future::{pending, Future},
|
|
};
|
|
|
|
thread_local! {
|
|
static RUNTIME: RefCell<Option<RuntimeState>> = RefCell::new(None);
|
|
}
|
|
|
|
struct RuntimeState {
|
|
executor: Executor,
|
|
blocking: Executor,
|
|
}
|
|
|
|
pub struct Runtime {
|
|
runtime_handle: RuntimeHandle,
|
|
thread_handles: Vec<std::thread::JoinHandle<()>>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct RuntimeHandle {
|
|
executor: Executor,
|
|
blocking: Executor,
|
|
}
|
|
|
|
pub struct RuntimeBuilder {
|
|
executor_count: usize,
|
|
blocking_count: usize,
|
|
}
|
|
|
|
struct RuntimeToken;
|
|
|
|
impl RuntimeBuilder {
|
|
pub fn new() -> Self {
|
|
let executor_count = std::thread::available_parallelism()
|
|
.map(usize::from)
|
|
.unwrap_or(1);
|
|
|
|
Self {
|
|
executor_count,
|
|
blocking_count: executor_count * 5,
|
|
}
|
|
}
|
|
|
|
pub fn executor_count(mut self, count: usize) -> Self {
|
|
self.executor_count = count;
|
|
self
|
|
}
|
|
|
|
pub fn blocking_count(mut self, count: usize) -> Self {
|
|
self.blocking_count = count;
|
|
self
|
|
}
|
|
|
|
pub fn build(self) -> Runtime {
|
|
RuntimeState::create(self)
|
|
}
|
|
}
|
|
|
|
impl RuntimeHandle {
|
|
pub fn current() -> Self {
|
|
RUNTIME.with(|runtime| {
|
|
let runtime = runtime.borrow();
|
|
let runtime = runtime
|
|
.as_ref()
|
|
.expect("Must be called from within a Jive context");
|
|
|
|
RuntimeHandle {
|
|
executor: runtime.executor.clone(),
|
|
blocking: runtime.blocking.clone(),
|
|
}
|
|
})
|
|
}
|
|
|
|
pub fn spawn<T: Send + 'static>(
|
|
&self,
|
|
future: impl Future<Output = T> + Send + 'static,
|
|
) -> jitterbug::JoinHandle<T> {
|
|
self.executor.spawn(future)
|
|
}
|
|
|
|
pub fn spawn_blocking<T: Send + 'static>(
|
|
&self,
|
|
callback: impl FnOnce() -> T + Send + 'static,
|
|
) -> jitterbug::JoinHandle<T> {
|
|
self.blocking.spawn(async move { (callback)() })
|
|
}
|
|
|
|
pub fn block_on<T: Send + 'static>(
|
|
&self,
|
|
future: impl Future<Output = T> + Send + 'static,
|
|
) -> Result<T, jitterbug::JoinError> {
|
|
let token = RuntimeState::install(&self.executor, &self.blocking);
|
|
|
|
let res = foxtrot::block_on(self.executor.run_with(future)).unwrap();
|
|
|
|
drop(token);
|
|
|
|
res
|
|
}
|
|
|
|
pub fn stop(&self) {
|
|
self.executor.stop();
|
|
self.blocking.stop();
|
|
}
|
|
}
|
|
|
|
impl RuntimeState {
|
|
fn install(executor: &Executor, blocking: &Executor) -> RuntimeToken {
|
|
RUNTIME.with(|runtime| {
|
|
let prev = runtime.borrow_mut().replace(RuntimeState {
|
|
executor: executor.clone(),
|
|
blocking: blocking.clone(),
|
|
});
|
|
if prev.is_some() {
|
|
panic!("Runtime already present on this thread");
|
|
}
|
|
});
|
|
|
|
RuntimeToken
|
|
}
|
|
|
|
fn uninstall() -> Option<RuntimeState> {
|
|
RUNTIME.with(|runtime| runtime.borrow_mut().take())
|
|
}
|
|
|
|
fn create(builder: RuntimeBuilder) -> Runtime {
|
|
let executor = Executor::new();
|
|
let blocking = Executor::new();
|
|
|
|
let mut thread_handles = Vec::new();
|
|
for _ in 0..builder.executor_count {
|
|
let executor = executor.clone();
|
|
let blocking = blocking.clone();
|
|
|
|
thread_handles.push(
|
|
std::thread::Builder::new()
|
|
.name("jive:executor".into())
|
|
.spawn(move || {
|
|
let token = RuntimeState::install(&executor, &blocking);
|
|
|
|
let _ = foxtrot::block_on(executor.into_runner()).unwrap();
|
|
|
|
drop(token);
|
|
})
|
|
.unwrap(),
|
|
);
|
|
}
|
|
|
|
for _ in 0..builder.blocking_count {
|
|
let executor = executor.clone();
|
|
let blocking = blocking.clone();
|
|
|
|
thread_handles.push(
|
|
std::thread::Builder::new()
|
|
.name("jive:blocking".into())
|
|
.spawn(move || {
|
|
let token = RuntimeState::install(&executor, &blocking);
|
|
|
|
let _ = blocking.block_on(pending::<()>());
|
|
|
|
drop(token);
|
|
})
|
|
.unwrap(),
|
|
);
|
|
}
|
|
|
|
Runtime {
|
|
runtime_handle: RuntimeHandle { executor, blocking },
|
|
thread_handles,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Runtime {
|
|
pub fn new() -> Self {
|
|
RuntimeState::create(RuntimeBuilder::new())
|
|
}
|
|
|
|
pub fn handle(&self) -> &RuntimeHandle {
|
|
&self.runtime_handle
|
|
}
|
|
|
|
pub fn block_on<T: Send + 'static>(
|
|
&self,
|
|
future: impl Future<Output = T> + Send + 'static,
|
|
) -> Result<T, jitterbug::JoinError> {
|
|
self.handle().block_on(future)
|
|
}
|
|
|
|
pub fn spawn<T: Send + 'static>(
|
|
&self,
|
|
future: impl Future<Output = T> + Send + 'static,
|
|
) -> jitterbug::JoinHandle<T> {
|
|
self.handle().spawn(future)
|
|
}
|
|
|
|
pub fn spawn_blocking<T: Send + 'static>(
|
|
&self,
|
|
callback: impl FnOnce() -> T + Send + 'static,
|
|
) -> jitterbug::JoinHandle<T> {
|
|
self.handle().spawn_blocking(callback)
|
|
}
|
|
|
|
pub fn stop(&self) {
|
|
self.handle().stop();
|
|
}
|
|
}
|
|
|
|
impl Default for RuntimeBuilder {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl Default for Runtime {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl Drop for Runtime {
|
|
fn drop(&mut self) {
|
|
RuntimeState::uninstall();
|
|
|
|
self.stop();
|
|
|
|
for handle in self.thread_handles.drain(..) {
|
|
handle.join().expect("Joined jive thread");
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for RuntimeToken {
|
|
fn drop(&mut self) {
|
|
RuntimeState::uninstall();
|
|
}
|
|
}
|