From 7bb1073fd9273a88bb6ea628ab493a327e8dbeb9 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sat, 12 Mar 2022 22:44:10 -0600 Subject: [PATCH] Rework dbus hierarchy --- streamdeck-common/src/input.rs | 11 +- streamdeck-daemon/Cargo.toml | 2 +- streamdeck-daemon/src/dbus.rs | 323 ++++++++++----------------- streamdeck-daemon/src/dbus/button.rs | 105 +++++++++ streamdeck-daemon/src/dbus/deck.rs | 166 ++++++++++++++ streamdeck-daemon/src/dbus/obs.rs | 147 ++++++++++++ streamdeck-daemon/src/deck.rs | 13 +- streamdeck-daemon/src/input.rs | 26 +-- streamdeck-daemon/src/main.rs | 40 ++-- streamdeck-daemon/src/manager.rs | 54 +++-- streamdeck-daemon/src/message.rs | 9 +- streamdeck-daemon/src/port.rs | 2 +- streamdeck-daemon/src/store.rs | 55 ++--- 13 files changed, 647 insertions(+), 306 deletions(-) create mode 100644 streamdeck-daemon/src/dbus/button.rs create mode 100644 streamdeck-daemon/src/dbus/deck.rs create mode 100644 streamdeck-daemon/src/dbus/obs.rs diff --git a/streamdeck-common/src/input.rs b/streamdeck-common/src/input.rs index 5b63c2c..6822258 100644 --- a/streamdeck-common/src/input.rs +++ b/streamdeck-common/src/input.rs @@ -1,7 +1,16 @@ use std::ops::Deref; #[derive( - Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize, + Clone, + Debug, + Default, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash, + serde::Deserialize, + serde::Serialize, )] #[cfg_attr(feature = "zbus", derive(zbus::zvariant::Type))] #[serde(transparent)] diff --git a/streamdeck-daemon/Cargo.toml b/streamdeck-daemon/Cargo.toml index 6dbc8d0..6e1b988 100644 --- a/streamdeck-daemon/Cargo.toml +++ b/streamdeck-daemon/Cargo.toml @@ -12,7 +12,7 @@ ipc-dbus = ["zbus", "streamdeck-common/ipc-dbus"] [dependencies] anyhow = "1" -base64 = "0.13.0" +base16ct = { version = "0.1.1", features = ["std"] } directories = "4.0" either = "1.6.1" futures-util = "0.3.15" diff --git a/streamdeck-daemon/src/dbus.rs b/streamdeck-daemon/src/dbus.rs index 9e58e18..aa9ae4b 100644 --- a/streamdeck-daemon/src/dbus.rs +++ b/streamdeck-daemon/src/dbus.rs @@ -1,111 +1,38 @@ use crate::{ - message::{Input, InputMessage, ManagerMessage, ObsMessage}, + message::{DeckMessage, InputMessage, ManagerMessage, ObsMessage}, store::Store, }; -use streamdeck_common::Command; -use tokio::sync::mpsc::Sender; +use std::collections::BTreeSet; +use streamdeck_common::Input; +use tokio::sync::{ + broadcast::Receiver, + mpsc::{self, Sender}, +}; +use zbus::SignalContext; -struct DeckService { - store: Store, - input: Sender, - obs: Sender, - manager: Sender, +mod button; +mod deck; +mod obs; + +pub(crate) const fn daemon_path() -> &'static str { + "/dog/asonix/git/asonix/StreamdeckDaemon" } -impl DeckService { - async fn do_query(&self, query: &str) -> anyhow::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); +pub(crate) const fn obs_path() -> &'static str { + "/dog/asonix/git/asonix/StreamdeckDaemon/obs" +} - let query = serde_json::from_str(query)?; +pub(crate) fn deck_path(serial_number: &str) -> String { + String::new() + daemon_path() + "/" + serial_number +} - self.obs.send(ObsMessage::Query(tx, query)).await?; - let response = rx.await?; - let response = serde_json::to_string(&response)?; +pub(crate) fn button_path(serial_number: &str, input: &Input) -> String { + deck_path(serial_number) + "/" + input.to_string().as_str() +} - Ok(response) - } - - async fn do_test(&self, command: &str) -> anyhow::Result<()> { - let command = serde_json::from_str(command)?; - self.obs.send(ObsMessage::Command(command)).await?; - Ok(()) - } - - async fn do_get_decks(&self) -> anyhow::Result> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.manager - .send(ManagerMessage::List(tx)) - .await - .map_err(|_| anyhow::anyhow!("manager is dropped"))?; - let decks = rx.await?; - - Ok(decks - .into_iter() - .map(|d| (d.serial_number, d.product_name, d.port_name)) - .collect::>()) - } - - async fn do_connect(&self, host: String, port: u16) -> anyhow::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.obs.send(ObsMessage::Connect(host, port)).await?; - self.obs.send(ObsMessage::State(tx)).await?; - let state = rx.await?; - Ok(state.to_string()) - } - - async fn do_disconnect(&self) -> anyhow::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.obs.send(ObsMessage::Disconnect).await?; - self.obs.send(ObsMessage::State(tx)).await?; - let state = rx.await?; - - Ok(state.to_string()) - } - - async fn do_get_state(&self) -> anyhow::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.obs.send(ObsMessage::State(tx)).await?; - let state = rx.await?; - - Ok(state.to_string()) - } - - async fn do_login(&self, password: String) -> anyhow::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.obs.send(ObsMessage::Authenticate(password)).await?; - self.obs.send(ObsMessage::State(tx)).await?; - let state = rx.await?; - - Ok(state.to_string()) - } - - async fn do_read_input(&self) -> anyhow::Result> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.input - .send(InputMessage::ReadInput(tx)) - .await - .map_err(|_| anyhow::anyhow!("input handler is dropped"))?; - let (serial_number, keys) = rx.await?; - - Ok(vec![(keys, serial_number)]) - } - - async fn do_set_input( - &self, - serial_number: &str, - keys: Input, - command: &str, - ) -> anyhow::Result<()> { - let command: Command = serde_json::from_str(command)?; - self.store.store(serial_number, keys, &command).await?; - Ok(()) - } +struct Daemon { + manager: Sender, + decks: BTreeSet, } fn fail(e: impl std::fmt::Display) -> zbus::fdo::Error { @@ -113,17 +40,7 @@ fn fail(e: impl std::fmt::Display) -> zbus::fdo::Error { } #[zbus::dbus_interface(name = "dog.asonix.git.asonix.StreamdeckDaemon")] -impl DeckService { - async fn query(&self, query: &str) -> zbus::fdo::Result { - tracing::debug!("query"); - self.do_query(query).await.map_err(fail) - } - - async fn test(&self, command: &str) -> zbus::fdo::Result<()> { - tracing::debug!("test"); - self.do_test(command).await.map_err(fail) - } - +impl Daemon { async fn enable_discovery(&self) -> zbus::fdo::Result<()> { tracing::debug!("enable_discovery"); self.manager @@ -140,121 +57,111 @@ impl DeckService { .map_err(fail) } - async fn get_decks(&self) -> zbus::fdo::Result> { + async fn get_decks(&self) -> Vec { tracing::debug!("get_decks"); - self.do_get_decks().await.map_err(fail) + self.decks.iter().cloned().collect() } - async fn connect(&self, host: String, port: u16) -> zbus::fdo::Result { - tracing::debug!("connect"); - self.do_connect(host, port).await.map_err(fail) - } + #[dbus_interface(signal)] + async fn deck_added(ctx: &SignalContext<'_>, serial_number: &str) -> zbus::Result<()>; - async fn disconnect(&self) -> zbus::fdo::Result { - tracing::debug!("disconnect"); - self.do_disconnect().await.map_err(fail) - } - - async fn get_state(&self) -> zbus::fdo::Result { - tracing::debug!("get_state"); - self.do_get_state().await.map_err(fail) - } - - async fn login(&self, password: String) -> zbus::fdo::Result { - tracing::debug!("login"); - self.do_login(password).await.map_err(fail) - } - - async fn get_commands(&self, serial_number: &str) -> zbus::fdo::Result> { - tracing::debug!("get_commands"); - let hm = self.store.get_commands(serial_number).await.map_err(fail)?; - - Ok(hm - .into_iter() - .filter_map(|(keys, cmd)| Some((keys, serde_json::to_string(&cmd).ok()?))) - .collect()) - } - - async fn read_input(&self) -> zbus::fdo::Result> { - tracing::debug!("read input"); - self.do_read_input().await.map_err(fail) - } - - async fn set_input( - &self, - serial_number: &str, - keys: Input, - command: &str, - ) -> zbus::fdo::Result<()> { - tracing::debug!("set input"); - self.do_set_input(serial_number, keys, command) - .await - .map_err(fail) - } - - async fn unset_input(&self, serial_number: &str, keys: Input) -> zbus::fdo::Result<()> { - tracing::debug!("unset input"); - self.store.unset(serial_number, keys).await.map_err(fail) - } - - async fn set_input_name( - &self, - serial_number: &str, - keys: Input, - name: &str, - ) -> zbus::fdo::Result<()> { - tracing::debug!("set_input_name"); - self.store - .set_input_name(serial_number, keys, name) - .await - .map_err(fail) - } - - async fn get_input_names( - &self, - serial_number: &str, - ) -> zbus::fdo::Result> { - tracing::debug!("get_input_names"); - let hm = self.store.input_names(serial_number).await.map_err(fail)?; - Ok(hm.into_iter().collect()) - } - - async fn set_deck_name(&self, serial_number: &str, name: &str) -> zbus::fdo::Result<()> { - tracing::debug!("set_deck_name"); - self.store - .set_deck_name(serial_number, name) - .await - .map_err(fail) - } - - async fn get_deck_name(&self, serial_number: &str) -> zbus::fdo::Result { - tracing::debug!("get_deck_name"); - self.store - .deck_name(serial_number) - .await - .map(|opt| opt.unwrap_or_else(|| String::from("Streamdeck"))) - .map_err(fail) - } + #[dbus_interface(signal)] + async fn deck_removed(ctx: &SignalContext<'_>, serial_number: &str) -> zbus::Result<()>; } +#[tracing::instrument(skip_all)] pub(crate) async fn spawn( store: Store, - input: Sender, + mut input: Receiver, + mut deck_rx: mpsc::Receiver, obs: Sender, manager: Sender, ) -> anyhow::Result { - let state = DeckService { - store, - input, - obs, - manager, + let state = Daemon { + manager: manager.clone(), + decks: BTreeSet::new(), }; let conn = zbus::ConnectionBuilder::session()? .name("dog.asonix.git.asonix.StreamdeckDaemon")? - .serve_at("/dog/asonix/git/asonix/StreamdeckDaemon", state)? + .serve_at(daemon_path(), state)? .build() .await?; + obs::Obs::hydrate(conn.clone(), obs).await?; + + let conn2 = conn.clone(); + tokio::spawn(async move { + let conn = conn2; + while let Some(msg) = deck_rx.recv().await { + match msg { + DeckMessage::Open(config) => { + if let Ok(daemon_ref) = conn + .object_server() + .interface::<_, Daemon>(daemon_path()) + .await + { + let mut daemon = daemon_ref.get_mut().await; + if let Err(e) = deck::Deck::hydrate( + conn.clone(), + config.serial_number.clone(), + config.product_name, + config.port_name, + store.clone(), + ) + .await + { + tracing::warn!("Error hydrating deck: {}", e); + continue; + } + + daemon.decks.insert(config.serial_number.clone()); + let ctx = daemon_ref.signal_context(); + let _ = Daemon::deck_added(ctx, &config.serial_number).await; + } + } + DeckMessage::Close(serial_number) => { + let _ = conn + .object_server() + .remove::(deck_path(&serial_number)) + .await; + if let Ok(daemon_ref) = conn + .object_server() + .interface::<_, Daemon>(daemon_path()) + .await + { + let mut daemon = daemon_ref.get_mut().await; + daemon.decks.remove(&serial_number); + let ctx = daemon_ref.signal_context(); + let _ = Daemon::deck_removed(ctx, &serial_number).await; + } + } + } + } + }); + + let conn2 = conn.clone(); + tokio::spawn(async move { + let conn = conn2; + + while let Ok((serial_number, input)) = input.recv().await { + if let Ok(deck_ref) = conn + .object_server() + .interface::<_, deck::Deck>(deck_path(&serial_number)) + .await + { + let _ = deck::Deck::button_pushed(deck_ref.signal_context(), input.clone()).await; + } + + if let Ok(button_ref) = conn + .object_server() + .interface::<_, button::Button>(button_path(&serial_number, &input)) + .await + { + let _ = button::Button::pushed(button_ref.signal_context()).await; + } + } + }); + Ok(conn) } diff --git a/streamdeck-daemon/src/dbus/button.rs b/streamdeck-daemon/src/dbus/button.rs new file mode 100644 index 0000000..edba729 --- /dev/null +++ b/streamdeck-daemon/src/dbus/button.rs @@ -0,0 +1,105 @@ +use crate::{dbus::button_path, store::Store}; +use streamdeck_common::{Command, Input}; +use zbus::{Connection, SignalContext}; + +pub(super) struct Button { + serial_number: String, + keys: Input, + command: String, + name: String, + + store: Store, +} + +impl Button { + #[tracing::instrument(skip(connection))] + pub(super) async fn hydrate( + connection: Connection, + serial_number: String, + keys: Input, + store: Store, + ) -> anyhow::Result<()> { + let command_opt = store.command(&serial_number, keys.clone()).await?; + + let command = command_opt.unwrap_or(Command::SetStreaming { + operation: streamdeck_common::StateOperation::Toggle, + }); + + let command = serde_json::to_string(&command)?; + + let name_opt = store.input_name(&serial_number, keys.clone()).await?; + + let name = name_opt.unwrap_or_else(|| String::from("Streamdeck")); + + let path = button_path(&serial_number, &keys); + + let button = Button { + serial_number, + keys, + command, + name, + store, + }; + + let _ = connection.object_server().at(path, button).await?; + + Ok(()) + } +} + +#[zbus::dbus_interface(name = "dog.asonix.git.asonix.StreamdeckDaemon.Deck.Button")] +impl Button { + #[dbus_interface(property)] + async fn name(&self) -> &str { + tracing::debug!("name"); + &self.name + } + + #[dbus_interface(property)] + async fn set_name(&mut self, name: String) { + tracing::debug!("set name"); + match self.do_set_name(&name).await { + Ok(()) => self.name = name, + Err(e) => { + tracing::warn!("Failed to set name: {}", e); + } + } + } + + #[dbus_interface(property)] + async fn command(&self) -> &str { + tracing::debug!("command"); + &self.command + } + + #[dbus_interface(property)] + pub(super) async fn set_command(&mut self, command: String) { + tracing::debug!("set command"); + match self.do_set_command(&command).await { + Ok(()) => self.command = command, + Err(e) => { + tracing::warn!("Failed to set command: {}", e); + } + } + } + + #[dbus_interface(signal)] + pub(super) async fn pushed(ctx: &SignalContext<'_>) -> zbus::Result<()>; +} + +impl Button { + async fn do_set_name(&self, name: &str) -> anyhow::Result<()> { + self.store + .set_input_name(&self.serial_number, self.keys.clone(), name) + .await?; + Ok(()) + } + + async fn do_set_command(&self, command: &str) -> anyhow::Result<()> { + let command: Command = serde_json::from_str(command)?; + self.store + .store(&self.serial_number, self.keys.clone(), &command) + .await?; + Ok(()) + } +} diff --git a/streamdeck-daemon/src/dbus/deck.rs b/streamdeck-daemon/src/dbus/deck.rs new file mode 100644 index 0000000..5fb19bd --- /dev/null +++ b/streamdeck-daemon/src/dbus/deck.rs @@ -0,0 +1,166 @@ +use crate::{ + dbus::{button::Button, button_path, deck_path}, + store::Store, +}; +use std::collections::BTreeSet; +use streamdeck_common::Input; +use zbus::{Connection, SignalContext}; + +pub(super) struct Deck { + connection: Connection, + serial_number: String, + name: String, + product_name: String, + port_name: String, + store: Store, + buttons: BTreeSet, +} + +impl Deck { + #[tracing::instrument(skip(connection))] + pub(crate) async fn hydrate( + connection: Connection, + serial_number: String, + product_name: String, + port_name: String, + store: Store, + ) -> anyhow::Result<()> { + let commands = store.get_commands(&serial_number).await?; + let name = store.deck_name(&serial_number).await?; + + let path = deck_path(&serial_number); + + let deck = Deck { + connection: connection.clone(), + serial_number, + name: name.unwrap_or_else(|| String::from("Streamdeck")), + product_name, + port_name, + store, + buttons: BTreeSet::new(), + }; + + let _ = connection.object_server().at(path.clone(), deck).await?; + + let iface_ref = connection + .object_server() + .interface::<_, Deck>(path) + .await?; + let mut iface = iface_ref.get_mut().await; + + for (input, command) in commands { + let command = serde_json::to_string(&command)?; + iface.create_button(input, command).await?; + } + + Ok(()) + } +} + +#[zbus::dbus_interface(name = "dog.asonix.git.asonix.StreamdeckDaemon.Deck")] +impl Deck { + #[dbus_interface(property)] + async fn name(&self) -> &str { + tracing::debug!("name"); + &self.name + } + + #[dbus_interface(property)] + async fn set_name(&mut self, name: String) { + tracing::debug!("set name"); + if self + .store + .set_deck_name(&self.serial_number, &name) + .await + .is_ok() + { + self.name = name; + } + } + + #[dbus_interface(property)] + async fn product_name(&self) -> &str { + tracing::debug!("product_name"); + &self.product_name + } + + #[dbus_interface(property)] + async fn port_name(&self) -> &str { + tracing::debug!("port_name"); + &self.port_name + } + + async fn get_buttons(&self) -> Vec { + tracing::debug!("get buttons"); + self.buttons.iter().cloned().collect() + } + + async fn create_button(&mut self, input: Input, command: String) -> zbus::fdo::Result { + tracing::debug!("create button"); + let path = button_path(&self.serial_number, &input); + + self.buttons.insert(path.clone()); + + Button::hydrate( + self.connection.clone(), + self.serial_number.clone(), + input.clone(), + self.store.clone(), + ) + .await + .map_err(crate::dbus::fail)?; + + let iface = self + .connection + .object_server() + .interface::<_, Button>(path.clone()) + .await?; + + let mut iface_ref = iface.get_mut().await; + iface_ref.set_command(command).await; + + let ctx = iface.signal_context(); + + Self::button_added(ctx, input).await?; + + Ok(path) + } + + async fn remove_button(&mut self, input: Input) -> zbus::fdo::Result<()> { + tracing::debug!("remove button"); + let path = button_path(&self.serial_number, &input); + + self.buttons.remove(&path); + + let _ = self + .connection + .object_server() + .remove::(path.clone()) + .await?; + + self.store + .unset(&self.serial_number, input.clone()) + .await + .map_err(crate::dbus::fail)?; + + let iface = self + .connection + .object_server() + .interface::<_, Self>(path) + .await?; + let ctx = iface.signal_context(); + + Self::button_removed(ctx, input).await?; + + Ok(()) + } + + #[dbus_interface(signal)] + pub(super) async fn button_pushed(ctx: &SignalContext<'_>, input: Input) -> zbus::Result<()>; + + #[dbus_interface(signal)] + async fn button_added(ctx: &SignalContext<'_>, input: Input) -> zbus::Result<()>; + + #[dbus_interface(signal)] + async fn button_removed(ctx: &SignalContext<'_>, input: Input) -> zbus::Result<()>; +} diff --git a/streamdeck-daemon/src/dbus/obs.rs b/streamdeck-daemon/src/dbus/obs.rs new file mode 100644 index 0000000..8cc5b43 --- /dev/null +++ b/streamdeck-daemon/src/dbus/obs.rs @@ -0,0 +1,147 @@ +use crate::{dbus::obs_path, message::ObsMessage}; +use streamdeck_common::ObsState; +use tokio::sync::mpsc::Sender; +use zbus::Connection; + +pub(crate) struct Obs { + connection: Connection, + state: String, + obs: Sender, +} + +impl Obs { + #[tracing::instrument(skip_all)] + pub(crate) async fn hydrate( + connection: Connection, + obs: Sender, + ) -> anyhow::Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + obs.send(ObsMessage::State(tx)).await?; + let state = rx.await?; + + let obs = Self { + connection: connection.clone(), + obs, + state: state.to_string(), + }; + + connection.object_server().at(obs_path(), obs).await?; + + Ok(()) + } +} + +#[zbus::dbus_interface(name = "dog.asonix.git.asonix.StreamdeckDaemon.Obs")] +impl Obs { + #[dbus_interface(property)] + async fn state(&self) -> &str { + &self.state + } + + async fn connect(&mut self, host: String, port: u16) -> zbus::fdo::Result<()> { + tracing::debug!("connect"); + self.do_connect(host, port) + .await + .map_err(crate::dbus::fail)?; + + Ok(()) + } + + async fn disconnect(&mut self) -> zbus::fdo::Result<()> { + tracing::debug!("disconnect"); + self.do_disconnect().await.map_err(crate::dbus::fail)?; + + Ok(()) + } + + async fn login(&mut self, password: String) -> zbus::fdo::Result<()> { + tracing::debug!("login"); + self.do_login(password).await.map_err(crate::dbus::fail)?; + + Ok(()) + } + + async fn query(&self, query: &str) -> zbus::fdo::Result { + tracing::debug!("query"); + self.do_query(query).await.map_err(crate::dbus::fail) + } + + async fn test(&self, command: &str) -> zbus::fdo::Result<()> { + tracing::debug!("test"); + + self.do_test(command).await.map_err(crate::dbus::fail) + } +} + +impl Obs { + async fn set_state(&mut self, state: ObsState) -> anyhow::Result<()> { + let state = state.to_string(); + if self.state != state { + self.state = state; + + let iface = self + .connection + .object_server() + .interface::<_, Self>(obs_path()) + .await?; + + let signal_context = iface.signal_context(); + + self.state_changed(signal_context).await?; + } + + Ok(()) + } + + async fn do_connect(&mut self, host: String, port: u16) -> anyhow::Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.obs.send(ObsMessage::Connect(host, port)).await?; + self.obs.send(ObsMessage::State(tx)).await?; + let state = rx.await?; + + self.set_state(state).await?; + Ok(()) + } + + async fn do_disconnect(&mut self) -> anyhow::Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.obs.send(ObsMessage::Disconnect).await?; + self.obs.send(ObsMessage::State(tx)).await?; + let state = rx.await?; + + self.set_state(state).await?; + Ok(()) + } + + async fn do_login(&mut self, password: String) -> anyhow::Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.obs.send(ObsMessage::Authenticate(password)).await?; + self.obs.send(ObsMessage::State(tx)).await?; + let state = rx.await?; + + self.set_state(state).await?; + Ok(()) + } + + async fn do_query(&self, query: &str) -> anyhow::Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let query = serde_json::from_str(query)?; + + self.obs.send(ObsMessage::Query(tx, query)).await?; + let response = rx.await?; + let response = serde_json::to_string(&response)?; + + Ok(response) + } + + async fn do_test(&self, command: &str) -> anyhow::Result<()> { + let command = serde_json::from_str(command)?; + self.obs.send(ObsMessage::Command(command)).await?; + Ok(()) + } +} diff --git a/streamdeck-daemon/src/deck.rs b/streamdeck-daemon/src/deck.rs index fb2f4c5..46a1126 100644 --- a/streamdeck-daemon/src/deck.rs +++ b/streamdeck-daemon/src/deck.rs @@ -2,12 +2,15 @@ use crate::{ message::{InputMessage, ManagerMessage}, port::{DeckConfig, Port}, }; -use tokio::{sync::mpsc::Sender, time::Duration}; +use tokio::{ + sync::{broadcast, mpsc::Sender}, + time::Duration, +}; #[derive(Debug)] pub(crate) struct Deck { pub(crate) manager: Sender, - pub(crate) input: Sender, + pub(crate) input: broadcast::Sender, pub(crate) port: Port, pub(crate) config: DeckConfig, } @@ -39,16 +42,14 @@ impl Deck { } } -async fn io_task(port: Port, sender: Sender, serial_number: String) { +async fn io_task(port: Port, sender: broadcast::Sender, serial_number: String) { let (mut read_port, _write_port) = port.split(); loop { tracing::trace!("read loop"); match read_port.read_keys().await { Ok(input) => { - let _ = sender - .send(InputMessage::Press(serial_number.clone(), input)) - .await; + let _ = sender.send((serial_number.clone(), input)); } Err(e) => { tracing::trace!("Error reading: {}", e); diff --git a/streamdeck-daemon/src/input.rs b/streamdeck-daemon/src/input.rs index 038504d..704de84 100644 --- a/streamdeck-daemon/src/input.rs +++ b/streamdeck-daemon/src/input.rs @@ -1,29 +1,15 @@ use crate::{ - message::{Input, InputMessage, ObsMessage}, + message::{InputMessage, ObsMessage}, store::Store, }; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - oneshot, -}; +use tokio::sync::{broadcast::Receiver, mpsc::Sender}; pub(crate) async fn task(store: Store, mut rx: Receiver, obs_tx: Sender) { - let mut sender_opt: Option> = None; + while let Ok((serial_number, keys)) = rx.recv().await { + tracing::debug!("{}: {}", serial_number, keys); - while let Some(msg) = rx.recv().await { - match msg { - InputMessage::Press(serial_number, keys) => { - tracing::debug!("{}: {}", serial_number, keys); - - if let Some(sender) = sender_opt.take() { - let _ = sender.send((serial_number, keys)); - } else if let Ok(Some(command)) = store.pressed(&serial_number, keys).await { - let _ = obs_tx.send(ObsMessage::Command(command)).await; - } - } - InputMessage::ReadInput(sender) => { - sender_opt = Some(sender); - } + if let Ok(Some(command)) = store.command(&serial_number, keys).await { + let _ = obs_tx.send(ObsMessage::Command(command)).await; } } } diff --git a/streamdeck-daemon/src/main.rs b/streamdeck-daemon/src/main.rs index 51213f4..f90e613 100644 --- a/streamdeck-daemon/src/main.rs +++ b/streamdeck-daemon/src/main.rs @@ -19,8 +19,9 @@ async fn main() -> anyhow::Result<()> { init_tracing()?; let (manager_tx, manager_rx) = tokio::sync::mpsc::channel(16); - let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); + let (input_tx, input_rx) = tokio::sync::broadcast::channel(16); let (obs_tx, obs_rx) = tokio::sync::mpsc::channel(16); + let (deck_tx, deck_rx) = tokio::sync::mpsc::channel(16); let project_dirs = ProjectDirs::from("dog", "asonix", "Streamdeck") .ok_or_else(|| anyhow::anyhow!("No home directory found"))?; @@ -35,26 +36,35 @@ async fn main() -> anyhow::Result<()> { let mut handles = vec![]; - #[cfg(feature = "ipc-dbus")] - { - dbus::spawn( - store.clone(), - input_tx.clone(), - obs_tx.clone(), - manager_tx.clone(), - ) - .await?; - } + tracing::info!("Spawning application"); handles.push(tokio::spawn(input::task( store.clone(), - input_rx, + input_tx.subscribe(), obs_tx.clone(), ))); - handles.push(tokio::spawn(obs::task(store, obs_tx, obs_rx))); - handles.push(tokio::spawn(manager::task( - input_tx, manager_tx, manager_rx, + tracing::info!("Spawned input task"); + handles.push(tokio::spawn(obs::task( + store.clone(), + obs_tx.clone(), + obs_rx, ))); + tracing::info!("Spawned obs task"); + handles.push(tokio::spawn(manager::task( + input_tx, + deck_tx, + manager_tx.clone(), + manager_rx, + ))); + tracing::info!("Spawned manager task"); + + #[cfg(feature = "ipc-dbus")] + { + dbus::spawn(store, input_rx, deck_rx, obs_tx, manager_tx).await?; + tracing::info!("spawned dbus task"); + } + + tracing::info!("Spawned application"); tokio::select! { _ = shutdown.notified() => {}, diff --git a/streamdeck-daemon/src/manager.rs b/streamdeck-daemon/src/manager.rs index cc16bc9..fc78843 100644 --- a/streamdeck-daemon/src/manager.rs +++ b/streamdeck-daemon/src/manager.rs @@ -1,6 +1,6 @@ use crate::{ deck::Deck, - message::{InputMessage, ManagerMessage}, + message::{DeckMessage, InputMessage, ManagerMessage}, port::{DeckConfig, Port}, }; use std::{ @@ -10,13 +10,17 @@ use std::{ task::{Context, Poll}, }; use tokio::{ - sync::mpsc::{Receiver, Sender}, + sync::{ + broadcast, + mpsc::{Receiver, Sender}, + }, task::JoinHandle, time::Duration, }; struct Manager { - input_tx: Sender, + input_tx: broadcast::Sender, + deck_tx: Sender, tx: Sender, decks: HashMap, ports: HashMap>, @@ -27,10 +31,15 @@ struct Manager { } impl Manager { - fn new(input_tx: Sender, tx: Sender) -> Self { + fn new( + input_tx: broadcast::Sender, + tx: Sender, + deck_tx: Sender, + ) -> Self { let discovery = tokio::spawn(tick_task(tx.clone())); Manager { input_tx, + deck_tx, tx, decks: HashMap::default(), ports: HashMap::default(), @@ -76,9 +85,6 @@ impl Manager { let handle = tokio::spawn(find_task(self.tx.clone())); self.finding = Some(handle); } - ManagerMessage::List(sender) => { - let _ = sender.send(self.decks.values().cloned().collect()); - } ManagerMessage::Found(found_decks) => { let known = self.ports.keys().cloned().collect::>(); let found = found_decks.iter().cloned().collect::>(); @@ -109,15 +115,19 @@ impl Manager { } ManagerMessage::Opened(deck, port) => { tracing::info!("New deck: {}", deck.port_name); - let handle = tokio::spawn( - Deck { - manager: self.tx.clone(), - input: self.input_tx.clone(), - port, - config: deck.clone(), - } - .run(), - ); + let deck_tx = self.deck_tx.clone(); + let deck_fut = Deck { + manager: self.tx.clone(), + input: self.input_tx.clone(), + port, + config: deck.clone(), + } + .run(); + let deck2 = deck.clone(); + let handle = tokio::spawn(async move { + let _ = deck_tx.send(DeckMessage::Open(deck2)).await; + deck_fut.await + }); self.ports.insert(deck.port_name.clone(), handle); self.decks.insert(deck.port_name.clone(), deck); } @@ -128,18 +138,24 @@ impl Manager { if let Some(handle) = self.ports.remove(&port_name) { handle.abort(); } - self.decks.remove(&port_name); + if let Some(deck) = self.decks.remove(&port_name) { + let deck_tx = self.deck_tx.clone(); + tokio::spawn(async move { + let _ = deck_tx.send(DeckMessage::Close(deck.serial_number)).await; + }); + } } } } } pub(crate) async fn task( - input_tx: Sender, + input_tx: broadcast::Sender, + deck_tx: Sender, tx: Sender, mut rx: Receiver, ) { - let mut state = Manager::new(input_tx, tx); + let mut state = Manager::new(input_tx, tx, deck_tx); while let Some(msg) = rx.recv().await { state.turn(msg).await; diff --git a/streamdeck-daemon/src/message.rs b/streamdeck-daemon/src/message.rs index 2097284..3246964 100644 --- a/streamdeck-daemon/src/message.rs +++ b/streamdeck-daemon/src/message.rs @@ -13,9 +13,11 @@ pub(crate) enum ObsMessage { Disconnect, } -pub(crate) enum InputMessage { - Press(String, Input), - ReadInput(Sender<(String, Input)>), +pub(crate) type InputMessage = (String, Input); + +pub(crate) enum DeckMessage { + Open(DeckConfig), + Close(String), } #[derive(Debug)] @@ -23,7 +25,6 @@ pub(crate) enum ManagerMessage { EnableDiscovery, DisableDiscovery, Tick, - List(Sender>), Found(Vec), Opened(DeckConfig, Port), Closed(String), diff --git a/streamdeck-daemon/src/port.rs b/streamdeck-daemon/src/port.rs index a7fad17..89ddb49 100644 --- a/streamdeck-daemon/src/port.rs +++ b/streamdeck-daemon/src/port.rs @@ -51,7 +51,7 @@ impl Port { async fn serial_number(&mut self) -> anyhow::Result { let bytes = streamdeck_commands::identity(&mut self.0).await?; - Ok(base64::encode(&bytes)) + Ok(base16ct::lower::encode_string(&bytes)) } async fn product_name(&mut self) -> anyhow::Result { diff --git a/streamdeck-daemon/src/store.rs b/streamdeck-daemon/src/store.rs index 5e1a644..3c930db 100644 --- a/streamdeck-daemon/src/store.rs +++ b/streamdeck-daemon/src/store.rs @@ -1,10 +1,9 @@ use sled::{Db, IVec, Tree}; -use std::collections::HashMap; use streamdeck_common::Command; use crate::message::Input; -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct Store { commands: Tree, settings: Tree, @@ -13,6 +12,17 @@ pub(crate) struct Store { db: Db, } +impl std::fmt::Debug for Store { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Store") + .field("commands", &"Tree") + .field("settings", &"Tree") + .field("names", &"Tree") + .field("db", &"Db") + .finish() + } +} + impl Store { pub(crate) async fn build(db: Db) -> Result { tokio::task::spawn_blocking(move || { @@ -40,30 +50,21 @@ impl Store { Ok(()) } - pub(crate) async fn input_names( + pub(crate) async fn input_name( &self, deck: &str, - ) -> Result, anyhow::Error> { - let prefix = self.input_name_prefix(deck); - let prefix_len = prefix.len(); + input: Input, + ) -> anyhow::Result> { + let key = self.input_name_key(deck, input); let names = self.names.clone(); - let vec = tokio::task::spawn_blocking(move || { - names - .scan_prefix(prefix.clone()) - .filter_map(|res| { - let (key, value) = res.ok()?; + let value = tokio::task::spawn_blocking(move || names.get(key)).await??; - let input = Input::from_slice(&key[prefix_len..]); - - let value = String::from_utf8_lossy(&value).to_string(); - Some((input, value)) - }) - .collect() - }) - .await?; - - Ok(vec) + if let Some(value) = value { + Ok(Some(String::from_utf8_lossy(&value).to_string())) + } else { + Ok(None) + } } pub(crate) async fn set_deck_name(&self, deck: &str, name: &str) -> Result<(), anyhow::Error> { @@ -139,11 +140,11 @@ impl Store { Ok(()) } - pub(crate) async fn pressed( + pub(crate) async fn command( &self, deck: &str, input: Input, - ) -> Result, anyhow::Error> { + ) -> anyhow::Result> { let key = self.cmd_key(deck, input); let commands = self.commands.clone(); @@ -190,14 +191,6 @@ impl Store { IVec::from(key) } - fn input_name_prefix(&self, deck: &str) -> IVec { - let mut key = b"names/".to_vec(); - key.extend(deck.as_bytes()); - key.push(b'/'); - - IVec::from(key) - } - fn input_name_key(&self, deck: &str, input: Input) -> IVec { let mut key = b"names/".to_vec(); key.extend(deck.as_bytes());