From 0bc01de384e2954f0de9a63fb16c19ed50dba1b6 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Thu, 10 Feb 2022 22:31:44 -0600 Subject: [PATCH] Try waking after spawn --- examples/io.rs | 2 -- src/lib.rs | 41 ++++++++++++----------------------------- 2 files changed, 12 insertions(+), 31 deletions(-) diff --git a/examples/io.rs b/examples/io.rs index 5f790f6..0ea3b2f 100644 --- a/examples/io.rs +++ b/examples/io.rs @@ -7,14 +7,12 @@ use std::time::Duration; fn main() -> Result<(), Box> { let executor = Executor::new(); - /* for _ in 0..4 { let execu2r = executor.clone(); std::thread::spawn(move || { async_io::block_on(execu2r.into_runner()); }); } - */ let runner = executor.clone().into_runner(); diff --git a/src/lib.rs b/src/lib.rs index 9211129..5d15afe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,8 +87,6 @@ impl Task { } } Err(_) => { - println!("Panic in poll"); - if let Some(executor) = Weak::upgrade(&executor) { let mut inner = executor.lock().unwrap(); // This will remove the last remaining Arc for this task, dropping after poll completes @@ -120,8 +118,6 @@ impl Wake for SafeWaker { fn wake_by_ref(self: &Arc) { if let Some(task) = Weak::upgrade(&self.task) { - println!("Waking {}", task.task_id); - if !task.woken.swap(true, Ordering::AcqRel) { if let Some(executor) = Weak::upgrade(&self.executor) { let mut inner = executor.lock().unwrap(); @@ -171,6 +167,7 @@ struct ThreadState { prune_time: Instant, unpark_time: Instant, last_steal: Option, + waker: Option, } impl ThreadState { @@ -184,6 +181,7 @@ impl ThreadState { prune_time: Instant::now(), unpark_time: Instant::now(), last_steal: None, + waker: None, } } @@ -294,10 +292,8 @@ impl Drop for Runner { } impl Runner { - pub fn tick(&self, waker: &Waker) -> bool { - println!("Ticking"); + pub fn tick(&self, waker: Waker) -> bool { if self.stopping() { - println!("Stopping"); return false; } @@ -308,17 +304,15 @@ impl Runner { }; if woken.is_empty() { - println!("Nothing to do"); return false; } - println!("poll ing"); - for task in woken { task.poll(Arc::downgrade(&self.state), waker.clone()); } - println!("Polled"); + self.state.lock().unwrap().waker = Some(waker); + true } @@ -334,14 +328,11 @@ impl Runner { .unwrap() .push_back(std::thread::current()); - println!("I'm parking! {:?}", std::thread::current().id()); std::thread::park(); - println!("I'm woken! {:?}", std::thread::current().id()); } } fn steal(&self) -> bool { - println!("Steal ing"); let read_guard = self.inner.threads.threads.read().unwrap(); if read_guard.len() == 1 { return false; @@ -366,7 +357,6 @@ impl Runner { if split_point > 0 { guard.woken.extend(state.woken.drain(split_point..)); guard.last_steal = Some(*tid); - println!("Stolen haha"); return true; } } @@ -381,14 +371,12 @@ impl Runner { if split_point > 0 { guard.woken.extend(state.woken.drain(split_point..)); guard.last_steal = Some(*tid); - println!("Stolen haha"); return true; } } } } - println!("No steal"); false } @@ -408,7 +396,6 @@ impl Runner { &self, future: impl Future + Send + 'static, ) -> JoinHandle { - println!("Spawning"); let mut guard = self.state.lock().unwrap(); let task = guard @@ -431,7 +418,6 @@ impl Runner { guard.update_unpark(); } - println!("Spawned"); JoinHandle { rx } } @@ -439,7 +425,6 @@ impl Runner { &self, f: impl Future + Send + 'static, ) -> Result { - println!("Blocking on"); let mut join_handle = self.spawn(f); let block_on_waker = Arc::new(BlockOnWaker { @@ -450,16 +435,13 @@ impl Runner { let mut block_on_context = Context::from_waker(&block_on_waker); loop { - println!("Loop start"); - while self.tick(&block_on_waker) { - println!("Ticked yes"); + while self.tick(block_on_waker.clone()) { if let Poll::Ready(res) = Pin::new(&mut join_handle).poll(&mut block_on_context) { return res; } self.heuristic_prune(); } - println!("Ticked no"); if let Poll::Ready(res) = Pin::new(&mut join_handle).poll(&mut block_on_context) { return res; @@ -527,10 +509,11 @@ impl Executor { % read_guard.len().max(1); let state = read_guard.values().nth(last_spawn).unwrap(); - let res = { - let mut guard = state.lock().unwrap(); - guard.spawn(&self.inner, future) - }; + let mut guard = state.lock().unwrap(); + let res = guard.spawn(&self.inner, future); + if let Some(waker) = guard.waker.as_ref() { + waker.wake_by_ref(); + } res } @@ -549,7 +532,7 @@ impl Future for Runner { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut(); - while this.tick(cx.waker()) { + while this.tick(cx.waker().clone()) { // processing spawned tasks }