Make use of start and stop hooks

This commit is contained in:
Aode (Lion) 2021-10-02 23:00:34 -05:00
parent a5dadd584f
commit b72b8e0b9f
8 changed files with 175 additions and 132 deletions

59
Cargo.lock generated
View file

@ -41,12 +41,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
[[package]]
name = "async-broadcast"
version = "0.3.4"
@ -632,26 +626,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@ -944,7 +918,6 @@ dependencies = [
name = "streamdeck-dbus"
version = "0.1.0"
dependencies = [
"anyhow",
"streamdeck-commands",
"streamdeck-handshake",
"tokio",
@ -952,9 +925,7 @@ dependencies = [
"tokio-serial",
"tracing",
"tracing-error",
"tracing-futures",
"tracing-subscriber",
"uuid",
"zbus",
]
@ -972,9 +943,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.77"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0"
checksum = "a4eac2e6c19f5c3abc0c229bea31ff0b9b091c7b14990e8924b92902a303a0c0"
dependencies = [
"proc-macro2",
"quote",
@ -1033,12 +1004,12 @@ dependencies = [
[[package]]
name = "tokio-actors"
version = "0.1.0"
source = "git+https://git.asonix.dog/asonix/tokio-actors?branch=main#5d7500f4c201a564c67e36b5ce3d4cd41b74e3c4"
source = "git+https://git.asonix.dog/asonix/tokio-actors?branch=main#5d33b8af249f4b9bd9fc3cae05355a5d48819328"
dependencies = [
"anyhow",
"log",
"once_cell",
"tokio",
"tracing",
"tracing-error",
]
[[package]]
@ -1116,16 +1087,6 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.2"
@ -1175,16 +1136,6 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom",
"serde",
]
[[package]]
name = "version_check"
version = "0.9.3"

View file

@ -6,7 +6,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
streamdeck-commands = { version = "0.1.0", git = "https://git.asonix.dog/asonix/streamdeck-commands", branch = "main" }
streamdeck-handshake = { version = "0.1.0", git = "https://git.asonix.dog/asonix/streamdeck-handshake", branch = "main" }
tokio = { version = "1", features = ["full"] }
@ -14,7 +13,5 @@ tokio-serial = "5.4.1"
tokio-actors = { version = "0.1", git = "https://git.asonix.dog/asonix/tokio-actors", branch = "main" }
tracing = "0.1.28"
tracing-error = "0.1.2"
tracing-futures = "0.2.5"
tracing-subscriber = { version = "0.2.24", features = ["fmt"] }
uuid = { version = "0.8.2", features = ["serde", "v4"]}
zbus = "2.0.0-beta.7"

View file

@ -5,7 +5,7 @@ mod manager;
use manager::Manager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() -> tokio_actors::Result<()> {
init_tracing()?;
let mut root = tokio_actors::root();
@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
fn init_tracing() -> anyhow::Result<()> {
fn init_tracing() -> tokio_actors::Result<()> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let formatter = tracing_subscriber::fmt::layer()

View file

@ -4,7 +4,8 @@ use std::{
time::Duration,
};
use tokio::sync::RwLock;
use tokio_actors::{BoxFuture, Context, Handle, SendHandle};
use tokio_actors::{Actor, BoxFuture, Context, Handle, SendHandle};
use tracing::{Instrument, Span};
use zbus::SignalContext;
mod connector;
@ -33,6 +34,7 @@ struct ConnectedDeck {
struct Inner {
pending_ports: HashSet<String>,
connected_decks: HashMap<String, ConnectedDeck>,
children: HashMap<usize, String>,
ignored_ports: HashSet<String>,
}
@ -69,7 +71,7 @@ impl Manager {
}
impl Manager {
pub(super) async fn start(root_handle: &mut Handle<()>) -> anyhow::Result<()> {
pub(super) async fn start(root_handle: &mut Handle<()>) -> tokio_actors::Result<()> {
let connection = zbus::Connection::session().await?;
connection.request_name("dog.asonix.DeckManager").await?;
@ -83,7 +85,11 @@ impl Manager {
.await
.at(manager.object_name(), manager.clone())?;
let manager = root_handle.spawn_child(manager, Manager::turn).await?;
let manager = root_handle
.spawn_child_with_hooks(
Actor::new(manager, Manager::turn).on_remove(Manager::on_remove),
)
.await?;
let searcher = Searcher::new(manager);
let searcher = root_handle.spawn_child(searcher, Searcher::turn).await?;
searcher.every(Duration::from_secs(1), || ());
@ -98,12 +104,35 @@ impl Manager {
msg: ManagerMessage,
ctx: &'a mut Context<ManagerMessage>,
) -> BoxFuture<'a> {
match msg {
ManagerMessage::Found(port_name) => Box::pin(self.found(port_name, ctx)),
ManagerMessage::Removed(port_name) => Box::pin(self.removed(port_name)),
ManagerMessage::Connected(deck) => Box::pin(self.connected(deck, ctx)),
ManagerMessage::Ignored(port_name) => Box::pin(self.ignored(port_name)),
}
Box::pin(
async move {
match msg {
ManagerMessage::Found(port_name) => self.found(port_name, ctx).await,
ManagerMessage::Removed(port_name) => self.removed(port_name).await,
ManagerMessage::Connected(deck) => self.connected(deck, ctx).await,
ManagerMessage::Ignored(port_name) => self.ignored(port_name).await,
}
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(_ctx))]
fn on_remove<'a>(
&'a mut self,
child_id: usize,
_ctx: &'a mut Context<ManagerMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
let mut inner = self.inner.write().await;
if let Some(port_name) = inner.children.remove(&child_id) {
inner.connected_decks.remove(&port_name);
}
Ok(())
}
.instrument(Span::current()),
)
}
fn object_name(&self) -> String {
@ -111,7 +140,7 @@ impl Manager {
}
#[tracing::instrument]
async fn ignored(&mut self, port_name: String) -> anyhow::Result<()> {
async fn ignored(&mut self, port_name: String) -> tokio_actors::Result<()> {
tracing::debug!("ignored");
self.inner.write().await.ignored_ports.insert(port_name);
Ok(())
@ -122,7 +151,7 @@ impl Manager {
&mut self,
deck: Deck,
ctx: &'_ mut Context<ManagerMessage>,
) -> anyhow::Result<()> {
) -> tokio_actors::Result<()> {
tracing::debug!("connected");
let was_pending = self
@ -138,19 +167,27 @@ impl Manager {
let port_name = deck.port_name().to_string();
let object_name = deck.object_name();
let deck_handle = ctx.spawn_child(deck, Deck::turn);
deck_handle.send(DeckMessage::Start).await?;
let deck_handle = ctx.spawn_child_with_hooks(
Actor::new(deck, Deck::turn)
.on_remove(Deck::on_remove)
.on_start(Deck::on_start)
.on_stop(Deck::on_stop),
);
let actor_id = deck_handle.actor_id();
let connected = ConnectedDeck {
object_name,
handle: deck_handle,
};
self.inner
.write()
.await
.connected_decks
.insert(port_name, connected);
{
let mut inner = self.inner.write().await;
inner.connected_decks.insert(port_name.clone(), connected);
inner.children.insert(actor_id, port_name);
drop(inner);
}
// auto generated
let signal = SignalContext::new(&self.connection, self.object_name())?;
@ -159,15 +196,15 @@ impl Manager {
}
#[tracing::instrument]
async fn removed(&mut self, port_name: String) -> anyhow::Result<()> {
async fn removed(&mut self, port_name: String) -> tokio_actors::Result<()> {
tracing::debug!("removed");
let mut inner = self.inner.write().await;
inner.pending_ports.remove(&port_name);
inner.ignored_ports.remove(&port_name);
if let Some(connected) = inner.connected_decks.remove(&port_name) {
if let Some(mut connected) = inner.connected_decks.remove(&port_name) {
tracing::info!("Sending stop");
connected.handle.send(DeckMessage::Stop).await?;
connected.handle.stop();
}
Ok(())
@ -178,7 +215,7 @@ impl Manager {
&mut self,
port_name: String,
ctx: &'_ mut Context<ManagerMessage>,
) -> anyhow::Result<()> {
) -> tokio_actors::Result<()> {
tracing::debug!("found");
let inner = self.inner.read().await;

View file

@ -3,6 +3,7 @@ use crate::manager::{
ManagerMessage,
};
use tokio_actors::{BoxFuture, Context, SendHandle};
use tracing::{Instrument, Span};
use zbus::Connection;
#[derive(Clone)]
@ -30,26 +31,29 @@ impl Connector {
ctx: &'a mut Context<String>,
) -> BoxFuture<'a> {
tracing::debug!("Connector");
Box::pin(async move {
ctx.stop();
Box::pin(
async move {
ctx.stop();
match self.connect(port_name.clone()).await {
Ok(deck) => {
self.parent.send(ManagerMessage::Connected(deck)).await?;
Ok(())
}
Err(e) => {
tracing::info!("Error: {}", e);
tracing::info!("Details: {:?}", e);
self.parent.send(ManagerMessage::Ignored(port_name)).await?;
Err(e)
match self.connect(port_name.clone()).await {
Ok(deck) => {
self.parent.send(ManagerMessage::Connected(deck)).await?;
Ok(())
}
Err(e) => {
tracing::info!("Error: {}", e);
tracing::info!("Details: {:?}", e);
self.parent.send(ManagerMessage::Ignored(port_name)).await?;
Err(e)
}
}
}
})
.instrument(Span::current()),
)
}
#[tracing::instrument]
async fn connect(&mut self, port_name: String) -> anyhow::Result<Deck> {
async fn connect(&mut self, port_name: String) -> tokio_actors::Result<Deck> {
let mut port = streamdeck_handshake::handshake(port_name.as_str().into()).await?;
let identity = streamdeck_commands::identity(&mut port).await?;

View file

@ -1,13 +1,14 @@
use crate::manager::ManagerMessage;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_actors::{BoxFuture, Context, SendHandle};
use tokio_actors::{Actor, BoxFuture, Context, SendHandle};
use tokio_serial::SerialStream;
use tracing::{Instrument, Span};
use zbus::{Connection, SignalContext};
mod port;
use port::{Port, PortMessage};
use port::Port;
#[derive(Debug, Clone)]
pub(super) struct DeckInfo {
@ -19,9 +20,7 @@ pub(super) struct DeckInfo {
#[derive(Clone, Debug)]
pub(super) enum DeckMessage {
Start,
Press(u8),
Stop,
}
#[derive(Clone)]
@ -49,6 +48,7 @@ struct Id(Vec<u8>);
struct Inner {
name: String,
port: Option<SerialStream>,
child: Option<usize>,
}
impl std::fmt::Debug for Inner {
@ -112,13 +112,14 @@ impl Deck {
info: DeckInfo,
port_name: String,
parent: SendHandle<ManagerMessage>,
) -> anyhow::Result<Self> {
) -> tokio_actors::Result<Self> {
let deck = Deck {
connection: connection.clone(),
info,
port_name,
parent,
inner: Arc::new(RwLock::new(Inner {
child: None,
name,
port: Some(port),
})),
@ -137,48 +138,84 @@ impl Deck {
format!("/dog/asonix/DeckManager/{}", self.info.id)
}
#[tracing::instrument(skip(ctx))]
#[tracing::instrument(skip(_ctx))]
pub(super) fn turn<'a>(
&'a mut self,
msg: DeckMessage,
ctx: &'a mut Context<DeckMessage>,
_ctx: &'a mut Context<DeckMessage>,
) -> BoxFuture<'a> {
match msg {
DeckMessage::Start => Box::pin(self.start(ctx)),
DeckMessage::Press(key) => Box::pin(self.press(key)),
DeckMessage::Stop => Box::pin(self.stop(ctx)),
}
Box::pin(
async move {
match msg {
DeckMessage::Press(key) => self.press(key).await,
}
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
async fn start(&mut self, ctx: &'_ mut Context<DeckMessage>) -> anyhow::Result<()> {
pub(super) fn on_start<'a>(&'a mut self, ctx: &'a mut Context<DeckMessage>) -> BoxFuture<'a> {
Box::pin(self.start(ctx).instrument(Span::current()))
}
#[tracing::instrument(skip(_ctx))]
pub(super) fn on_stop<'a>(&'a mut self, _ctx: &'a mut Context<DeckMessage>) -> BoxFuture<'a> {
Box::pin(
async move {
self.connection
.object_server_mut()
.await
.remove::<Deck, _>(self.object_name())?;
Ok(())
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
pub(super) fn on_remove<'a>(
&'a mut self,
child_id: usize,
ctx: &'a mut Context<DeckMessage>,
) -> BoxFuture<'a> {
Box::pin(
async move {
let mut inner = self.inner.write().await;
if let Some(id) = inner.child.take() {
if id == child_id {
ctx.stop();
} else {
inner.child = Some(id);
}
}
Ok(())
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
async fn start(&mut self, ctx: &'_ mut Context<DeckMessage>) -> tokio_actors::Result<()> {
if let Some(serial) = self.inner.write().await.port.take() {
let port = Port::new(ctx.handle(), serial);
let port_handle = ctx.spawn_child(port, Port::turn);
port_handle.send(PortMessage::Start).await?;
let port_handle =
ctx.spawn_child_with_hooks(Actor::new(port, Port::turn).on_start(Port::on_start));
self.inner.write().await.child = Some(port_handle.actor_id());
}
Ok(())
}
#[tracing::instrument]
async fn press(&mut self, key: u8) -> anyhow::Result<()> {
async fn press(&mut self, key: u8) -> tokio_actors::Result<()> {
tracing::debug!("{}", key);
let ctx = SignalContext::new(&self.connection, self.object_name())?;
self.key_pressed(&ctx, key).await?;
Ok(())
}
#[tracing::instrument(skip(ctx))]
async fn stop(&mut self, ctx: &'_ mut Context<DeckMessage>) -> anyhow::Result<()> {
ctx.stop();
self.connection
.object_server_mut()
.await
.remove::<Deck, _>(self.object_name())?;
Ok(())
}
pub(super) fn port_name(&self) -> &str {
&self.port_name
}

View file

@ -2,6 +2,7 @@ use super::DeckMessage;
use tokio::io::AsyncReadExt;
use tokio_actors::{BoxFuture, Context, SendHandle};
use tokio_serial::SerialStream;
use tracing::{Instrument, Span};
#[derive(Clone, Debug)]
pub(super) enum PortMessage {
@ -24,23 +25,38 @@ impl Port {
Port { parent, serial }
}
#[tracing::instrument(skip(ctx))]
pub(super) fn on_start<'a>(&'a mut self, ctx: &'a mut Context<PortMessage>) -> BoxFuture<'a> {
Box::pin(
async move {
ctx.stop_on_error();
ctx.handle().send(PortMessage::Start).await?;
Ok(())
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
pub(super) fn turn<'a>(
&'a mut self,
msg: PortMessage,
ctx: &'a mut Context<PortMessage>,
) -> BoxFuture<'a> {
match msg {
PortMessage::Start => Box::pin(self.start(ctx)),
}
Box::pin(
async move {
match msg {
PortMessage::Start => self.start(ctx).await,
}
}
.instrument(Span::current()),
)
}
#[tracing::instrument(skip(ctx))]
async fn start(&mut self, ctx: &'_ mut Context<PortMessage>) -> anyhow::Result<()> {
if let Err(e) = ctx.handle().send(PortMessage::Start).await {
ctx.stop();
return Err(e.into());
}
async fn start(&mut self, ctx: &'_ mut Context<PortMessage>) -> tokio_actors::Result<()> {
ctx.handle().send(PortMessage::Start).await?;
loop {
let mut key = [0u8; 1];

View file

@ -4,6 +4,7 @@ use std::{
path::{Path, PathBuf},
};
use tokio_actors::{BoxFuture, Context, SendHandle};
use tracing::{Instrument, Span};
#[derive(Clone)]
pub(super) struct Searcher {
@ -30,11 +31,11 @@ impl Searcher {
#[tracing::instrument(skip(_ctx))]
pub(super) fn turn<'a>(&'a mut self, _: (), _ctx: &'a mut Context<()>) -> BoxFuture<'a> {
Box::pin(self.search())
Box::pin(self.search().instrument(Span::current()))
}
#[tracing::instrument]
async fn search(&mut self) -> anyhow::Result<()> {
async fn search(&mut self) -> tokio_actors::Result<()> {
let ports = tokio::task::block_in_place(|| tokio_serial::available_ports())?;
let port_names = ports