Select over local & work-stealing executors

This commit is contained in:
Aode (lion) 2022-03-07 20:19:11 -06:00
parent 98ab825479
commit 6f3288e357
7 changed files with 79 additions and 13 deletions

View file

@ -11,8 +11,10 @@ futures-io-compat = ["foxtrot/futures-compat"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bachata = { git = "https://git.asonix.dog/safe-async/bachata" }
foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" }
jitterbug = { git = "https://git.asonix.dog/safe-async/jitterbug" }
select = { git = "https://git.asonix.dog/safe-async/select" }
[dev-dependencies]
read-write-buf = { git = "https://git.asonix.dog/safe-async/read-write-buf" }

View file

@ -1,6 +1,6 @@
use std::time::Duration;
fn main() -> Result<(), jive::task::JoinError> {
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
jive::block_on(async move {
println!("hewwo");

View file

@ -5,7 +5,7 @@ use jive::{
use read_write_buf::ReadWriteBuf;
use std::time::Duration;
fn main() -> Result<(), jive::task::JoinError> {
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
jive::block_on(async move {
jive::spawn(async move {
let mut listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)).await {

View file

@ -1,20 +1,32 @@
use std::time::Duration;
fn main() -> Result<(), jive::task::JoinError> {
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
jive::block_on(async {
println!("hewwo");
let handles = (1..=50)
let handles = (0..10)
.map(|i| {
jive::spawn(async move {
jive::time::sleep(Duration::from_secs(2)).await;
println!("{} slept", i);
let handles = (1..=10)
.map(|j| {
jive::spawn_local(async move {
jive::time::sleep(Duration::from_secs(2)).await;
println!("{} slept", i * 10 + j);
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.await?;
}
Ok(()) as Result<_, jive::task::sync::JoinError>
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.await?;
handle.await??;
}
Ok(())

View file

@ -7,7 +7,7 @@ pub mod sync {
pub mod runtime;
pub mod task;
pub use task::{spawn, spawn_blocking};
pub use task::{spawn, spawn_blocking, spawn_local, spawn_local_unsend};
pub fn block_on<T>(future: impl std::future::Future<Output = T>) -> T {
runtime::Runtime::new().block_on(future)

View file

@ -80,6 +80,22 @@ impl RuntimeHandle {
self.executor.spawn(future)
}
pub fn spawn_local_unsend<T>(
&self,
future: impl Future<Output = T> + 'static,
) -> bachata::JoinHandle<T> {
bachata::spawn(future)
}
pub fn spawn_local<T: Send + 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> jitterbug::JoinHandle<T> {
let (tx, rx) = jitterbug::oneshot();
bachata::spawn(async move { tx.send(future.await) });
self.spawn(async move { rx.await.unwrap() })
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,
@ -90,7 +106,7 @@ impl RuntimeHandle {
pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
let token = RuntimeState::install(&self.executor, &self.blocking);
let res = foxtrot::block_on(future).unwrap();
let res = foxtrot::block_on(bachata::run_with(future)).unwrap();
drop(token);
@ -137,7 +153,11 @@ impl RuntimeState {
.spawn(move || {
let token = RuntimeState::install(&executor, &blocking);
let _ = foxtrot::block_on(executor.into_runner()).unwrap();
let _ = foxtrot::block_on(select::select(
bachata::run_with_cooperative(pending::<()>()),
executor.into_runner_cooperative(),
))
.unwrap();
drop(token);
})
@ -190,6 +210,20 @@ impl Runtime {
self.handle().spawn(future)
}
pub fn spawn_local_unsend<T: 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> bachata::JoinHandle<T> {
self.handle().spawn_local_unsend(future)
}
pub fn spawn_local<T: Send + 'static>(
&self,
future: impl Future<Output = T> + 'static,
) -> jitterbug::JoinHandle<T> {
self.handle().spawn_local(future)
}
pub fn spawn_blocking<T: Send + 'static>(
&self,
callback: impl FnOnce() -> T + Send + 'static,

View file

@ -1,14 +1,32 @@
use crate::runtime::RuntimeHandle;
use std::future::Future;
pub use jitterbug::{JoinError, JoinHandle};
pub mod sync {
pub use jitterbug::{JoinError, JoinHandle};
}
pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
pub mod unsync {
pub use bachata::{JoinError, JoinHandle};
}
pub fn spawn<T: Send + 'static>(
future: impl Future<Output = T> + Send + 'static,
) -> sync::JoinHandle<T> {
RuntimeHandle::current().spawn(future)
}
pub fn spawn_local_unsend<T>(future: impl Future<Output = T> + 'static) -> unsync::JoinHandle<T> {
RuntimeHandle::current().spawn_local_unsend(future)
}
pub fn spawn_local<T: Send + 'static>(
future: impl Future<Output = T> + 'static,
) -> sync::JoinHandle<T> {
RuntimeHandle::current().spawn_local(future)
}
pub fn spawn_blocking<T: Send + 'static>(
callback: impl FnOnce() -> T + Send + 'static,
) -> JoinHandle<T> {
) -> sync::JoinHandle<T> {
RuntimeHandle::current().spawn_blocking(callback)
}