Revert "Have waker wake the current thread"

Waking the current thread does not alter previous behavior

This reverts commit 727975f2ab.
This commit is contained in:
Aode (lion) 2022-02-15 13:18:22 -06:00
parent 727975f2ab
commit 43a1c133cb

View file

@ -1,11 +1,11 @@
use polldance::{Key, NotifyToken, PollManager, Readiness};
use polldance::{Key, PollManager, Readiness};
use rustix::fd::AsFd;
use std::{
collections::{BTreeMap, HashMap},
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, MutexGuard, RwLock,
Arc, Mutex, MutexGuard,
},
task::{Context, Poll, Wake, Waker},
time::{Duration, Instant},
@ -13,11 +13,6 @@ use std::{
thread_local! {
static REACTOR: Mutex<Option<Arc<Mutex<ReactorState>>>> = Mutex::new(None);
static NOTIFIER: RwLock<Option<NotifyToken>> = RwLock::new(None);
}
pub fn notifier() -> Option<NotifyToken> {
NOTIFIER.with(|notifier| notifier.read().unwrap().as_ref().cloned())
}
#[derive(Debug)]
@ -46,7 +41,9 @@ struct ReactorState {
timers: BTreeMap<Instant, Vec<(Arc<AtomicBool>, Waker)>>,
}
struct IoWaker;
struct IoWaker {
woken: AtomicBool,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@ -123,22 +120,16 @@ impl Drop for Reactor {
REACTOR.with(|reactor| {
reactor.lock().unwrap().take();
});
NOTIFIER.with(|notifier| {
notifier.write().unwrap().take();
})
}
}
impl Wake for IoWaker {
fn wake(self: Arc<Self>) {
self.wake_by_ref()
self.woken.store(true, Ordering::Release);
}
fn wake_by_ref(self: &Arc<Self>) {
if let Some(notifier) = notifier() {
let _ = notifier.notify();
}
self.woken.store(true, Ordering::Release);
}
}
@ -152,12 +143,17 @@ impl Reactor {
pub fn block_on<F: Future>(&self, future: F) -> Result<F::Output, Error> {
let mut pinned = Box::pin(future);
let waker = Arc::new(IoWaker).into();
let io_waker = Arc::new(IoWaker {
woken: AtomicBool::new(true),
});
let waker = Arc::clone(&io_waker).into();
let mut context = Context::from_waker(&waker);
loop {
if let Poll::Ready(output) = pinned.as_mut().poll(&mut context) {
return Ok(output);
if io_waker.woken.swap(false, Ordering::AcqRel) {
if let Poll::Ready(output) = pinned.as_mut().poll(&mut context) {
return Ok(output);
}
}
self.state.lock().unwrap().poll()?;
@ -173,16 +169,15 @@ impl Drop for RemoveFlag {
impl ReactorState {
fn init() -> Result<Arc<Mutex<Self>>, Error> {
let this = REACTOR.with(|reactor| {
REACTOR.with(|reactor| {
let mut guard = reactor.lock().unwrap();
if guard.is_some() {
Err(Error::AlreadyInitialized)
} else {
let poller = PollManager::new()?;
let this = Arc::new(Mutex::new(Self {
now: Instant::now(),
poller,
poller: PollManager::new()?,
io: HashMap::new(),
timers: BTreeMap::new(),
}));
@ -191,15 +186,7 @@ impl ReactorState {
Ok(this)
}
})?;
let notif = this.lock().unwrap().poller.notifier();
NOTIFIER.with(|notifier| {
*notifier.write().unwrap() = Some(notif);
});
Ok(this)
})
}
fn register<A: AsFd + 'static>(&mut self, io: Arc<A>, waker: Waker, interests: Readiness) {