Compare commits
2 commits
f6ce03cdcf
...
e494e55ad7
Author | SHA1 | Date | |
---|---|---|---|
asonix | e494e55ad7 | ||
asonix | cfea159e97 |
|
@ -11,3 +11,8 @@ anyhow = "1"
|
|||
log = "0.4"
|
||||
once_cell = "1.7.2"
|
||||
tokio = { version = "1", features = ["macros", "rt", "sync", "time"] }
|
||||
|
||||
[dev-dependencies]
|
||||
log = "0.4"
|
||||
env_logger = "0.8"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
|
53
examples/basic.rs
Normal file
53
examples/basic.rs
Normal file
|
@ -0,0 +1,53 @@
|
|||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let mut root = tokio_actors::root();
|
||||
|
||||
if std::env::var("RUST_LOG").is_err() {
|
||||
std::env::set_var("RUST_LOG", "info");
|
||||
}
|
||||
env_logger::init();
|
||||
|
||||
for _ in 0..4 {
|
||||
let sender = root
|
||||
.spawn_child(0usize, |state, _msg: (), context| {
|
||||
Box::pin(async move {
|
||||
log::info!(
|
||||
"Hello from outer actor {}, current state: {}",
|
||||
context.actor_id(),
|
||||
*state
|
||||
);
|
||||
*state += 1;
|
||||
|
||||
let sender = context.spawn_child(*state, |state, _msg: (), context| {
|
||||
Box::pin(async move {
|
||||
log::info!(
|
||||
"Hello from inner actor {}, current state: {}",
|
||||
context.actor_id(),
|
||||
*state
|
||||
);
|
||||
*state -= 1;
|
||||
|
||||
if *state != 0 {
|
||||
context.send(()).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
});
|
||||
|
||||
sender.send(()).await?;
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
for _ in 0..4 {
|
||||
sender.send(()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::signal::ctrl_c().await?;
|
||||
root.close().await;
|
||||
|
||||
Ok(())
|
||||
}
|
66
src/lib.rs
66
src/lib.rs
|
@ -1,5 +1,6 @@
|
|||
use once_cell::sync::Lazy;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
|
@ -39,6 +40,9 @@ where
|
|||
let (tx, rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
let (shutdown_notifier, shutdown) = ShutdownNotifier::new();
|
||||
let (close_notifier, close) = ShutdownNotifier::new();
|
||||
|
||||
let id = ACTOR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let mgr = FutureManager {
|
||||
state,
|
||||
|
@ -46,7 +50,8 @@ where
|
|||
rx,
|
||||
function,
|
||||
shutdown,
|
||||
id: ACTOR_ID_GENERATOR.fetch_add(1, Ordering::Relaxed),
|
||||
id,
|
||||
close_notifier,
|
||||
};
|
||||
|
||||
tokio::spawn(mgr.run());
|
||||
|
@ -54,11 +59,14 @@ where
|
|||
Handle {
|
||||
tx,
|
||||
shutdown_notifier,
|
||||
actor_id: id,
|
||||
close,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ActorMessage<M> {
|
||||
Closed(usize),
|
||||
Spawn(CloseHandle),
|
||||
Msg(M),
|
||||
}
|
||||
|
@ -76,12 +84,15 @@ struct FutureManager<State, F, M> {
|
|||
function: F,
|
||||
shutdown: Shutdown,
|
||||
id: usize,
|
||||
close_notifier: ShutdownNotifier,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Handle<M> {
|
||||
tx: ActorSender<M>,
|
||||
shutdown_notifier: ShutdownNotifier,
|
||||
actor_id: usize,
|
||||
close: Shutdown,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -92,12 +103,14 @@ pub struct SendHandle<M> {
|
|||
pub struct CloseHandle {
|
||||
close_fut: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
|
||||
shutdown_notifier: ShutdownNotifier,
|
||||
actor_id: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Context<M> {
|
||||
tx: ActorSender<M>,
|
||||
children: Vec<CloseHandle>,
|
||||
children: HashMap<usize, CloseHandle>,
|
||||
actor_id: usize,
|
||||
}
|
||||
|
||||
impl ShutdownNotifier {
|
||||
|
@ -128,13 +141,15 @@ impl<State, F, M> FutureManager<State, F, M> {
|
|||
function,
|
||||
mut shutdown,
|
||||
id,
|
||||
mut close_notifier,
|
||||
} = self;
|
||||
|
||||
log::info!("Actor {} starting", id);
|
||||
|
||||
let mut context = Context {
|
||||
tx,
|
||||
children: vec![],
|
||||
children: HashMap::new(),
|
||||
actor_id: id,
|
||||
};
|
||||
|
||||
loop {
|
||||
|
@ -145,7 +160,11 @@ impl<State, F, M> FutureManager<State, F, M> {
|
|||
Some((function)(&mut state, msg, &mut context).await)
|
||||
}
|
||||
ActorMessage::Spawn(close_handle) => {
|
||||
context.children.push(close_handle);
|
||||
context.children.insert(close_handle.actor_id, close_handle);
|
||||
Some(Ok(()))
|
||||
}
|
||||
ActorMessage::Closed(child_id) => {
|
||||
context.children.remove(&child_id);
|
||||
Some(Ok(()))
|
||||
}
|
||||
}
|
||||
|
@ -174,13 +193,15 @@ impl<State, F, M> FutureManager<State, F, M> {
|
|||
|
||||
let futs = context
|
||||
.children
|
||||
.iter_mut()
|
||||
.values_mut()
|
||||
.map(|child| child.close())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for fut in futs {
|
||||
fut.await;
|
||||
}
|
||||
|
||||
close_notifier.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -219,12 +240,18 @@ impl<M> Handle<M> {
|
|||
N: Send + 'static,
|
||||
{
|
||||
let handle = spawn(state, function);
|
||||
let (send_handle, close_handle) = handle.split();
|
||||
let actor_id = handle.actor_id;
|
||||
let (send_handle, close_handle, close) = handle.split();
|
||||
let sender = send_handle.tx.clone();
|
||||
tokio::spawn(async move {
|
||||
close.await;
|
||||
let _ = sender.send(ActorMessage::Closed(actor_id)).await;
|
||||
});
|
||||
self.tx.send(ActorMessage::Spawn(close_handle)).await?;
|
||||
Ok(send_handle)
|
||||
}
|
||||
|
||||
fn split(self) -> (SendHandle<M>, CloseHandle)
|
||||
fn split(self) -> (SendHandle<M>, CloseHandle, Shutdown)
|
||||
where
|
||||
M: Send + 'static,
|
||||
{
|
||||
|
@ -237,7 +264,9 @@ impl<M> Handle<M> {
|
|||
CloseHandle {
|
||||
close_fut,
|
||||
shutdown_notifier: self.shutdown_notifier,
|
||||
actor_id: self.actor_id,
|
||||
},
|
||||
self.close,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -273,7 +302,13 @@ impl<M> SendHandle<M> {
|
|||
N: Send + 'static,
|
||||
{
|
||||
let handle = spawn(state, function);
|
||||
let (send_handle, close_handle) = handle.split();
|
||||
let actor_id = handle.actor_id;
|
||||
let (send_handle, close_handle, close) = handle.split();
|
||||
let sender = send_handle.tx.clone();
|
||||
tokio::spawn(async move {
|
||||
close.await;
|
||||
let _ = sender.send(ActorMessage::Closed(actor_id)).await;
|
||||
});
|
||||
self.tx.send(ActorMessage::Spawn(close_handle)).await?;
|
||||
Ok(send_handle)
|
||||
}
|
||||
|
@ -315,6 +350,7 @@ impl std::fmt::Debug for CloseHandle {
|
|||
f.debug_struct("CloseHandle")
|
||||
.field("close_fut", &"Option<Pin<Box<dyn Future<Output = ()>>>>")
|
||||
.field("shutdown_notifier", &self.shutdown_notifier)
|
||||
.field("actor_id", &self.actor_id)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
@ -338,8 +374,18 @@ impl<M> Context<M> {
|
|||
N: Send + 'static,
|
||||
{
|
||||
let handle = spawn(state, function);
|
||||
let (send_handle, close_handle) = handle.split();
|
||||
self.children.push(close_handle);
|
||||
let actor_id = handle.actor_id;
|
||||
let (send_handle, close_handle, close) = handle.split();
|
||||
let sender = send_handle.tx.clone();
|
||||
tokio::spawn(async move {
|
||||
close.await;
|
||||
let _ = sender.send(ActorMessage::Closed(actor_id)).await;
|
||||
});
|
||||
self.children.insert(close_handle.actor_id, close_handle);
|
||||
send_handle
|
||||
}
|
||||
|
||||
pub fn actor_id(&self) -> usize {
|
||||
self.actor_id
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue