Add cooperative one-at-a-time mode

This commit is contained in:
Aode (lion) 2022-03-07 18:53:50 -06:00
parent b5b6c19679
commit 110a919385

View file

@ -297,6 +297,7 @@ pub struct Executor {
pub struct Runner {
inner: Arc<Inner>,
state: Arc<Mutex<ThreadState>>,
cooperative: bool,
// force Runner to be !Send
phantom: PhantomData<Rc<()>>,
@ -330,6 +331,9 @@ impl Runner {
while let Some(task) = self.pop_front_woken() {
any_polled = true;
task.poll(Arc::downgrade(&self.state), waker.clone());
if self.cooperative {
break;
}
}
any_polled
@ -490,6 +494,14 @@ impl Executor {
}
pub fn into_runner(self) -> Runner {
self._into_runner(false)
}
pub fn into_runner_cooperative(self) -> Runner {
self._into_runner(true)
}
fn _into_runner(self, cooperative: bool) -> Runner {
let state = ThreadState::new();
let id = state.handle.id();
@ -503,6 +515,7 @@ impl Executor {
Runner {
inner: self.inner,
state,
cooperative,
phantom: PhantomData,
}
}
@ -552,13 +565,19 @@ impl Executor {
let handle = self.spawn(f);
RunWith { handle, runner }.await
RunWith {
handle,
runner,
cooperative: false,
}
.await
}
}
struct RunWith<T> {
handle: JoinHandle<T>,
runner: Runner,
cooperative: bool,
}
impl Future for Runner {
@ -597,6 +616,11 @@ where
if let Poll::Ready(t) = Pin::new(&mut this.handle).poll(cx) {
return Poll::Ready(t);
}
if this.cooperative {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
if let Poll::Ready(t) = Pin::new(&mut this.handle).poll(cx) {