Try waking after spawn
This commit is contained in:
parent
c2600abd06
commit
0bc01de384
2 changed files with 12 additions and 31 deletions
|
@ -7,14 +7,12 @@ use std::time::Duration;
|
|||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let executor = Executor::new();
|
||||
|
||||
/*
|
||||
for _ in 0..4 {
|
||||
let execu2r = executor.clone();
|
||||
std::thread::spawn(move || {
|
||||
async_io::block_on(execu2r.into_runner());
|
||||
});
|
||||
}
|
||||
*/
|
||||
|
||||
let runner = executor.clone().into_runner();
|
||||
|
||||
|
|
41
src/lib.rs
41
src/lib.rs
|
@ -87,8 +87,6 @@ impl Task {
|
|||
}
|
||||
}
|
||||
Err(_) => {
|
||||
println!("Panic in poll");
|
||||
|
||||
if let Some(executor) = Weak::upgrade(&executor) {
|
||||
let mut inner = executor.lock().unwrap();
|
||||
// This will remove the last remaining Arc<Task> for this task, dropping after poll completes
|
||||
|
@ -120,8 +118,6 @@ impl Wake for SafeWaker {
|
|||
|
||||
fn wake_by_ref(self: &Arc<Self>) {
|
||||
if let Some(task) = Weak::upgrade(&self.task) {
|
||||
println!("Waking {}", task.task_id);
|
||||
|
||||
if !task.woken.swap(true, Ordering::AcqRel) {
|
||||
if let Some(executor) = Weak::upgrade(&self.executor) {
|
||||
let mut inner = executor.lock().unwrap();
|
||||
|
@ -171,6 +167,7 @@ struct ThreadState {
|
|||
prune_time: Instant,
|
||||
unpark_time: Instant,
|
||||
last_steal: Option<ThreadId>,
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
impl ThreadState {
|
||||
|
@ -184,6 +181,7 @@ impl ThreadState {
|
|||
prune_time: Instant::now(),
|
||||
unpark_time: Instant::now(),
|
||||
last_steal: None,
|
||||
waker: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -294,10 +292,8 @@ impl Drop for Runner {
|
|||
}
|
||||
|
||||
impl Runner {
|
||||
pub fn tick(&self, waker: &Waker) -> bool {
|
||||
println!("Ticking");
|
||||
pub fn tick(&self, waker: Waker) -> bool {
|
||||
if self.stopping() {
|
||||
println!("Stopping");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -308,17 +304,15 @@ impl Runner {
|
|||
};
|
||||
|
||||
if woken.is_empty() {
|
||||
println!("Nothing to do");
|
||||
return false;
|
||||
}
|
||||
|
||||
println!("poll ing");
|
||||
|
||||
for task in woken {
|
||||
task.poll(Arc::downgrade(&self.state), waker.clone());
|
||||
}
|
||||
|
||||
println!("Polled");
|
||||
self.state.lock().unwrap().waker = Some(waker);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
|
@ -334,14 +328,11 @@ impl Runner {
|
|||
.unwrap()
|
||||
.push_back(std::thread::current());
|
||||
|
||||
println!("I'm parking! {:?}", std::thread::current().id());
|
||||
std::thread::park();
|
||||
println!("I'm woken! {:?}", std::thread::current().id());
|
||||
}
|
||||
}
|
||||
|
||||
fn steal(&self) -> bool {
|
||||
println!("Steal ing");
|
||||
let read_guard = self.inner.threads.threads.read().unwrap();
|
||||
if read_guard.len() == 1 {
|
||||
return false;
|
||||
|
@ -366,7 +357,6 @@ impl Runner {
|
|||
if split_point > 0 {
|
||||
guard.woken.extend(state.woken.drain(split_point..));
|
||||
guard.last_steal = Some(*tid);
|
||||
println!("Stolen haha");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -381,14 +371,12 @@ impl Runner {
|
|||
if split_point > 0 {
|
||||
guard.woken.extend(state.woken.drain(split_point..));
|
||||
guard.last_steal = Some(*tid);
|
||||
println!("Stolen haha");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("No steal");
|
||||
false
|
||||
}
|
||||
|
||||
|
@ -408,7 +396,6 @@ impl Runner {
|
|||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
println!("Spawning");
|
||||
let mut guard = self.state.lock().unwrap();
|
||||
|
||||
let task = guard
|
||||
|
@ -431,7 +418,6 @@ impl Runner {
|
|||
guard.update_unpark();
|
||||
}
|
||||
|
||||
println!("Spawned");
|
||||
JoinHandle { rx }
|
||||
}
|
||||
|
||||
|
@ -439,7 +425,6 @@ impl Runner {
|
|||
&self,
|
||||
f: impl Future<Output = T> + Send + 'static,
|
||||
) -> Result<T, JoinError> {
|
||||
println!("Blocking on");
|
||||
let mut join_handle = self.spawn(f);
|
||||
|
||||
let block_on_waker = Arc::new(BlockOnWaker {
|
||||
|
@ -450,16 +435,13 @@ impl Runner {
|
|||
let mut block_on_context = Context::from_waker(&block_on_waker);
|
||||
|
||||
loop {
|
||||
println!("Loop start");
|
||||
while self.tick(&block_on_waker) {
|
||||
println!("Ticked yes");
|
||||
while self.tick(block_on_waker.clone()) {
|
||||
if let Poll::Ready(res) = Pin::new(&mut join_handle).poll(&mut block_on_context) {
|
||||
return res;
|
||||
}
|
||||
|
||||
self.heuristic_prune();
|
||||
}
|
||||
println!("Ticked no");
|
||||
|
||||
if let Poll::Ready(res) = Pin::new(&mut join_handle).poll(&mut block_on_context) {
|
||||
return res;
|
||||
|
@ -527,10 +509,11 @@ impl Executor {
|
|||
% read_guard.len().max(1);
|
||||
|
||||
let state = read_guard.values().nth(last_spawn).unwrap();
|
||||
let res = {
|
||||
let mut guard = state.lock().unwrap();
|
||||
guard.spawn(&self.inner, future)
|
||||
};
|
||||
let mut guard = state.lock().unwrap();
|
||||
let res = guard.spawn(&self.inner, future);
|
||||
if let Some(waker) = guard.waker.as_ref() {
|
||||
waker.wake_by_ref();
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
@ -549,7 +532,7 @@ impl Future for Runner {
|
|||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut();
|
||||
|
||||
while this.tick(cx.waker()) {
|
||||
while this.tick(cx.waker().clone()) {
|
||||
// processing spawned tasks
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue