Compare commits

...

2 commits

Author SHA1 Message Date
asonix e494e55ad7 Add example that counts up and down 2021-06-07 22:33:37 -05:00
asonix cfea159e97 Enable cleanup for exiting actors
Actors can't currently exit on their own, but that could be a feature to add later
2021-06-07 22:31:54 -05:00
3 changed files with 114 additions and 10 deletions

View file

@ -11,3 +11,8 @@ anyhow = "1"
log = "0.4"
once_cell = "1.7.2"
tokio = { version = "1", features = ["macros", "rt", "sync", "time"] }
[dev-dependencies]
log = "0.4"
env_logger = "0.8"
tokio = { version = "1", features = ["full"] }

53
examples/basic.rs Normal file
View file

@ -0,0 +1,53 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut root = tokio_actors::root();
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
env_logger::init();
for _ in 0..4 {
let sender = root
.spawn_child(0usize, |state, _msg: (), context| {
Box::pin(async move {
log::info!(
"Hello from outer actor {}, current state: {}",
context.actor_id(),
*state
);
*state += 1;
let sender = context.spawn_child(*state, |state, _msg: (), context| {
Box::pin(async move {
log::info!(
"Hello from inner actor {}, current state: {}",
context.actor_id(),
*state
);
*state -= 1;
if *state != 0 {
context.send(()).await?;
}
Ok(())
})
});
sender.send(()).await?;
Ok(())
})
})
.await?;
for _ in 0..4 {
sender.send(()).await?;
}
}
tokio::signal::ctrl_c().await?;
root.close().await;
Ok(())
}

View file

@ -1,5 +1,6 @@
use once_cell::sync::Lazy;
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{
@ -39,6 +40,9 @@ where
let (tx, rx) = tokio::sync::mpsc::channel(16);
let (shutdown_notifier, shutdown) = ShutdownNotifier::new();
let (close_notifier, close) = ShutdownNotifier::new();
let id = ACTOR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
let mgr = FutureManager {
state,
@ -46,7 +50,8 @@ where
rx,
function,
shutdown,
id: ACTOR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed),
id,
close_notifier,
};
tokio::spawn(mgr.run());
@ -54,11 +59,14 @@ where
Handle {
tx,
shutdown_notifier,
actor_id: id,
close,
}
}
#[derive(Debug)]
pub enum ActorMessage<M> {
Closed(usize),
Spawn(CloseHandle),
Msg(M),
}
@ -76,12 +84,15 @@ struct FutureManager<State, F, M> {
function: F,
shutdown: Shutdown,
id: usize,
close_notifier: ShutdownNotifier,
}
#[derive(Debug)]
pub struct Handle<M> {
tx: ActorSender<M>,
shutdown_notifier: ShutdownNotifier,
actor_id: usize,
close: Shutdown,
}
#[derive(Clone, Debug)]
@ -92,12 +103,14 @@ pub struct SendHandle<M> {
pub struct CloseHandle {
close_fut: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
shutdown_notifier: ShutdownNotifier,
actor_id: usize,
}
#[derive(Debug)]
pub struct Context<M> {
tx: ActorSender<M>,
children: Vec<CloseHandle>,
children: HashMap<usize, CloseHandle>,
actor_id: usize,
}
impl ShutdownNotifier {
@ -128,13 +141,15 @@ impl<State, F, M> FutureManager<State, F, M> {
function,
mut shutdown,
id,
mut close_notifier,
} = self;
log::info!("Actor {} starting", id);
let mut context = Context {
tx,
children: vec![],
children: HashMap::new(),
actor_id: id,
};
loop {
@ -145,7 +160,11 @@ impl<State, F, M> FutureManager<State, F, M> {
Some((function)(&mut state, msg, &mut context).await)
}
ActorMessage::Spawn(close_handle) => {
context.children.push(close_handle);
context.children.insert(close_handle.actor_id, close_handle);
Some(Ok(()))
}
ActorMessage::Closed(child_id) => {
context.children.remove(&child_id);
Some(Ok(()))
}
}
@ -174,13 +193,15 @@ impl<State, F, M> FutureManager<State, F, M> {
let futs = context
.children
.iter_mut()
.values_mut()
.map(|child| child.close())
.collect::<Vec<_>>();
for fut in futs {
fut.await;
}
close_notifier.shutdown();
}
}
@ -219,12 +240,18 @@ impl<M> Handle<M> {
N: Send + 'static,
{
let handle = spawn(state, function);
let (send_handle, close_handle) = handle.split();
let actor_id = handle.actor_id;
let (send_handle, close_handle, close) = handle.split();
let sender = send_handle.tx.clone();
tokio::spawn(async move {
close.await;
let _ = sender.send(ActorMessage::Closed(actor_id)).await;
});
self.tx.send(ActorMessage::Spawn(close_handle)).await?;
Ok(send_handle)
}
fn split(self) -> (SendHandle<M>, CloseHandle)
fn split(self) -> (SendHandle<M>, CloseHandle, Shutdown)
where
M: Send + 'static,
{
@ -237,7 +264,9 @@ impl<M> Handle<M> {
CloseHandle {
close_fut,
shutdown_notifier: self.shutdown_notifier,
actor_id: self.actor_id,
},
self.close,
)
}
@ -273,7 +302,13 @@ impl<M> SendHandle<M> {
N: Send + 'static,
{
let handle = spawn(state, function);
let (send_handle, close_handle) = handle.split();
let actor_id = handle.actor_id;
let (send_handle, close_handle, close) = handle.split();
let sender = send_handle.tx.clone();
tokio::spawn(async move {
close.await;
let _ = sender.send(ActorMessage::Closed(actor_id)).await;
});
self.tx.send(ActorMessage::Spawn(close_handle)).await?;
Ok(send_handle)
}
@ -315,6 +350,7 @@ impl std::fmt::Debug for CloseHandle {
f.debug_struct("CloseHandle")
.field("close_fut", &"Option<Pin<Box<dyn Future<Output = ()>>>>")
.field("shutdown_notifier", &self.shutdown_notifier)
.field("actor_id", &self.actor_id)
.finish()
}
}
@ -338,8 +374,18 @@ impl<M> Context<M> {
N: Send + 'static,
{
let handle = spawn(state, function);
let (send_handle, close_handle) = handle.split();
self.children.push(close_handle);
let actor_id = handle.actor_id;
let (send_handle, close_handle, close) = handle.split();
let sender = send_handle.tx.clone();
tokio::spawn(async move {
close.await;
let _ = sender.send(ActorMessage::Closed(actor_id)).await;
});
self.children.insert(close_handle.actor_id, close_handle);
send_handle
}
pub fn actor_id(&self) -> usize {
self.actor_id
}
}