From a6d11a9f1f7c0a706a5bef997c431faa87d1fa67 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 5 May 2021 18:55:47 -0500 Subject: [PATCH] Update with auth support --- Cargo.lock | 31 +++++++++++++ Cargo.toml | 3 +- src/dbus.rs | 24 +++++----- src/deck.rs | 2 +- src/input.rs | 2 + src/main.rs | 17 +++++-- src/obs.rs | 128 ++++++++++++++++++++++++++++++++++++++------------- src/store.rs | 45 ++++++++++++++---- 8 files changed, 195 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14fd30e..c774a03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,6 +229,26 @@ dependencies = [ "generic-array", ] +[[package]] +name = "directories" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e69600ff1703123957937708eb27f7a564e48885c537782722ed0ba3189ce1d7" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d86534ed367a67548dc68113a0f5db55432fdfbb6e6f9d77704397d95d5780" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "either" version = "1.6.1" @@ -811,6 +831,16 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" +dependencies = [ + "getrandom", + "redox_syscall", +] + [[package]] name = "regex" version = "1.4.5" @@ -1013,6 +1043,7 @@ dependencies = [ "dbus", "dbus-crossroads", "dbus-tokio", + "directories", "env_logger", "log", "obws", diff --git a/Cargo.toml b/Cargo.toml index 0e8e239..c629cbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,9 @@ ipc-dbus = ["dbus", "dbus-tokio", "dbus-crossroads"] [dependencies] anyhow = "1" -log = "0.4.0" +directories = "3.0" env_logger = "0.8.0" +log = "0.4.0" obws = { version = "0.7.0", path = "../obws" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/dbus.rs b/src/dbus.rs index 0fa3a1e..06353c1 100644 --- a/src/dbus.rs +++ b/src/dbus.rs @@ -79,7 +79,7 @@ impl Dbus { let iface_token = cr.register("dog.asonix.git.asonix.Streamdeck", |b| { b.method_with_cr_async("GetScenes", (), ("scenes",), |mut ctx, cr, ()| { - log::info!("GetScenes"); + log::debug!("GetScenes"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let obs = state.obs.clone(); @@ -101,7 +101,7 @@ impl Dbus { }); b.method_with_cr_async("EnableDiscovery", (), (), |mut ctx, cr, ()| { - log::info!("EnableDiscovery"); + log::debug!("EnableDiscovery"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let manager = state.manager.clone(); @@ -116,7 +116,7 @@ impl Dbus { }); b.method_with_cr_async("DisableDiscovery", (), (), |mut ctx, cr, ()| { - log::info!("DisableDiscovery"); + log::debug!("DisableDiscovery"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let manager = state.manager.clone(); @@ -131,7 +131,7 @@ impl Dbus { }); b.method_with_cr_async("GetDecks", (), ("decks",), |mut ctx, cr, ()| { - log::info!("GetDecks"); + log::debug!("GetDecks"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let manager = state.manager.clone(); @@ -157,7 +157,7 @@ impl Dbus { ("host", "port"), ("state",), |mut ctx, cr, (host, port): (String, u16)| { - log::info!("Connect"); + log::debug!("Connect"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let obs = state.obs.clone(); @@ -179,7 +179,7 @@ impl Dbus { ); b.method_with_cr_async("Disconnect", (), ("state",), |mut ctx, cr, ()| { - log::info!("Disconnect"); + log::debug!("Disconnect"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let obs = state.obs.clone(); @@ -200,7 +200,7 @@ impl Dbus { }); b.method_with_cr_async("GetState", (), ("state",), |mut ctx, cr, ()| { - log::info!("GetState"); + log::debug!("GetState"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let obs = state.obs.clone(); @@ -223,7 +223,7 @@ impl Dbus { ("password",), ("state",), |mut ctx, cr, (password,): (String,)| { - log::info!("Login"); + log::debug!("Login"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let obs = state.obs.clone(); @@ -249,7 +249,7 @@ impl Dbus { ("serial_number",), ("commands",), |mut ctx, cr, (serial_number,): (String,)| { - log::info!("GetCommands"); + log::debug!("GetCommands"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let store = state.store.clone(); @@ -272,7 +272,7 @@ impl Dbus { ); b.method_with_cr_async("ReadInput", (), ("input",), |mut ctx, cr, ()| { - log::info!("ReadInput"); + log::debug!("ReadInput"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let input = state.input.clone(); @@ -295,7 +295,7 @@ impl Dbus { ("serial_number", "key", "command"), (), |mut ctx, cr, (serial_number, key, command): (String, u8, String)| { - log::info!("SetInput"); + log::debug!("SetInput"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let store = state.store.clone(); @@ -317,7 +317,7 @@ impl Dbus { ("serial_number", "key"), (), |mut ctx, cr, (serial_number, key): (String, u8)| { - log::info!("UnsetInput"); + log::debug!("UnsetInput"); let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap(); let store = state.store.clone(); diff --git a/src/deck.rs b/src/deck.rs index 7dda5ce..586085b 100644 --- a/src/deck.rs +++ b/src/deck.rs @@ -53,7 +53,7 @@ impl Deck { .await; } Err(e) => { - log::debug!("Error reading: {}", e); + log::trace!("Error reading: {}", e); if let Ok(e) = e.downcast::() { if matches!(e.kind(), std::io::ErrorKind::BrokenPipe) { break; diff --git a/src/input.rs b/src/input.rs index 885f19f..820315d 100644 --- a/src/input.rs +++ b/src/input.rs @@ -13,6 +13,8 @@ pub(crate) async fn task(store: Store, mut rx: Receiver, obs_tx: S while let Some(msg) = rx.recv().await { match msg { InputMessage::Press(serial_number, key) => { + log::debug!("{}: {}", serial_number, key); + if let Some(sender) = sender_opt.take() { let _ = sender.send((serial_number, key)); } else { diff --git a/src/main.rs b/src/main.rs index 9503c64..544a79f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use directories::ProjectDirs; use std::sync::Arc; use tokio::sync::Notify; @@ -24,7 +25,13 @@ async fn main() -> Result<(), anyhow::Error> { let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); let (obs_tx, obs_rx) = tokio::sync::mpsc::channel(16); - let db = sled::Config::new().temporary(true).open()?; + let project_dirs = ProjectDirs::from("dog", "asonix", "Streamdeck") + .ok_or_else(|| anyhow::anyhow!("No home directory found"))?; + let mut sled_dir = project_dirs.data_local_dir().to_owned(); + sled_dir.push("sled"); + sled_dir.push("db-0.34.6"); + + let db = sled::Config::new().path(sled_dir).open()?; let store = Store::build(db).await?; let shutdown = Arc::new(Notify::new()); @@ -45,8 +52,12 @@ async fn main() -> Result<(), anyhow::Error> { .await; } - handles.push(tokio::spawn(input::task(store, input_rx, obs_tx))); - handles.push(tokio::spawn(obs::task(obs_rx))); + handles.push(tokio::spawn(input::task( + store.clone(), + input_rx, + obs_tx.clone(), + ))); + handles.push(tokio::spawn(obs::task(store, obs_tx, obs_rx))); handles.push(tokio::spawn(manager::task( input_tx, manager_tx, manager_rx, ))); diff --git a/src/obs.rs b/src/obs.rs index 6b3db6a..7828a75 100644 --- a/src/obs.rs +++ b/src/obs.rs @@ -1,8 +1,12 @@ -use crate::message::{Command, ObsMessage, Query, State}; +use crate::{ + message::{Command, ObsMessage, Query, State}, + store::Store, +}; use obws::Client; -use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::{Receiver, Sender}; pub(crate) struct Obs { + store: Store, inner: Inner, } @@ -17,31 +21,79 @@ struct Disconnected; struct Unauthenticated { client: Option, - challenge: Option, - salt: Option, } struct Connected { client: Option, } -pub(crate) async fn task(rx: Receiver) { - Obs::new().run(rx).await +pub(crate) async fn task(store: Store, tx: Sender, mut rx: Receiver) { + let store2 = store.clone(); + tokio::task::spawn(async move { + if let Err(e) = connect_obs(store2, tx).await { + log::info!("Failed to connect to OBS: {}", e); + } + }); + + let mut obs = Obs { + store, + inner: Inner::Disconnected(Disconnected), + }; + + while let Some(msg) = rx.recv().await { + obs.handle(msg).await; + } +} + +async fn connect_obs(store: Store, tx: Sender) -> Result<(), anyhow::Error> { + let host = store + .setting("obs-host") + .await? + .ok_or_else(|| anyhow::anyhow!("No key 'obs-host'"))?; + let port = store + .setting("obs-port") + .await? + .ok_or_else(|| anyhow::anyhow!("No key 'obs-port'"))?; + + if tx.send(ObsMessage::Connect(host, port)).await.is_err() { + return Err(anyhow::anyhow!("Failed to send connect message")); + } + + let (sender, rx) = tokio::sync::oneshot::channel(); + + if tx.send(ObsMessage::State(sender)).await.is_err() { + return Err(anyhow::anyhow!("Failed to send state message")); + } + + match rx.await? { + State::Disconnected => return Err(anyhow::anyhow!("Failed to connect to obs")), + State::Connected => return Ok(()), + State::Unauthenticated => { + let password = store + .setting("obs-password") + .await? + .ok_or_else(|| anyhow::anyhow!("No stored password"))?; + + if tx.send(ObsMessage::Authenticate(password)).await.is_err() { + return Err(anyhow::anyhow!("Failed to send authenticate message")); + } + } + } + + let (sender, rx) = tokio::sync::oneshot::channel(); + + if tx.send(ObsMessage::State(sender)).await.is_err() { + return Err(anyhow::anyhow!("Failed to send state message")); + } + + if !matches!(rx.await?, State::Connected) { + return Err(anyhow::anyhow!("Failed to authenticate with OBS")); + } + + Ok(()) } impl Obs { - fn new() -> Self { - Obs { - inner: Inner::Disconnected(Disconnected), - } - } - - async fn run(mut self, mut rx: Receiver) { - while let Some(msg) = rx.recv().await { - self.handle(msg).await - } - } - async fn handle(&mut self, message: ObsMessage) { match message { ObsMessage::State(sender) => { @@ -54,17 +106,17 @@ impl Obs { let _ = sender.send(state); } message => { - self.inner.handle(message).await; + self.inner.handle(message, &self.store).await; } } } } impl Inner { - async fn handle(&mut self, message: ObsMessage) { + async fn handle(&mut self, message: ObsMessage, store: &Store) { let new_state = match self { - Inner::Disconnected(disconnected) => disconnected.handle(message).await, - Inner::Unauthenticated(unauthenticated) => unauthenticated.handle(message).await, + Inner::Disconnected(disconnected) => disconnected.handle(message, store).await, + Inner::Unauthenticated(unauthenticated) => unauthenticated.handle(message, store).await, Inner::Connected(connected) => connected.handle(message).await, }; @@ -75,10 +127,10 @@ impl Inner { } impl Disconnected { - async fn handle(&mut self, message: ObsMessage) -> Option { + async fn handle(&mut self, message: ObsMessage, store: &Store) -> Option { match message { ObsMessage::Connect(host, port) => self - .connect(host, port) + .connect(host, port, store) .await .map_err(|e| log::error!("{}", e)) .ok(), @@ -86,16 +138,22 @@ impl Disconnected { } } - async fn connect(&mut self, host: String, port: u16) -> Result { + async fn connect( + &mut self, + host: String, + port: u16, + store: &Store, + ) -> Result { log::info!("Connecting to {}:{}", host, port); - let client = Client::connect(host, port).await?; + let client = Client::connect(host.clone(), port).await?; let auth_required = client.general().get_auth_required().await?; + store.save_setting("obs-host", host).await?; + store.save_setting("obs-port", port).await?; + if auth_required.auth_required { return Ok(Inner::Unauthenticated(Unauthenticated { client: Some(client), - challenge: auth_required.challenge, - salt: auth_required.salt, })); } @@ -106,16 +164,20 @@ impl Disconnected { } impl Unauthenticated { - async fn handle(&mut self, message: ObsMessage) -> Option { + async fn handle(&mut self, message: ObsMessage, store: &Store) -> Option { match message { - ObsMessage::Authenticate(password) => self.authenticate(password).await.ok(), + ObsMessage::Authenticate(password) => self.authenticate(password, store).await.ok(), _ => None, } } - async fn authenticate(&mut self, password: String) -> Result { + async fn authenticate( + &mut self, + password: String, + store: &Store, + ) -> Result { if let Some(client) = self.client.as_ref() { - if let Err(e) = client.login(Some(password)).await { + if let Err(e) = client.login(Some(password.clone())).await { if client.general().get_version().await.is_ok() { return Err(e.into()); } else { @@ -124,6 +186,8 @@ impl Unauthenticated { } } + store.save_setting("obs-password", password).await?; + Ok(Inner::Connected(Connected { client: self.client.take(), })) diff --git a/src/store.rs b/src/store.rs index b1b41e3..d363f39 100644 --- a/src/store.rs +++ b/src/store.rs @@ -4,20 +4,49 @@ use sled::{Db, IVec, Tree}; #[derive(Clone, Debug)] pub(crate) struct Store { commands: Tree, + settings: Tree, db: Db, } impl Store { pub(crate) async fn build(db: Db) -> Result { - let db2 = db.clone(); - - Ok(Store { - commands: tokio::task::spawn_blocking(move || { - db2.open_tree("dog.asonix.git.asonix.streamdeck/commands") - }) - .await??, - db, + tokio::task::spawn_blocking(move || { + Ok(Store { + commands: db.open_tree("dog.asonix.git.asonix.streamdeck/commands")?, + settings: db.open_tree("dog.asonix.git.asonix.streamdeck/settings")?, + db, + }) as Result }) + .await? + } + + pub(crate) async fn setting(&self, key: &str) -> Result, anyhow::Error> + where + T: serde::de::DeserializeOwned, + { + let key = key.to_owned(); + let settings = self.settings.clone(); + + let opt = tokio::task::spawn_blocking(move || settings.get(key)).await??; + + if let Some(ivec) = opt { + return Ok(Some(serde_json::from_slice(&ivec)?)); + } + + Ok(None) + } + + pub(crate) async fn save_setting(&self, key: &str, value: T) -> Result<(), anyhow::Error> + where + T: serde::Serialize, + { + let key = key.to_owned(); + let value = serde_json::to_vec(&value)?; + let settings = self.settings.clone(); + + tokio::task::spawn_blocking(move || settings.insert(key, value)).await??; + + Ok(()) } pub(crate) async fn unset(&self, deck: &str, input: u8) -> Result<(), anyhow::Error> {