Compare commits
12 commits
a1464e2f6f
...
72809cf0da
Author | SHA1 | Date | |
---|---|---|---|
Aode (lion) | 72809cf0da | ||
Aode (lion) | 21e79db6d6 | ||
Aode (lion) | 2f032f9e13 | ||
Aode (Lion) | b1d60cc8cd | ||
Aode (Lion) | 72c50bcaf6 | ||
Aode (Lion) | 4c3edb92cc | ||
Aode (Lion) | 5ba4dab3cf | ||
Aode (Lion) | 9d04b5e700 | ||
Aode (Lion) | fb209c7ff9 | ||
Aode (Lion) | 5b745553b0 | ||
Aode (Lion) | 0bc01de384 | ||
Aode (Lion) | c2600abd06 |
|
@ -1,5 +1,5 @@
|
|||
[package]
|
||||
name = "safe-executor"
|
||||
name = "jitterbug"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
|
@ -8,5 +8,6 @@ edition = "2021"
|
|||
[dependencies]
|
||||
|
||||
[dev-dependencies]
|
||||
# foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" }
|
||||
async-io = "1.6.0"
|
||||
futures-lite = "1.12.0"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use safe_executor::{oneshot, Executor, JoinError};
|
||||
use jitterbug::{oneshot, Executor, JoinError};
|
||||
use std::time::Duration;
|
||||
|
||||
fn main() -> Result<(), JoinError> {
|
||||
|
|
|
@ -1,24 +1,25 @@
|
|||
use std::sync::Arc;
|
||||
use std::task::{Wake, Waker};
|
||||
use jitterbug::Executor;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
task::{Wake, Waker},
|
||||
};
|
||||
|
||||
use safe_executor::Executor;
|
||||
|
||||
fn spawn(runtime: &Executor) {
|
||||
fn spawn(executor: &Executor) {
|
||||
println!("Spawning futures");
|
||||
|
||||
let task1 = runtime.spawn(async move {
|
||||
let task1 = executor.spawn(async move {
|
||||
println!("Henlo from first spawn");
|
||||
"A"
|
||||
});
|
||||
|
||||
let task2 = runtime.spawn(async move {
|
||||
let task2 = executor.spawn(async move {
|
||||
println!("Henlo from second spawn");
|
||||
"B"
|
||||
});
|
||||
|
||||
let run2m = runtime.clone();
|
||||
let run2m = executor.clone();
|
||||
|
||||
runtime.spawn(async move {
|
||||
executor.spawn(async move {
|
||||
let res1 = task1.await;
|
||||
let res2 = task2.await;
|
||||
println!("Henlo from third spawn, {:?}, {:?}", res1, res2);
|
||||
|
@ -41,43 +42,44 @@ impl Wake for DummyWaker {
|
|||
}
|
||||
|
||||
fn main() {
|
||||
let runtime = Executor::new();
|
||||
let executor = Executor::new();
|
||||
let runner = executor.clone().into_runner();
|
||||
|
||||
// This creates 3 new tasks
|
||||
spawn(&runtime);
|
||||
spawn(&executor);
|
||||
|
||||
let waker: Waker = Arc::new(DummyWaker).into();
|
||||
|
||||
while runtime.any_woken() {
|
||||
while runner.any_woken() {
|
||||
println!("Ticking");
|
||||
runtime.tick(&waker);
|
||||
runner.tick(waker.clone());
|
||||
}
|
||||
|
||||
// This reclaims the first 3 tasks
|
||||
println!("Pruning");
|
||||
runtime.prune();
|
||||
runner.prune();
|
||||
|
||||
// This creates 3 new tasks
|
||||
spawn(&runtime);
|
||||
while runtime.any_woken() {
|
||||
spawn(&executor);
|
||||
while runner.any_woken() {
|
||||
println!("Ticking");
|
||||
runtime.tick(&waker);
|
||||
runner.tick(waker.clone());
|
||||
}
|
||||
|
||||
// This re-uses the 3 tasks created prior
|
||||
spawn(&runtime);
|
||||
spawn(&executor);
|
||||
|
||||
// This doesn't reclaim any tasks, since we've spawned 3 more futures
|
||||
println!("Pruning");
|
||||
runtime.prune();
|
||||
runner.prune();
|
||||
|
||||
while runtime.any_woken() {
|
||||
while runner.any_woken() {
|
||||
println!("Ticking");
|
||||
runtime.tick(&waker);
|
||||
runner.tick(waker.clone());
|
||||
|
||||
// This reclaims tasks as their futures resolve on each tick
|
||||
println!("Pruning");
|
||||
runtime.prune();
|
||||
runner.prune();
|
||||
}
|
||||
|
||||
println!("Hewwo Mr Obama");
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
use async_io::{Async, Timer};
|
||||
use futures_lite::{AsyncReadExt, AsyncWriteExt};
|
||||
use safe_executor::Executor;
|
||||
use std::net::TcpListener;
|
||||
use std::time::Duration;
|
||||
use jitterbug::Executor;
|
||||
use std::{net::TcpListener, time::Duration};
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let executor = Executor::new();
|
||||
|
@ -10,10 +9,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
for _ in 0..4 {
|
||||
let execu2r = executor.clone();
|
||||
std::thread::spawn(move || {
|
||||
async_io::block_on(execu2r);
|
||||
async_io::block_on(execu2r.into_runner());
|
||||
});
|
||||
}
|
||||
|
||||
let runner = executor.clone().into_runner();
|
||||
|
||||
let execu2r = executor.clone();
|
||||
executor.spawn(async move {
|
||||
let listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)) {
|
||||
|
@ -37,9 +38,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
break;
|
||||
}
|
||||
|
||||
println!("{:?}", &buffer[0..n]);
|
||||
if let Ok(s) = std::str::from_utf8(&buffer[0..n]) {
|
||||
let mut string = s.trim().chars().rev().collect::<String>();
|
||||
println!("{:?}: {}", std::thread::current().id(), string);
|
||||
string.push('\n');
|
||||
|
||||
if stream.write_all(&buffer[0..n]).await.is_err() {
|
||||
if stream.write_all(string.as_bytes()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
} else if stream.write_all(&buffer[0..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -53,6 +60,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
execu2r.stop();
|
||||
});
|
||||
|
||||
async_io::block_on(executor);
|
||||
async_io::block_on(runner);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use safe_executor::{oneshot, Executor, JoinError};
|
||||
use jitterbug::{oneshot, Executor, JoinError};
|
||||
|
||||
fn main() -> Result<(), JoinError> {
|
||||
let executor = Executor::new();
|
||||
|
|
421
src/lib.rs
421
src/lib.rs
|
@ -1,10 +1,12 @@
|
|||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Mutex, Weak,
|
||||
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||
Arc, Mutex, RwLock, Weak,
|
||||
},
|
||||
task::{Context, Poll, Wake, Waker},
|
||||
thread::{Thread, ThreadId},
|
||||
|
@ -16,6 +18,7 @@ pub use oneshot::{oneshot, Dropped, Receiver, Sender};
|
|||
|
||||
const PRUNE_SPAWN_COUNT: u64 = 1000;
|
||||
const PRUNE_DURATION: Duration = Duration::from_secs(5);
|
||||
const UNPARK_DURATION: Duration = Duration::from_millis(50);
|
||||
|
||||
pub struct JoinHandle<T> {
|
||||
rx: Receiver<T>,
|
||||
|
@ -51,7 +54,7 @@ struct Task {
|
|||
}
|
||||
|
||||
impl Task {
|
||||
fn poll(self: Arc<Self>, executor: Weak<Mutex<Inner>>, waker: Waker) {
|
||||
fn poll(self: Arc<Self>, executor: Weak<Mutex<ThreadState>>, waker: Waker) {
|
||||
self.woken.store(false, Ordering::Release);
|
||||
|
||||
let res = std::panic::catch_unwind(|| {
|
||||
|
@ -84,8 +87,6 @@ impl Task {
|
|||
}
|
||||
}
|
||||
Err(_) => {
|
||||
println!("Panic in poll");
|
||||
|
||||
if let Some(executor) = Weak::upgrade(&executor) {
|
||||
let mut inner = executor.lock().unwrap();
|
||||
// This will remove the last remaining Arc<Task> for this task, dropping after poll completes
|
||||
|
@ -106,7 +107,7 @@ impl Task {
|
|||
|
||||
struct SafeWaker {
|
||||
task: Weak<Task>,
|
||||
executor: Weak<Mutex<Inner>>,
|
||||
executor: Weak<Mutex<ThreadState>>,
|
||||
inner: Waker,
|
||||
}
|
||||
|
||||
|
@ -117,9 +118,12 @@ impl Wake for SafeWaker {
|
|||
|
||||
fn wake_by_ref(self: &Arc<Self>) {
|
||||
if let Some(task) = Weak::upgrade(&self.task) {
|
||||
if !task.woken.swap(true, Ordering::AcqRel) {
|
||||
if let Some(executor) = Weak::upgrade(&self.executor) {
|
||||
let mut inner = executor.lock().unwrap();
|
||||
if let Some(executor) = Weak::upgrade(&self.executor) {
|
||||
let mut inner = executor.lock().unwrap();
|
||||
|
||||
if inner.pending.contains_key(&task.task_id)
|
||||
&& !task.woken.swap(true, Ordering::AcqRel)
|
||||
{
|
||||
inner.wake(task);
|
||||
}
|
||||
}
|
||||
|
@ -143,26 +147,65 @@ impl Wake for BlockOnWaker {
|
|||
}
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
task_id_counter: u64,
|
||||
struct Threads {
|
||||
threads: RwLock<HashMap<ThreadId, Arc<Mutex<ThreadState>>>>,
|
||||
last_spawn: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Threads {
|
||||
fn new() -> Self {
|
||||
Threads {
|
||||
threads: RwLock::new(HashMap::new()),
|
||||
last_spawn: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ThreadState {
|
||||
available: VecDeque<Arc<Task>>,
|
||||
woken: VecDeque<Arc<Task>>,
|
||||
pending: HashMap<u64, Arc<Task>>,
|
||||
stopping: bool,
|
||||
threads: HashMap<ThreadId, Thread>,
|
||||
handle: Thread,
|
||||
spawn_count: u64,
|
||||
prune_time: Instant,
|
||||
unpark_time: Instant,
|
||||
last_steal: Option<ThreadId>,
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn pop_available_head(&mut self) -> Option<Arc<Task>> {
|
||||
self.available.pop_front()
|
||||
impl ThreadState {
|
||||
fn new() -> Self {
|
||||
ThreadState {
|
||||
available: VecDeque::new(),
|
||||
woken: VecDeque::new(),
|
||||
pending: HashMap::new(),
|
||||
handle: std::thread::current(),
|
||||
spawn_count: 0,
|
||||
prune_time: Instant::now(),
|
||||
unpark_time: Instant::now(),
|
||||
last_steal: None,
|
||||
waker: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn next_task_id(&mut self) -> u64 {
|
||||
let id = self.task_id_counter;
|
||||
self.task_id_counter += 1;
|
||||
id
|
||||
fn steal_from(&mut self) -> Option<Vec<Arc<Task>>> {
|
||||
let split_point = (self.woken.len() - (self.woken.len() % 2)) / 2;
|
||||
|
||||
if split_point > 0 {
|
||||
let v = self.woken.drain(split_point..).collect::<Vec<_>>();
|
||||
|
||||
for task in &v {
|
||||
self.pending.remove(&task.task_id);
|
||||
}
|
||||
|
||||
return Some(v);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn pop_available_head(&mut self) -> Option<Arc<Task>> {
|
||||
self.available.pop_front()
|
||||
}
|
||||
|
||||
fn heuristic_prune(&mut self) {
|
||||
|
@ -172,6 +215,14 @@ impl Inner {
|
|||
}
|
||||
}
|
||||
|
||||
fn should_heuristic_unpark(&mut self) -> bool {
|
||||
self.spawn_count % 10 == 0 || self.unpark_time + UNPARK_DURATION < Instant::now()
|
||||
}
|
||||
|
||||
fn update_unpark(&mut self) {
|
||||
self.unpark_time = Instant::now();
|
||||
}
|
||||
|
||||
fn prune(&mut self) {
|
||||
self.available = VecDeque::new();
|
||||
self.prune_time = Instant::now();
|
||||
|
@ -181,31 +232,16 @@ impl Inner {
|
|||
fn wake(&mut self, task: Arc<Task>) {
|
||||
task.woken.store(true, Ordering::Release);
|
||||
self.woken.push_back(task);
|
||||
|
||||
if let Some(tid) = self.threads.keys().next() {
|
||||
let tid = *tid;
|
||||
|
||||
// If we're waking a future, ensure at least one thread is awake to process it
|
||||
if let Some(thread) = self.threads.remove(&tid) {
|
||||
thread.unpark();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&mut self) {
|
||||
self.stopping = true;
|
||||
for (_, thread) in self.threads.drain() {
|
||||
thread.unpark();
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn<T: Send + 'static>(
|
||||
&mut self,
|
||||
shared: &Arc<Inner>,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
let task = self
|
||||
.pop_available_head()
|
||||
.unwrap_or_else(|| Task::allocate(self.next_task_id()));
|
||||
.unwrap_or_else(|| Task::allocate(shared.next_task_id()));
|
||||
|
||||
self.pending.insert(task.task_id, Arc::clone(&task));
|
||||
|
||||
|
@ -216,96 +252,191 @@ impl Inner {
|
|||
self.wake(task);
|
||||
self.spawn_count += 1;
|
||||
|
||||
if self.should_heuristic_unpark() {
|
||||
if let Some(thread) = shared.parked.lock().unwrap().pop_front() {
|
||||
thread.unpark();
|
||||
}
|
||||
self.update_unpark();
|
||||
}
|
||||
|
||||
JoinHandle { rx }
|
||||
}
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
task_id_counter: AtomicU64,
|
||||
stopping: AtomicBool,
|
||||
threads: Threads,
|
||||
parked: Mutex<VecDeque<Thread>>,
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn next_task_id(&self) -> u64 {
|
||||
self.task_id_counter.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
self.stopping.store(true, Ordering::Release);
|
||||
let read_guard = self.threads.threads.read().unwrap();
|
||||
|
||||
for state in read_guard.values() {
|
||||
if let Some(waker) = state.lock().unwrap().waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Executor {
|
||||
inner: Arc<Mutex<Inner>>,
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
impl Default for Executor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
// SHOULD NOT IMPLEMENT CLONE !!!!!!!
|
||||
// SHOULD NOT IMPLEMENT SEND !!!!!!!
|
||||
pub struct Runner {
|
||||
inner: Arc<Inner>,
|
||||
state: Arc<Mutex<ThreadState>>,
|
||||
|
||||
// force Runner to be !Send
|
||||
phantom: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
pub fn new() -> Self {
|
||||
Executor {
|
||||
inner: Arc::new(Mutex::new(Inner {
|
||||
task_id_counter: 0,
|
||||
available: VecDeque::new(),
|
||||
woken: VecDeque::new(),
|
||||
pending: HashMap::new(),
|
||||
stopping: false,
|
||||
threads: HashMap::new(),
|
||||
spawn_count: 0,
|
||||
prune_time: Instant::now(),
|
||||
})),
|
||||
impl Drop for Runner {
|
||||
fn drop(&mut self) {
|
||||
let mut write_guard = self.inner.threads.threads.write().unwrap();
|
||||
let mut guard = self.state.lock().unwrap();
|
||||
|
||||
write_guard.remove(&guard.handle.id());
|
||||
|
||||
if let Some(state) = write_guard.values().next() {
|
||||
let mut state = state.lock().unwrap();
|
||||
state.woken.extend(guard.woken.drain(..));
|
||||
state.pending.extend(guard.pending.drain());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
self.inner.lock().unwrap().spawn(future)
|
||||
}
|
||||
|
||||
pub fn tick(&self, waker: &Waker) -> bool {
|
||||
let woken = {
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
|
||||
if guard.stopping {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::mem::take(&mut guard.woken)
|
||||
};
|
||||
|
||||
if woken.is_empty() {
|
||||
impl Runner {
|
||||
pub fn tick(&self, waker: Waker) -> bool {
|
||||
if self.stopping() {
|
||||
return false;
|
||||
}
|
||||
|
||||
for task in woken {
|
||||
task.poll(Arc::downgrade(&self.inner), waker.clone());
|
||||
self.state.lock().unwrap().waker = Some(waker.clone());
|
||||
|
||||
let mut any_polled = false;
|
||||
|
||||
while let Some(task) = self.pop_front_woken() {
|
||||
any_polled = true;
|
||||
task.poll(Arc::downgrade(&self.state), waker.clone());
|
||||
}
|
||||
|
||||
any_polled
|
||||
}
|
||||
|
||||
fn pop_front_woken(&self) -> Option<Arc<Task>> {
|
||||
self.state.lock().unwrap().woken.pop_front()
|
||||
}
|
||||
|
||||
pub fn stopping(&self) -> bool {
|
||||
self.inner.stopping.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn park(&self) {
|
||||
if !self.steal() {
|
||||
self.inner
|
||||
.parked
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push_back(std::thread::current());
|
||||
|
||||
std::thread::park();
|
||||
}
|
||||
}
|
||||
|
||||
fn steal(&self) -> bool {
|
||||
let read_guard = self.inner.threads.threads.read().unwrap();
|
||||
if read_guard.len() == 1 {
|
||||
return false;
|
||||
}
|
||||
|
||||
let (last_steal, current_tid) = {
|
||||
let mut guard = self.state.lock().unwrap();
|
||||
(guard.last_steal.take(), guard.handle.id())
|
||||
};
|
||||
|
||||
let (tid, stolen) = match last_steal {
|
||||
Some(id) => {
|
||||
if read_guard.contains_key(&id) {
|
||||
let opt = read_guard
|
||||
.iter()
|
||||
.chain(read_guard.iter())
|
||||
.filter(|(tid, _)| **tid != current_tid)
|
||||
.skip_while(|(tid, _)| **tid != id)
|
||||
.nth(1);
|
||||
|
||||
if let Some((tid, thread_state)) = opt {
|
||||
let mut state = thread_state.lock().unwrap();
|
||||
|
||||
if let Some(v) = state.steal_from() {
|
||||
(*tid, v)
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let opt = read_guard.iter().find(|(tid, _)| **tid != current_tid);
|
||||
|
||||
if let Some((tid, thread_state)) = opt {
|
||||
let mut state = thread_state.lock().unwrap();
|
||||
if let Some(v) = state.steal_from() {
|
||||
(*tid, v)
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let mut guard = self.state.lock().unwrap();
|
||||
for task in &stolen {
|
||||
guard.pending.insert(task.task_id, Arc::clone(task));
|
||||
}
|
||||
guard.woken.extend(stolen);
|
||||
guard.last_steal = Some(tid);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub fn stopping(&self) -> bool {
|
||||
self.inner.lock().unwrap().stopping
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.inner.lock().unwrap().stop();
|
||||
}
|
||||
|
||||
pub fn park(&self) {
|
||||
{
|
||||
let mut guard = self.inner.lock().unwrap();
|
||||
let thread = std::thread::current();
|
||||
guard.threads.insert(thread.id(), thread);
|
||||
}
|
||||
std::thread::park();
|
||||
}
|
||||
|
||||
pub fn any_woken(&self) -> bool {
|
||||
!self.inner.lock().unwrap().woken.is_empty()
|
||||
!self.state.lock().unwrap().woken.is_empty()
|
||||
}
|
||||
|
||||
pub fn prune(&self) {
|
||||
self.inner.lock().unwrap().prune();
|
||||
self.state.lock().unwrap().prune();
|
||||
}
|
||||
|
||||
pub fn heuristic_prune(&self) {
|
||||
self.inner.lock().unwrap().heuristic_prune();
|
||||
self.state.lock().unwrap().heuristic_prune();
|
||||
}
|
||||
|
||||
pub fn block_on<T: Send + 'static>(
|
||||
fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
self.state.lock().unwrap().spawn(&self.inner, future)
|
||||
}
|
||||
|
||||
fn block_on<T: Send + 'static>(
|
||||
&self,
|
||||
f: impl Future<Output = T> + Send + 'static,
|
||||
) -> Result<T, JoinError> {
|
||||
|
@ -319,7 +450,7 @@ impl Executor {
|
|||
let mut block_on_context = Context::from_waker(&block_on_waker);
|
||||
|
||||
loop {
|
||||
while self.tick(&block_on_waker) {
|
||||
while self.tick(block_on_waker.clone()) {
|
||||
if let Poll::Ready(res) = Pin::new(&mut join_handle).poll(&mut block_on_context) {
|
||||
return res;
|
||||
}
|
||||
|
@ -340,20 +471,98 @@ impl Executor {
|
|||
}
|
||||
}
|
||||
|
||||
impl Future for Executor {
|
||||
impl Default for Executor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
pub fn new() -> Self {
|
||||
Executor {
|
||||
inner: Arc::new(Inner {
|
||||
task_id_counter: AtomicU64::new(0),
|
||||
stopping: AtomicBool::new(false),
|
||||
threads: Threads::new(),
|
||||
parked: Mutex::new(VecDeque::new()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_runner(self) -> Runner {
|
||||
let state = ThreadState::new();
|
||||
let id = state.handle.id();
|
||||
|
||||
let state = Arc::new(Mutex::new(state));
|
||||
|
||||
{
|
||||
let mut guard = self.inner.threads.threads.write().unwrap();
|
||||
guard.entry(id).or_insert_with(|| Arc::clone(&state));
|
||||
}
|
||||
|
||||
Runner {
|
||||
inner: self.inner,
|
||||
state,
|
||||
phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
self.inner.stop();
|
||||
}
|
||||
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> JoinHandle<T> {
|
||||
let read_guard = self.inner.threads.threads.read().unwrap();
|
||||
if read_guard.is_empty() {
|
||||
panic!("No running runner");
|
||||
}
|
||||
|
||||
let last_spawn = self
|
||||
.inner
|
||||
.threads
|
||||
.last_spawn
|
||||
.fetch_add(1, Ordering::Relaxed)
|
||||
% read_guard.len();
|
||||
|
||||
let state = read_guard.values().nth(last_spawn).unwrap();
|
||||
let mut guard = state.lock().unwrap();
|
||||
let res = guard.spawn(&self.inner, future);
|
||||
if let Some(waker) = guard.waker.as_ref() {
|
||||
waker.wake_by_ref();
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub fn block_on<T: Send + 'static>(
|
||||
&self,
|
||||
f: impl Future<Output = T> + Send + 'static,
|
||||
) -> Result<T, JoinError> {
|
||||
self.clone().into_runner().block_on(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Runner {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut();
|
||||
|
||||
while this.tick(cx.waker()) {
|
||||
// processing spawned tasks
|
||||
}
|
||||
loop {
|
||||
while this.tick(cx.waker().clone()) {
|
||||
// processing spawned tasks
|
||||
}
|
||||
|
||||
if this.stopping() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
if this.stopping() {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
if !this.steal() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue