diff --git a/Cargo.lock b/Cargo.lock index a721e48..53ec181 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,12 +41,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "anyhow" -version = "1.0.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" - [[package]] name = "async-broadcast" version = "0.3.4" @@ -632,26 +626,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "pin-project" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.7" @@ -944,7 +918,6 @@ dependencies = [ name = "streamdeck-dbus" version = "0.1.0" dependencies = [ - "anyhow", "streamdeck-commands", "streamdeck-handshake", "tokio", @@ -952,9 +925,7 @@ dependencies = [ "tokio-serial", "tracing", "tracing-error", - "tracing-futures", "tracing-subscriber", - "uuid", "zbus", ] @@ -972,9 +943,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.77" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0" +checksum = "a4eac2e6c19f5c3abc0c229bea31ff0b9b091c7b14990e8924b92902a303a0c0" dependencies = [ "proc-macro2", "quote", @@ -1033,12 +1004,12 @@ dependencies = [ [[package]] name = "tokio-actors" version = "0.1.0" -source = "git+https://git.asonix.dog/asonix/tokio-actors?branch=main#5d7500f4c201a564c67e36b5ce3d4cd41b74e3c4" +source = "git+https://git.asonix.dog/asonix/tokio-actors?branch=main#5d33b8af249f4b9bd9fc3cae05355a5d48819328" dependencies = [ - "anyhow", - "log", "once_cell", "tokio", + "tracing", + "tracing-error", ] [[package]] @@ -1116,16 +1087,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "tracing-log" version = "0.1.2" @@ -1175,16 +1136,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" -[[package]] -name = "uuid" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" -dependencies = [ - "getrandom", - "serde", -] - [[package]] name = "version_check" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index 4cd4efb..e9a109d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1" streamdeck-commands = { version = "0.1.0", git = "https://git.asonix.dog/asonix/streamdeck-commands", branch = "main" } streamdeck-handshake = { version = "0.1.0", git = "https://git.asonix.dog/asonix/streamdeck-handshake", branch = "main" } tokio = { version = "1", features = ["full"] } @@ -14,7 +13,5 @@ tokio-serial = "5.4.1" tokio-actors = { version = "0.1", git = "https://git.asonix.dog/asonix/tokio-actors", branch = "main" } tracing = "0.1.28" tracing-error = "0.1.2" -tracing-futures = "0.2.5" tracing-subscriber = { version = "0.2.24", features = ["fmt"] } -uuid = { version = "0.8.2", features = ["serde", "v4"]} zbus = "2.0.0-beta.7" diff --git a/src/main.rs b/src/main.rs index 0817df1..6fc270d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod manager; use manager::Manager; #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> tokio_actors::Result<()> { init_tracing()?; let mut root = tokio_actors::root(); @@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -fn init_tracing() -> anyhow::Result<()> { +fn init_tracing() -> tokio_actors::Result<()> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); let formatter = tracing_subscriber::fmt::layer() diff --git a/src/manager.rs b/src/manager.rs index da86fef..9d5fdc4 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -4,7 +4,8 @@ use std::{ time::Duration, }; use tokio::sync::RwLock; -use tokio_actors::{BoxFuture, Context, Handle, SendHandle}; +use tokio_actors::{Actor, BoxFuture, Context, Handle, SendHandle}; +use tracing::{Instrument, Span}; use zbus::SignalContext; mod connector; @@ -33,6 +34,7 @@ struct ConnectedDeck { struct Inner { pending_ports: HashSet, connected_decks: HashMap, + children: HashMap, ignored_ports: HashSet, } @@ -69,7 +71,7 @@ impl Manager { } impl Manager { - pub(super) async fn start(root_handle: &mut Handle<()>) -> anyhow::Result<()> { + pub(super) async fn start(root_handle: &mut Handle<()>) -> tokio_actors::Result<()> { let connection = zbus::Connection::session().await?; connection.request_name("dog.asonix.DeckManager").await?; @@ -83,7 +85,11 @@ impl Manager { .await .at(manager.object_name(), manager.clone())?; - let manager = root_handle.spawn_child(manager, Manager::turn).await?; + let manager = root_handle + .spawn_child_with_hooks( + Actor::new(manager, Manager::turn).on_remove(Manager::on_remove), + ) + .await?; let searcher = Searcher::new(manager); let searcher = root_handle.spawn_child(searcher, Searcher::turn).await?; searcher.every(Duration::from_secs(1), || ()); @@ -98,12 +104,35 @@ impl Manager { msg: ManagerMessage, ctx: &'a mut Context, ) -> BoxFuture<'a> { - match msg { - ManagerMessage::Found(port_name) => Box::pin(self.found(port_name, ctx)), - ManagerMessage::Removed(port_name) => Box::pin(self.removed(port_name)), - ManagerMessage::Connected(deck) => Box::pin(self.connected(deck, ctx)), - ManagerMessage::Ignored(port_name) => Box::pin(self.ignored(port_name)), - } + Box::pin( + async move { + match msg { + ManagerMessage::Found(port_name) => self.found(port_name, ctx).await, + ManagerMessage::Removed(port_name) => self.removed(port_name).await, + ManagerMessage::Connected(deck) => self.connected(deck, ctx).await, + ManagerMessage::Ignored(port_name) => self.ignored(port_name).await, + } + } + .instrument(Span::current()), + ) + } + + #[tracing::instrument(skip(_ctx))] + fn on_remove<'a>( + &'a mut self, + child_id: usize, + _ctx: &'a mut Context, + ) -> BoxFuture<'a> { + Box::pin( + async move { + let mut inner = self.inner.write().await; + if let Some(port_name) = inner.children.remove(&child_id) { + inner.connected_decks.remove(&port_name); + } + Ok(()) + } + .instrument(Span::current()), + ) } fn object_name(&self) -> String { @@ -111,7 +140,7 @@ impl Manager { } #[tracing::instrument] - async fn ignored(&mut self, port_name: String) -> anyhow::Result<()> { + async fn ignored(&mut self, port_name: String) -> tokio_actors::Result<()> { tracing::debug!("ignored"); self.inner.write().await.ignored_ports.insert(port_name); Ok(()) @@ -122,7 +151,7 @@ impl Manager { &mut self, deck: Deck, ctx: &'_ mut Context, - ) -> anyhow::Result<()> { + ) -> tokio_actors::Result<()> { tracing::debug!("connected"); let was_pending = self @@ -138,19 +167,27 @@ impl Manager { let port_name = deck.port_name().to_string(); let object_name = deck.object_name(); - let deck_handle = ctx.spawn_child(deck, Deck::turn); - deck_handle.send(DeckMessage::Start).await?; + let deck_handle = ctx.spawn_child_with_hooks( + Actor::new(deck, Deck::turn) + .on_remove(Deck::on_remove) + .on_start(Deck::on_start) + .on_stop(Deck::on_stop), + ); + + let actor_id = deck_handle.actor_id(); let connected = ConnectedDeck { object_name, handle: deck_handle, }; - self.inner - .write() - .await - .connected_decks - .insert(port_name, connected); + { + let mut inner = self.inner.write().await; + + inner.connected_decks.insert(port_name.clone(), connected); + inner.children.insert(actor_id, port_name); + drop(inner); + } // auto generated let signal = SignalContext::new(&self.connection, self.object_name())?; @@ -159,15 +196,15 @@ impl Manager { } #[tracing::instrument] - async fn removed(&mut self, port_name: String) -> anyhow::Result<()> { + async fn removed(&mut self, port_name: String) -> tokio_actors::Result<()> { tracing::debug!("removed"); let mut inner = self.inner.write().await; inner.pending_ports.remove(&port_name); inner.ignored_ports.remove(&port_name); - if let Some(connected) = inner.connected_decks.remove(&port_name) { + if let Some(mut connected) = inner.connected_decks.remove(&port_name) { tracing::info!("Sending stop"); - connected.handle.send(DeckMessage::Stop).await?; + connected.handle.stop(); } Ok(()) @@ -178,7 +215,7 @@ impl Manager { &mut self, port_name: String, ctx: &'_ mut Context, - ) -> anyhow::Result<()> { + ) -> tokio_actors::Result<()> { tracing::debug!("found"); let inner = self.inner.read().await; diff --git a/src/manager/connector.rs b/src/manager/connector.rs index 364d76e..8ff39d8 100644 --- a/src/manager/connector.rs +++ b/src/manager/connector.rs @@ -3,6 +3,7 @@ use crate::manager::{ ManagerMessage, }; use tokio_actors::{BoxFuture, Context, SendHandle}; +use tracing::{Instrument, Span}; use zbus::Connection; #[derive(Clone)] @@ -30,26 +31,29 @@ impl Connector { ctx: &'a mut Context, ) -> BoxFuture<'a> { tracing::debug!("Connector"); - Box::pin(async move { - ctx.stop(); + Box::pin( + async move { + ctx.stop(); - match self.connect(port_name.clone()).await { - Ok(deck) => { - self.parent.send(ManagerMessage::Connected(deck)).await?; - Ok(()) - } - Err(e) => { - tracing::info!("Error: {}", e); - tracing::info!("Details: {:?}", e); - self.parent.send(ManagerMessage::Ignored(port_name)).await?; - Err(e) + match self.connect(port_name.clone()).await { + Ok(deck) => { + self.parent.send(ManagerMessage::Connected(deck)).await?; + Ok(()) + } + Err(e) => { + tracing::info!("Error: {}", e); + tracing::info!("Details: {:?}", e); + self.parent.send(ManagerMessage::Ignored(port_name)).await?; + Err(e) + } } } - }) + .instrument(Span::current()), + ) } #[tracing::instrument] - async fn connect(&mut self, port_name: String) -> anyhow::Result { + async fn connect(&mut self, port_name: String) -> tokio_actors::Result { let mut port = streamdeck_handshake::handshake(port_name.as_str().into()).await?; let identity = streamdeck_commands::identity(&mut port).await?; diff --git a/src/manager/deck.rs b/src/manager/deck.rs index f8726f3..157a5e9 100644 --- a/src/manager/deck.rs +++ b/src/manager/deck.rs @@ -1,13 +1,14 @@ use crate::manager::ManagerMessage; use std::sync::Arc; use tokio::sync::RwLock; -use tokio_actors::{BoxFuture, Context, SendHandle}; +use tokio_actors::{Actor, BoxFuture, Context, SendHandle}; use tokio_serial::SerialStream; +use tracing::{Instrument, Span}; use zbus::{Connection, SignalContext}; mod port; -use port::{Port, PortMessage}; +use port::Port; #[derive(Debug, Clone)] pub(super) struct DeckInfo { @@ -19,9 +20,7 @@ pub(super) struct DeckInfo { #[derive(Clone, Debug)] pub(super) enum DeckMessage { - Start, Press(u8), - Stop, } #[derive(Clone)] @@ -49,6 +48,7 @@ struct Id(Vec); struct Inner { name: String, port: Option, + child: Option, } impl std::fmt::Debug for Inner { @@ -112,13 +112,14 @@ impl Deck { info: DeckInfo, port_name: String, parent: SendHandle, - ) -> anyhow::Result { + ) -> tokio_actors::Result { let deck = Deck { connection: connection.clone(), info, port_name, parent, inner: Arc::new(RwLock::new(Inner { + child: None, name, port: Some(port), })), @@ -137,48 +138,84 @@ impl Deck { format!("/dog/asonix/DeckManager/{}", self.info.id) } - #[tracing::instrument(skip(ctx))] + #[tracing::instrument(skip(_ctx))] pub(super) fn turn<'a>( &'a mut self, msg: DeckMessage, - ctx: &'a mut Context, + _ctx: &'a mut Context, ) -> BoxFuture<'a> { - match msg { - DeckMessage::Start => Box::pin(self.start(ctx)), - DeckMessage::Press(key) => Box::pin(self.press(key)), - DeckMessage::Stop => Box::pin(self.stop(ctx)), - } + Box::pin( + async move { + match msg { + DeckMessage::Press(key) => self.press(key).await, + } + } + .instrument(Span::current()), + ) } #[tracing::instrument(skip(ctx))] - async fn start(&mut self, ctx: &'_ mut Context) -> anyhow::Result<()> { + pub(super) fn on_start<'a>(&'a mut self, ctx: &'a mut Context) -> BoxFuture<'a> { + Box::pin(self.start(ctx).instrument(Span::current())) + } + + #[tracing::instrument(skip(_ctx))] + pub(super) fn on_stop<'a>(&'a mut self, _ctx: &'a mut Context) -> BoxFuture<'a> { + Box::pin( + async move { + self.connection + .object_server_mut() + .await + .remove::(self.object_name())?; + + Ok(()) + } + .instrument(Span::current()), + ) + } + + #[tracing::instrument(skip(ctx))] + pub(super) fn on_remove<'a>( + &'a mut self, + child_id: usize, + ctx: &'a mut Context, + ) -> BoxFuture<'a> { + Box::pin( + async move { + let mut inner = self.inner.write().await; + if let Some(id) = inner.child.take() { + if id == child_id { + ctx.stop(); + } else { + inner.child = Some(id); + } + } + + Ok(()) + } + .instrument(Span::current()), + ) + } + + #[tracing::instrument(skip(ctx))] + async fn start(&mut self, ctx: &'_ mut Context) -> tokio_actors::Result<()> { if let Some(serial) = self.inner.write().await.port.take() { let port = Port::new(ctx.handle(), serial); - let port_handle = ctx.spawn_child(port, Port::turn); - port_handle.send(PortMessage::Start).await?; + let port_handle = + ctx.spawn_child_with_hooks(Actor::new(port, Port::turn).on_start(Port::on_start)); + self.inner.write().await.child = Some(port_handle.actor_id()); } Ok(()) } #[tracing::instrument] - async fn press(&mut self, key: u8) -> anyhow::Result<()> { + async fn press(&mut self, key: u8) -> tokio_actors::Result<()> { tracing::debug!("{}", key); let ctx = SignalContext::new(&self.connection, self.object_name())?; self.key_pressed(&ctx, key).await?; Ok(()) } - #[tracing::instrument(skip(ctx))] - async fn stop(&mut self, ctx: &'_ mut Context) -> anyhow::Result<()> { - ctx.stop(); - self.connection - .object_server_mut() - .await - .remove::(self.object_name())?; - - Ok(()) - } - pub(super) fn port_name(&self) -> &str { &self.port_name } diff --git a/src/manager/deck/port.rs b/src/manager/deck/port.rs index 7402029..28ce465 100644 --- a/src/manager/deck/port.rs +++ b/src/manager/deck/port.rs @@ -2,6 +2,7 @@ use super::DeckMessage; use tokio::io::AsyncReadExt; use tokio_actors::{BoxFuture, Context, SendHandle}; use tokio_serial::SerialStream; +use tracing::{Instrument, Span}; #[derive(Clone, Debug)] pub(super) enum PortMessage { @@ -24,23 +25,38 @@ impl Port { Port { parent, serial } } + #[tracing::instrument(skip(ctx))] + pub(super) fn on_start<'a>(&'a mut self, ctx: &'a mut Context) -> BoxFuture<'a> { + Box::pin( + async move { + ctx.stop_on_error(); + ctx.handle().send(PortMessage::Start).await?; + + Ok(()) + } + .instrument(Span::current()), + ) + } + #[tracing::instrument(skip(ctx))] pub(super) fn turn<'a>( &'a mut self, msg: PortMessage, ctx: &'a mut Context, ) -> BoxFuture<'a> { - match msg { - PortMessage::Start => Box::pin(self.start(ctx)), - } + Box::pin( + async move { + match msg { + PortMessage::Start => self.start(ctx).await, + } + } + .instrument(Span::current()), + ) } #[tracing::instrument(skip(ctx))] - async fn start(&mut self, ctx: &'_ mut Context) -> anyhow::Result<()> { - if let Err(e) = ctx.handle().send(PortMessage::Start).await { - ctx.stop(); - return Err(e.into()); - } + async fn start(&mut self, ctx: &'_ mut Context) -> tokio_actors::Result<()> { + ctx.handle().send(PortMessage::Start).await?; loop { let mut key = [0u8; 1]; diff --git a/src/manager/searcher.rs b/src/manager/searcher.rs index 3deac29..6f7f09d 100644 --- a/src/manager/searcher.rs +++ b/src/manager/searcher.rs @@ -4,6 +4,7 @@ use std::{ path::{Path, PathBuf}, }; use tokio_actors::{BoxFuture, Context, SendHandle}; +use tracing::{Instrument, Span}; #[derive(Clone)] pub(super) struct Searcher { @@ -30,11 +31,11 @@ impl Searcher { #[tracing::instrument(skip(_ctx))] pub(super) fn turn<'a>(&'a mut self, _: (), _ctx: &'a mut Context<()>) -> BoxFuture<'a> { - Box::pin(self.search()) + Box::pin(self.search().instrument(Span::current())) } #[tracing::instrument] - async fn search(&mut self) -> anyhow::Result<()> { + async fn search(&mut self) -> tokio_actors::Result<()> { let ports = tokio::task::block_in_place(|| tokio_serial::available_ports())?; let port_names = ports