Compare commits

...

12 commits

6 changed files with 357 additions and 138 deletions

View file

@ -1,5 +1,5 @@
[package]
name = "safe-executor"
name = "jitterbug"
version = "0.1.0"
edition = "2021"
@ -8,5 +8,6 @@ edition = "2021"
[dependencies]
[dev-dependencies]
# foxtrot = { git = "https://git.asonix.dog/safe-async/foxtrot" }
async-io = "1.6.0"
futures-lite = "1.12.0"

View file

@ -1,4 +1,4 @@
use safe_executor::{oneshot, Executor, JoinError};
use jitterbug::{oneshot, Executor, JoinError};
use std::time::Duration;
fn main() -> Result<(), JoinError> {

View file

@ -1,24 +1,25 @@
use std::sync::Arc;
use std::task::{Wake, Waker};
use jitterbug::Executor;
use std::{
sync::Arc,
task::{Wake, Waker},
};
use safe_executor::Executor;
fn spawn(runtime: &Executor) {
fn spawn(executor: &Executor) {
println!("Spawning futures");
let task1 = runtime.spawn(async move {
let task1 = executor.spawn(async move {
println!("Henlo from first spawn");
"A"
});
let task2 = runtime.spawn(async move {
let task2 = executor.spawn(async move {
println!("Henlo from second spawn");
"B"
});
let run2m = runtime.clone();
let run2m = executor.clone();
runtime.spawn(async move {
executor.spawn(async move {
let res1 = task1.await;
let res2 = task2.await;
println!("Henlo from third spawn, {:?}, {:?}", res1, res2);
@ -41,43 +42,44 @@ impl Wake for DummyWaker {
}
fn main() {
let runtime = Executor::new();
let executor = Executor::new();
let runner = executor.clone().into_runner();
// This creates 3 new tasks
spawn(&runtime);
spawn(&executor);
let waker: Waker = Arc::new(DummyWaker).into();
while runtime.any_woken() {
while runner.any_woken() {
println!("Ticking");
runtime.tick(&waker);
runner.tick(waker.clone());
}
// This reclaims the first 3 tasks
println!("Pruning");
runtime.prune();
runner.prune();
// This creates 3 new tasks
spawn(&runtime);
while runtime.any_woken() {
spawn(&executor);
while runner.any_woken() {
println!("Ticking");
runtime.tick(&waker);
runner.tick(waker.clone());
}
// This re-uses the 3 tasks created prior
spawn(&runtime);
spawn(&executor);
// This doesn't reclaim any tasks, since we've spawned 3 more futures
println!("Pruning");
runtime.prune();
runner.prune();
while runtime.any_woken() {
while runner.any_woken() {
println!("Ticking");
runtime.tick(&waker);
runner.tick(waker.clone());
// This reclaims tasks as their futures resolve on each tick
println!("Pruning");
runtime.prune();
runner.prune();
}
println!("Hewwo Mr Obama");

View file

@ -1,8 +1,7 @@
use async_io::{Async, Timer};
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use safe_executor::Executor;
use std::net::TcpListener;
use std::time::Duration;
use jitterbug::Executor;
use std::{net::TcpListener, time::Duration};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let executor = Executor::new();
@ -10,10 +9,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
for _ in 0..4 {
let execu2r = executor.clone();
std::thread::spawn(move || {
async_io::block_on(execu2r);
async_io::block_on(execu2r.into_runner());
});
}
let runner = executor.clone().into_runner();
let execu2r = executor.clone();
executor.spawn(async move {
let listener = match Async::<TcpListener>::bind(([127, 0, 0, 1], 3456)) {
@ -37,9 +38,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
break;
}
println!("{:?}", &buffer[0..n]);
if let Ok(s) = std::str::from_utf8(&buffer[0..n]) {
let mut string = s.trim().chars().rev().collect::<String>();
println!("{:?}: {}", std::thread::current().id(), string);
string.push('\n');
if stream.write_all(&buffer[0..n]).await.is_err() {
if stream.write_all(string.as_bytes()).await.is_err() {
break;
}
} else if stream.write_all(&buffer[0..n]).await.is_err() {
break;
}
}
@ -53,6 +60,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
execu2r.stop();
});
async_io::block_on(executor);
async_io::block_on(runner);
Ok(())
}

View file

@ -1,4 +1,4 @@
use safe_executor::{oneshot, Executor, JoinError};
use jitterbug::{oneshot, Executor, JoinError};
fn main() -> Result<(), JoinError> {
let executor = Executor::new();

View file

@ -1,10 +1,12 @@
use std::{
collections::{HashMap, VecDeque},
future::Future,
marker::PhantomData,
pin::Pin,
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, Weak,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock, Weak,
},
task::{Context, Poll, Wake, Waker},
thread::{Thread, ThreadId},
@ -16,6 +18,7 @@ pub use oneshot::{oneshot, Dropped, Receiver, Sender};
const PRUNE_SPAWN_COUNT: u64 = 1000;
const PRUNE_DURATION: Duration = Duration::from_secs(5);
const UNPARK_DURATION: Duration = Duration::from_millis(50);
pub struct JoinHandle<T> {
rx: Receiver<T>,
@ -51,7 +54,7 @@ struct Task {
}
impl Task {
fn poll(self: Arc<Self>, executor: Weak<Mutex<Inner>>, waker: Waker) {
fn poll(self: Arc<Self>, executor: Weak<Mutex<ThreadState>>, waker: Waker) {
self.woken.store(false, Ordering::Release);
let res = std::panic::catch_unwind(|| {
@ -84,8 +87,6 @@ impl Task {
}
}
Err(_) => {
println!("Panic in poll");
if let Some(executor) = Weak::upgrade(&executor) {
let mut inner = executor.lock().unwrap();
// This will remove the last remaining Arc<Task> for this task, dropping after poll completes
@ -106,7 +107,7 @@ impl Task {
struct SafeWaker {
task: Weak<Task>,
executor: Weak<Mutex<Inner>>,
executor: Weak<Mutex<ThreadState>>,
inner: Waker,
}
@ -117,9 +118,12 @@ impl Wake for SafeWaker {
fn wake_by_ref(self: &Arc<Self>) {
if let Some(task) = Weak::upgrade(&self.task) {
if !task.woken.swap(true, Ordering::AcqRel) {
if let Some(executor) = Weak::upgrade(&self.executor) {
let mut inner = executor.lock().unwrap();
if let Some(executor) = Weak::upgrade(&self.executor) {
let mut inner = executor.lock().unwrap();
if inner.pending.contains_key(&task.task_id)
&& !task.woken.swap(true, Ordering::AcqRel)
{
inner.wake(task);
}
}
@ -143,26 +147,65 @@ impl Wake for BlockOnWaker {
}
}
struct Inner {
task_id_counter: u64,
struct Threads {
threads: RwLock<HashMap<ThreadId, Arc<Mutex<ThreadState>>>>,
last_spawn: AtomicUsize,
}
impl Threads {
fn new() -> Self {
Threads {
threads: RwLock::new(HashMap::new()),
last_spawn: AtomicUsize::new(0),
}
}
}
struct ThreadState {
available: VecDeque<Arc<Task>>,
woken: VecDeque<Arc<Task>>,
pending: HashMap<u64, Arc<Task>>,
stopping: bool,
threads: HashMap<ThreadId, Thread>,
handle: Thread,
spawn_count: u64,
prune_time: Instant,
unpark_time: Instant,
last_steal: Option<ThreadId>,
waker: Option<Waker>,
}
impl Inner {
fn pop_available_head(&mut self) -> Option<Arc<Task>> {
self.available.pop_front()
impl ThreadState {
fn new() -> Self {
ThreadState {
available: VecDeque::new(),
woken: VecDeque::new(),
pending: HashMap::new(),
handle: std::thread::current(),
spawn_count: 0,
prune_time: Instant::now(),
unpark_time: Instant::now(),
last_steal: None,
waker: None,
}
}
fn next_task_id(&mut self) -> u64 {
let id = self.task_id_counter;
self.task_id_counter += 1;
id
fn steal_from(&mut self) -> Option<Vec<Arc<Task>>> {
let split_point = (self.woken.len() - (self.woken.len() % 2)) / 2;
if split_point > 0 {
let v = self.woken.drain(split_point..).collect::<Vec<_>>();
for task in &v {
self.pending.remove(&task.task_id);
}
return Some(v);
}
None
}
fn pop_available_head(&mut self) -> Option<Arc<Task>> {
self.available.pop_front()
}
fn heuristic_prune(&mut self) {
@ -172,6 +215,14 @@ impl Inner {
}
}
fn should_heuristic_unpark(&mut self) -> bool {
self.spawn_count % 10 == 0 || self.unpark_time + UNPARK_DURATION < Instant::now()
}
fn update_unpark(&mut self) {
self.unpark_time = Instant::now();
}
fn prune(&mut self) {
self.available = VecDeque::new();
self.prune_time = Instant::now();
@ -181,31 +232,16 @@ impl Inner {
fn wake(&mut self, task: Arc<Task>) {
task.woken.store(true, Ordering::Release);
self.woken.push_back(task);
if let Some(tid) = self.threads.keys().next() {
let tid = *tid;
// If we're waking a future, ensure at least one thread is awake to process it
if let Some(thread) = self.threads.remove(&tid) {
thread.unpark();
}
}
}
fn stop(&mut self) {
self.stopping = true;
for (_, thread) in self.threads.drain() {
thread.unpark();
}
}
fn spawn<T: Send + 'static>(
&mut self,
shared: &Arc<Inner>,
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
let task = self
.pop_available_head()
.unwrap_or_else(|| Task::allocate(self.next_task_id()));
.unwrap_or_else(|| Task::allocate(shared.next_task_id()));
self.pending.insert(task.task_id, Arc::clone(&task));
@ -216,96 +252,191 @@ impl Inner {
self.wake(task);
self.spawn_count += 1;
if self.should_heuristic_unpark() {
if let Some(thread) = shared.parked.lock().unwrap().pop_front() {
thread.unpark();
}
self.update_unpark();
}
JoinHandle { rx }
}
}
struct Inner {
task_id_counter: AtomicU64,
stopping: AtomicBool,
threads: Threads,
parked: Mutex<VecDeque<Thread>>,
}
impl Inner {
fn next_task_id(&self) -> u64 {
self.task_id_counter.fetch_add(1, Ordering::Relaxed)
}
fn stop(&self) {
self.stopping.store(true, Ordering::Release);
let read_guard = self.threads.threads.read().unwrap();
for state in read_guard.values() {
if let Some(waker) = state.lock().unwrap().waker.take() {
waker.wake();
}
}
}
}
#[derive(Clone)]
pub struct Executor {
inner: Arc<Mutex<Inner>>,
inner: Arc<Inner>,
}
impl Default for Executor {
fn default() -> Self {
Self::new()
}
// SHOULD NOT IMPLEMENT CLONE !!!!!!!
// SHOULD NOT IMPLEMENT SEND !!!!!!!
pub struct Runner {
inner: Arc<Inner>,
state: Arc<Mutex<ThreadState>>,
// force Runner to be !Send
phantom: PhantomData<Rc<()>>,
}
impl Executor {
pub fn new() -> Self {
Executor {
inner: Arc::new(Mutex::new(Inner {
task_id_counter: 0,
available: VecDeque::new(),
woken: VecDeque::new(),
pending: HashMap::new(),
stopping: false,
threads: HashMap::new(),
spawn_count: 0,
prune_time: Instant::now(),
})),
impl Drop for Runner {
fn drop(&mut self) {
let mut write_guard = self.inner.threads.threads.write().unwrap();
let mut guard = self.state.lock().unwrap();
write_guard.remove(&guard.handle.id());
if let Some(state) = write_guard.values().next() {
let mut state = state.lock().unwrap();
state.woken.extend(guard.woken.drain(..));
state.pending.extend(guard.pending.drain());
}
}
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
self.inner.lock().unwrap().spawn(future)
}
pub fn tick(&self, waker: &Waker) -> bool {
let woken = {
let mut guard = self.inner.lock().unwrap();
if guard.stopping {
return false;
}
std::mem::take(&mut guard.woken)
};
if woken.is_empty() {
impl Runner {
pub fn tick(&self, waker: Waker) -> bool {
if self.stopping() {
return false;
}
for task in woken {
task.poll(Arc::downgrade(&self.inner), waker.clone());
self.state.lock().unwrap().waker = Some(waker.clone());
let mut any_polled = false;
while let Some(task) = self.pop_front_woken() {
any_polled = true;
task.poll(Arc::downgrade(&self.state), waker.clone());
}
any_polled
}
fn pop_front_woken(&self) -> Option<Arc<Task>> {
self.state.lock().unwrap().woken.pop_front()
}
pub fn stopping(&self) -> bool {
self.inner.stopping.load(Ordering::Acquire)
}
pub fn park(&self) {
if !self.steal() {
self.inner
.parked
.lock()
.unwrap()
.push_back(std::thread::current());
std::thread::park();
}
}
fn steal(&self) -> bool {
let read_guard = self.inner.threads.threads.read().unwrap();
if read_guard.len() == 1 {
return false;
}
let (last_steal, current_tid) = {
let mut guard = self.state.lock().unwrap();
(guard.last_steal.take(), guard.handle.id())
};
let (tid, stolen) = match last_steal {
Some(id) => {
if read_guard.contains_key(&id) {
let opt = read_guard
.iter()
.chain(read_guard.iter())
.filter(|(tid, _)| **tid != current_tid)
.skip_while(|(tid, _)| **tid != id)
.nth(1);
if let Some((tid, thread_state)) = opt {
let mut state = thread_state.lock().unwrap();
if let Some(v) = state.steal_from() {
(*tid, v)
} else {
return false;
}
} else {
return false;
}
} else {
return false;
}
}
None => {
let opt = read_guard.iter().find(|(tid, _)| **tid != current_tid);
if let Some((tid, thread_state)) = opt {
let mut state = thread_state.lock().unwrap();
if let Some(v) = state.steal_from() {
(*tid, v)
} else {
return false;
}
} else {
return false;
}
}
};
let mut guard = self.state.lock().unwrap();
for task in &stolen {
guard.pending.insert(task.task_id, Arc::clone(task));
}
guard.woken.extend(stolen);
guard.last_steal = Some(tid);
true
}
pub fn stopping(&self) -> bool {
self.inner.lock().unwrap().stopping
}
pub fn stop(&self) {
self.inner.lock().unwrap().stop();
}
pub fn park(&self) {
{
let mut guard = self.inner.lock().unwrap();
let thread = std::thread::current();
guard.threads.insert(thread.id(), thread);
}
std::thread::park();
}
pub fn any_woken(&self) -> bool {
!self.inner.lock().unwrap().woken.is_empty()
!self.state.lock().unwrap().woken.is_empty()
}
pub fn prune(&self) {
self.inner.lock().unwrap().prune();
self.state.lock().unwrap().prune();
}
pub fn heuristic_prune(&self) {
self.inner.lock().unwrap().heuristic_prune();
self.state.lock().unwrap().heuristic_prune();
}
pub fn block_on<T: Send + 'static>(
fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
self.state.lock().unwrap().spawn(&self.inner, future)
}
fn block_on<T: Send + 'static>(
&self,
f: impl Future<Output = T> + Send + 'static,
) -> Result<T, JoinError> {
@ -319,7 +450,7 @@ impl Executor {
let mut block_on_context = Context::from_waker(&block_on_waker);
loop {
while self.tick(&block_on_waker) {
while self.tick(block_on_waker.clone()) {
if let Poll::Ready(res) = Pin::new(&mut join_handle).poll(&mut block_on_context) {
return res;
}
@ -340,20 +471,98 @@ impl Executor {
}
}
impl Future for Executor {
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}
impl Executor {
pub fn new() -> Self {
Executor {
inner: Arc::new(Inner {
task_id_counter: AtomicU64::new(0),
stopping: AtomicBool::new(false),
threads: Threads::new(),
parked: Mutex::new(VecDeque::new()),
}),
}
}
pub fn into_runner(self) -> Runner {
let state = ThreadState::new();
let id = state.handle.id();
let state = Arc::new(Mutex::new(state));
{
let mut guard = self.inner.threads.threads.write().unwrap();
guard.entry(id).or_insert_with(|| Arc::clone(&state));
}
Runner {
inner: self.inner,
state,
phantom: PhantomData,
}
}
pub fn stop(&self) {
self.inner.stop();
}
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
let read_guard = self.inner.threads.threads.read().unwrap();
if read_guard.is_empty() {
panic!("No running runner");
}
let last_spawn = self
.inner
.threads
.last_spawn
.fetch_add(1, Ordering::Relaxed)
% read_guard.len();
let state = read_guard.values().nth(last_spawn).unwrap();
let mut guard = state.lock().unwrap();
let res = guard.spawn(&self.inner, future);
if let Some(waker) = guard.waker.as_ref() {
waker.wake_by_ref();
}
res
}
pub fn block_on<T: Send + 'static>(
&self,
f: impl Future<Output = T> + Send + 'static,
) -> Result<T, JoinError> {
self.clone().into_runner().block_on(f)
}
}
impl Future for Runner {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut();
while this.tick(cx.waker()) {
// processing spawned tasks
}
loop {
while this.tick(cx.waker().clone()) {
// processing spawned tasks
}
if this.stopping() {
Poll::Ready(())
} else {
Poll::Pending
if this.stopping() {
return Poll::Ready(());
}
if !this.steal() {
return Poll::Pending;
}
}
}
}