jive/src/runtime.rs

283 lines
7 KiB
Rust

use jitterbug::Executor;
use std::{
cell::RefCell,
future::{pending, Future},
};
thread_local! {
static RUNTIME: RefCell<Option<RuntimeState>> = RefCell::new(None);
}
struct RuntimeState {
executor: Executor,
blocking: Executor,
}
pub struct Runtime {
runtime_handle: RuntimeHandle,
thread_handles: Vec<std::thread::JoinHandle<()>>,
}
#[derive(Clone)]
pub struct RuntimeHandle {
executor: Executor,
blocking: Executor,
}
pub struct RuntimeBuilder {
executor_count: usize,
blocking_count: usize,
}
struct RuntimeToken;
impl RuntimeBuilder {
pub fn new() -> Self {
let executor_count = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1);
Self {
executor_count,
blocking_count: executor_count * 5,
}
}
pub fn executor_count(mut self, count: usize) -> Self {
self.executor_count = count;
self
}
pub fn blocking_count(mut self, count: usize) -> Self {
self.blocking_count = count;
self
}
pub fn build(self) -> Runtime {
RuntimeState::create(self)
}
}
impl RuntimeHandle {
pub fn current() -> Self {
RUNTIME.with(|runtime| {
let runtime = runtime.borrow();
let runtime = runtime
.as_ref()
.expect("Must be called from within a Jive context");
RuntimeHandle {
executor: runtime.executor.clone(),
blocking: runtime.blocking.clone(),
}
})
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.executor.spawn(future)
}
pub fn spawn_local_unsend<T>(
&self,
future: impl Future<Output = T> + 'static,
) -> bachata::JoinHandle<T> {
bachata::spawn(future)
}
pub fn spawn_local<T: Send + 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> jitterbug::JoinHandle<T> {
let (tx, rx) = jitterbug::oneshot();
let (cancel_tx, cancel_rx) = jitterbug::oneshot::<()>();
bachata::spawn(async move {
let future = std::pin::pin!(future);
match select::select(cancel_rx, future).await {
select::Either::Left(_) => (), // canceled
select::Either::Right(item) => {
let _ = tx.send(item);
}
}
});
self.spawn(async move {
let out = rx.await.unwrap();
drop(cancel_tx);
out
})
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.blocking.spawn(async move { (callback)() })
}
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
let token = RuntimeState::install(&self.executor, &self.blocking);
let res = foxtrot::block_on(bachata::run_with(future)).unwrap();
drop(token);
res
}
pub fn stop(&self) {
self.executor.stop();
self.blocking.stop();
}
}
impl RuntimeState {
fn install(executor: &Executor, blocking: &Executor) -> RuntimeToken {
RUNTIME.with(|runtime| {
let prev = runtime.borrow_mut().replace(RuntimeState {
executor: executor.clone(),
blocking: blocking.clone(),
});
if prev.is_some() {
panic!("Runtime already present on this thread");
}
});
RuntimeToken
}
fn uninstall() -> Option<RuntimeState> {
RUNTIME.with(|runtime| runtime.borrow_mut().take())
}
fn create(builder: RuntimeBuilder) -> Runtime {
let executor = Executor::new();
let blocking = Executor::new();
let mut thread_handles = Vec::new();
for _ in 0..builder.executor_count {
let executor = executor.clone();
let blocking = blocking.clone();
thread_handles.push(
std::thread::Builder::new()
.name("jive:executor".into())
.spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = foxtrot::block_on(select::select(
bachata::run_with_cooperative(pending::<()>()),
executor.into_runner_cooperative(),
))
.unwrap();
drop(token);
})
.unwrap(),
);
}
for _ in 0..builder.blocking_count {
let executor = executor.clone();
let blocking = blocking.clone();
thread_handles.push(
std::thread::Builder::new()
.name("jive:blocking".into())
.spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = blocking.block_on(pending::<()>());
drop(token);
})
.unwrap(),
);
}
Runtime {
runtime_handle: RuntimeHandle { executor, blocking },
thread_handles,
}
}
}
impl Runtime {
pub fn new() -> Self {
RuntimeState::create(RuntimeBuilder::new())
}
pub fn handle(&self) -> &RuntimeHandle {
&self.runtime_handle
}
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
self.handle().block_on(future)
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn(future)
}
pub fn spawn_local_unsend<T: 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> bachata::JoinHandle<T> {
self.handle().spawn_local_unsend(future)
}
pub fn spawn_local<T: Send + 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn_local(future)
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn_blocking(callback)
}
pub fn stop(&self) {
self.handle().stop();
}
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
RuntimeState::uninstall();
self.stop();
for handle in self.thread_handles.drain(..) {
handle.join().expect("Joined jive thread");
}
}
}
impl Drop for RuntimeToken {
fn drop(&mut self) {
RuntimeState::uninstall();
}
}