streamdeck-workspace/streamdeck-gtk/src/daemon.rs

1142 lines
33 KiB
Rust

use async_io::Timer;
use event_listener::{Event, EventListener};
use futures_channel::{mpsc::Receiver, oneshot::Sender};
use futures_core::stream::Stream;
use futures_util::{sink::SinkExt, stream::StreamExt};
use once_cell::sync::Lazy;
use std::{
collections::{HashMap, HashSet},
future::Future,
pin::Pin,
sync::{Arc, Mutex, Weak},
task::{Context, Poll},
time::{Duration, Instant},
};
use streamdeck_common::{Command, ObsState, Query, QueryResponse, Scene, SceneItem};
use zbus::{dbus_proxy, Connection, Result};
/// Top-level refresh task.
///
/// Spawns a local watcher that starts a `deck_management` task for every deck produced by
/// `State::added_deck_stream`, then loops forever: refresh the deck list and the OBS state,
/// wait for the next 3-second timer tick, repeat. This future never completes.
pub(crate) async fn state_management() {
    let added_decks = Handle::current().state().added_deck_stream();

    glib::MainContext::default().spawn_local(async move {
        // Pin the stream on the stack so `next()` can be called on it.
        futures_util::pin_mut!(added_decks);
        loop {
            match added_decks.next().await {
                Some(deck_state) => {
                    glib::MainContext::default().spawn_local(deck_management(deck_state));
                }
                None => break,
            }
        }
    });

    let mut ticker = Timer::interval(Duration::from_secs(3));
    loop {
        cache_decks();
        cache_obs_state();
        let _ = ticker.next().await;
    }
}
/// Per-deck refresh task: every 3 seconds re-queries the deck's name and commands for as long
/// as the deck's shared state is still alive somewhere else.
///
/// Only a `Weak` reference is kept between ticks. The strong reference obtained from
/// `upgrade()` is used just long enough to read the serial number and is released *before*
/// sleeping, so this task never extends the lifetime of a deck that has been removed.
async fn deck_management(deck_state: DeckState) {
    let weak = Arc::downgrade(&deck_state.inner);
    drop(deck_state);
    let mut interval = Timer::interval(Duration::from_secs(3));
    loop {
        // Read the serial number once (a single lock) and drop the strong reference at the end
        // of this statement.
        let serial_number = match weak.upgrade() {
            Some(inner) => DeckState { inner }.serial_number(),
            // Every other owner is gone: the deck was removed, stop refreshing it.
            None => break,
        };
        cache_deck_name(serial_number.clone());
        cache_commands(serial_number);
        let _ = interval.next().await;
    }
}
/// Stream of `()` wake-ups backed by an `event_listener::Event`.
///
/// Only a `Weak` reference to the event is held; the `Stream` implementation ends the stream
/// once the event can no longer be upgraded.
pub(crate) struct EventStream {
// Listener currently registered on the event; taken out while polling and stored back after.
listener: Option<EventListener>,
// Weak handle to the observed event.
event: Weak<Event>,
}
/// Cheaply clonable handle to the shared, mutex-protected application state (`StateInner`).
#[derive(Clone)]
pub(crate) struct State {
// Shared state; every clone of `State` points at the same `StateInner`.
inner: Arc<Mutex<StateInner>>,
}
/// Data behind a `State` handle: the cached OBS state, the known decks and the subscriber
/// channels used to announce deck additions and removals.
#[derive(Default)]
struct StateInner {
// Last OBS state recorded through `State::update_obs_state`.
state: ObsState,
// Notified whenever `state` changes.
state_event: Arc<Event>,
// Known decks, keyed by serial number.
decks: HashMap<String, DeckState>,
// Senders that receive the serial number of each newly added deck.
added_deck_txs: Vec<futures_channel::mpsc::Sender<String>>,
// Senders that receive the serial number of each removed deck.
removed_deck_txs: Vec<futures_channel::mpsc::Sender<String>>,
}
/// Cheaply clonable handle to the shared, mutex-protected state of one deck (`DeckStateInner`).
#[derive(Clone)]
pub(crate) struct DeckState {
// Shared per-deck state; every clone points at the same `DeckStateInner`.
inner: Arc<Mutex<DeckStateInner>>,
}
/// Data behind a `DeckState` handle.
struct DeckStateInner {
// Identification of the deck; `device_name` is overwritten by `DeckState::update_device_name`.
info: DeckInfo,
// Notified whenever the device name is updated.
name_event: Arc<Event>,
// Commands configured on this deck, keyed by key number.
commands: HashMap<u8, CommandState>,
// Senders that receive the key of each newly added command.
added_command_txs: Vec<futures_channel::mpsc::Sender<u8>>,
// Senders that receive the key of each removed command.
removed_command_txs: Vec<futures_channel::mpsc::Sender<u8>>,
}
/// Cheaply clonable handle to the shared, mutex-protected state of one key's command
/// (`CommandStateInner`).
#[derive(Clone)]
pub(crate) struct CommandState {
// Shared per-command state; every clone points at the same `CommandStateInner`.
inner: Arc<Mutex<CommandStateInner>>,
}
/// Data behind a `CommandState` handle.
struct CommandStateInner {
// Key number this command is bound to.
key: u8,
// Command currently assigned to the key.
command: Command,
// Notified whenever `command` is replaced.
command_event: Arc<Event>,
// Optional user-assigned name; `CommandState::name` falls back to the key number when `None`.
name: Option<String>,
// Notified whenever `name` is set.
name_event: Arc<Event>,
}
/// Description of one deck, built from the `(String, String, String)` tuples returned by the
/// daemon's `get_decks` call (in this field order).
#[derive(Clone, Debug)]
pub(crate) struct DeckInfo {
pub(crate) serial_number: String,
pub(crate) device_name: String,
pub(crate) port_name: String,
}
/// A key number together with the command assigned to it, as decoded from the daemon's
/// `get_commands` reply.
#[derive(Clone, Debug)]
pub(crate) struct CommandInfo {
pub(crate) key: u8,
pub(crate) command: Command,
}
/// One entry of the daemon's `read_input` reply: a key number and the serial number of the deck
/// it belongs to.
#[derive(Clone, Debug)]
pub(crate) struct ReadInput {
pub(crate) key: u8,
pub(crate) serial_number: String,
}
/// One entry of the daemon's `get_input_names` reply: a key number and its assigned name.
#[derive(Clone, Debug)]
pub(crate) struct InputName {
pub(crate) key: u8,
pub(crate) name: String,
}
/// Client-side handle used to send requests to the streamdeck daemon and to reach the cached
/// `State`. Obtained through `Handle::current`.
#[derive(Clone, Debug)]
pub(crate) struct Handle {
// Keeps the shared `Daemon` (and therefore its `State`) alive while this handle exists.
_arc: Arc<DropLog<Daemon>>,
// Clone of the daemon's request sender, used for all outgoing `DBusMessage`s.
tx: futures_channel::mpsc::Sender<DBusMessage>,
}
/// Process-wide daemon connection: the sender returned by `connect()` plus the cached state.
#[derive(Clone, Debug)]
struct Daemon {
// Entry point of the connection pool created by `connect()`.
tx: futures_channel::mpsc::Sender<DBusMessage>,
// Cached view of the OBS state and the known decks.
state: State,
}
/// Wrapper that writes a debug log line when it is constructed and when it is dropped, and
/// dereferences to the wrapped value.
#[derive(Clone, Debug)]
struct DropLog<T> {
// Wrapped value, exposed through `Deref`/`DerefMut`.
inner: T,
// Label used in the "Constructed"/"Dropping" log lines.
name: &'static str,
}
/// Lazily created, automatically released shared value.
///
/// The inner `T` is created only when it is first asked for. Only a `Weak` reference is kept
/// here, so once no other party holds the value it is dropped (deallocated), and the next
/// request creates a new one by calling `init` again.
#[derive(Clone)]
struct OnDemand<T> {
// Weak reference to the most recently created value, if it is still alive.
inner: Arc<Mutex<Weak<T>>>,
// Factory used to build a fresh `T` when no live value exists.
init: Arc<dyn Fn() -> T + Send + Sync>,
}
/// Requests forwarded to a D-Bus connection task. Each variant corresponds to one method of
/// the `StreamdeckDaemon` proxy; variants carrying a oneshot `Sender` deliver the call's reply
/// through it, the others are sent without a reply channel.
#[derive(Debug)]
enum DBusMessage {
// (reply, query): the query string is passed to the daemon's `query` method.
Query(Sender<String>, String),
// (command): passed to the daemon's `test` method.
Test(String),
EnableDiscovery,
DisableDiscovery,
// Reply items are (serial_number, device_name, port_name).
GetDecks(Sender<Vec<(String, String, String)>>),
// (host, port, reply)
Connect(String, u16, Sender<String>),
Disconnect(Sender<String>),
GetState(Sender<String>),
// (password, reply)
Login(String, Sender<String>),
// (serial_number, reply); reply items are (key, JSON-encoded command).
GetCommands(String, Sender<Vec<(u8, String)>>),
// Reply items are (key, serial_number).
ReadInput(Sender<Vec<(u8, String)>>),
// (serial_number, key, command)
SetInput(String, u8, String),
// (serial_number, key)
UnsetInput(String, u8),
// (serial_number, key, name)
SetInputName(String, u8, String),
// (serial_number, reply); reply items are (key, name).
GetInputNames(String, Sender<Vec<(u8, String)>>),
// (serial_number, name)
SetDeckName(String, String),
// (serial_number, reply)
GetDeckName(String, Sender<String>),
}
// D-Bus interface of the streamdeck daemon. The `dbus_proxy` attribute generates the
// `StreamdeckDaemonProxy` type used by `process_messages`; every method below maps onto one
// `DBusMessage` variant.
#[dbus_proxy(
default_service = "dog.asonix.git.asonix.StreamdeckDaemon",
interface = "dog.asonix.git.asonix.StreamdeckDaemon",
default_path = "/dog/asonix/git/asonix/StreamdeckDaemon"
)]
trait StreamdeckDaemon {
// `query` and the reply are strings; callers in this file pass JSON-encoded `Query` values and
// decode the reply as `QueryResponse`.
fn query(&self, query: &str) -> Result<String>;
// `command` is a JSON-encoded `Command` (see `Handle::test`).
fn test(&self, command: &str) -> Result<()>;
fn enable_discovery(&self) -> Result<()>;
fn disable_discovery(&self) -> Result<()>;
// Items are consumed as (serial_number, device_name, port_name) by `Handle::get_decks`.
fn get_decks(&self) -> Result<Vec<(String, String, String)>>;
// The string replies of `connect`, `disconnect`, `get_state` and `login` are parsed into
// `ObsState` by `Handle`.
fn connect(&self, host: &str, port: u16) -> Result<String>;
fn disconnect(&self) -> Result<String>;
fn get_state(&self) -> Result<String>;
fn login(&self, password: &str) -> Result<String>;
// Items are (key, JSON-encoded command).
fn get_commands(&self, serial_number: &str) -> Result<Vec<(u8, String)>>;
// Items are consumed as (key, serial_number) by `Handle::read_input`.
fn read_input(&self) -> Result<Vec<(u8, String)>>;
fn set_input(&self, serial_number: &str, key: u8, command: &str) -> Result<()>;
fn unset_input(&self, serial_number: &str, key: u8) -> Result<()>;
fn set_input_name(&self, serial_number: &str, key: u8, name: &str) -> Result<()>;
// Items are (key, name).
fn get_input_names(&self, serial_number: &str) -> Result<Vec<(u8, String)>>;
fn set_deck_name(&self, serial_number: &str, name: &str) -> Result<()>;
fn get_deck_name(&self, serial_number: &str) -> Result<String>;
}
impl State {
    /// Creates an empty state: no decks, no subscribers, OBS marked as `Disconnected`.
    pub(crate) fn new() -> Self {
        State {
            inner: Arc::new(Mutex::new(StateInner {
                state: ObsState::Disconnected,
                ..StateInner::default()
            })),
        }
    }

    /// Returns handles to all currently known decks (in no particular order).
    pub(crate) fn decks(&self) -> Vec<DeckState> {
        self.inner.lock().unwrap().decks.values().cloned().collect()
    }

    /// Returns the deck with the given serial number, if it is currently known.
    pub(crate) fn deck_state(&self, serial_number: &str) -> Option<DeckState> {
        let inner = self.inner.lock().unwrap();
        inner.decks.get(serial_number).cloned()
    }

    /// Returns the most recently recorded OBS state.
    pub(crate) fn obs_state(&self) -> ObsState {
        self.inner.lock().unwrap().state
    }

    /// Stream that yields the current OBS state every time it changes.
    ///
    /// The stream owns this `State` handle, so it keeps the shared state alive.
    pub(crate) fn obs_state_stream(self) -> impl Stream<Item = ObsState> {
        let mut event_stream = {
            let inner = self.inner.lock().unwrap();
            EventStream::new(&inner.state_event)
        };
        async_stream::stream! {
            while event_stream.next().await.is_some() {
                yield self.obs_state();
            }
        }
    }

    /// Stream that yields a `DeckState` for every deck added after this call.
    ///
    /// Only a weak reference to the shared state is kept; the stream ends once the state has
    /// been dropped. A deck that has already disappeared again by the time its notification is
    /// processed is skipped.
    pub(crate) fn added_deck_stream(self) -> impl Stream<Item = DeckState> {
        let (tx, mut rx) = futures_channel::mpsc::channel(16);
        {
            let mut inner = self.inner.lock().unwrap();
            inner.added_deck_txs.push(tx);
        }
        let weak = Arc::downgrade(&self.inner);
        drop(self);
        async_stream::stream! {
            while let Some(serial_number) = rx.next().await {
                if let Some(inner) = weak.upgrade() {
                    let state = State { inner };
                    if let Some(deck_state) = state.deck_state(&serial_number) {
                        yield deck_state;
                    }
                } else {
                    break;
                }
            }
        }
    }

    /// Stream that yields the serial number of every deck removed after this call.
    pub(crate) fn removed_deck_stream(self) -> impl Stream<Item = String> {
        let (tx, rx) = futures_channel::mpsc::channel(16);
        self.inner.lock().unwrap().removed_deck_txs.push(tx);
        rx
    }

    /// Records a new OBS state and wakes all `obs_state_stream` listeners, but only when the
    /// state actually changed.
    fn update_obs_state(&self, state: ObsState) {
        let mut inner = self.inner.lock().unwrap();
        if inner.state != state {
            inner.state = state;
            inner.state_event.notify(usize::MAX);
        }
    }

    /// Reconciles the known decks with the list reported by the daemon.
    ///
    /// Decks that are new are inserted and announced to the "added" subscribers; decks that are
    /// no longer reported are removed and announced to the "removed" subscribers. Decks present
    /// in both sets are left untouched.
    fn update_decks(&self, decks: Vec<DeckInfo>) {
        // The vector is owned, so move each `DeckInfo` into the map instead of cloning it.
        let mut reported = decks
            .into_iter()
            .map(|deck_info| (deck_info.serial_number.clone(), deck_info))
            .collect::<HashMap<_, _>>();
        let reported_serials = reported.keys().cloned().collect::<HashSet<String>>();

        let mut inner = self.inner.lock().unwrap();
        let known_serials = inner.decks.keys().cloned().collect::<HashSet<String>>();

        for serial in reported_serials.difference(&known_serials) {
            if let Some(deck_info) = reported.remove(serial) {
                inner
                    .decks
                    .insert(serial.clone(), DeckState::from_deck_info(deck_info));
                Self::notify_deck_subscribers(&mut inner.added_deck_txs, serial);
            }
        }

        for serial in known_serials.difference(&reported_serials) {
            inner.decks.remove(serial);
            Self::notify_deck_subscribers(&mut inner.removed_deck_txs, serial);
        }
    }

    /// Sends `serial_number` to every subscriber whose channel is still open and forgets the
    /// subscribers whose receiver has been dropped. Send failures (e.g. a full channel) are
    /// deliberately ignored: delivery is best-effort.
    fn notify_deck_subscribers(
        txs: &mut Vec<futures_channel::mpsc::Sender<String>>,
        serial_number: &str,
    ) {
        let mut live = Vec::with_capacity(txs.len());
        for mut tx in txs.drain(..) {
            if tx.is_closed() {
                continue;
            }
            let _ = tx.try_send(serial_number.to_owned());
            live.push(tx);
        }
        *txs = live;
    }
}
impl DeckState {
    /// Returns handles to every command currently configured on this deck.
    pub(crate) fn commands(&self) -> Vec<CommandState> {
        let guard = self.inner.lock().unwrap();
        guard.commands.values().cloned().collect()
    }

    /// Returns the command bound to `key`, if any.
    pub(crate) fn command_state(&self, key: u8) -> Option<CommandState> {
        let guard = self.inner.lock().unwrap();
        guard.commands.get(&key).cloned()
    }

    /// Returns a copy of the deck's serial number.
    pub(crate) fn serial_number(&self) -> String {
        let guard = self.inner.lock().unwrap();
        guard.info.serial_number.clone()
    }

    /// Returns a copy of the deck's current device name.
    pub(crate) fn device_name(&self) -> String {
        let guard = self.inner.lock().unwrap();
        guard.info.device_name.clone()
    }

    /// Returns a copy of the deck's port name.
    pub(crate) fn port_name(&self) -> String {
        let guard = self.inner.lock().unwrap();
        guard.info.port_name.clone()
    }

    /// Stream that yields the device name each time `update_device_name` is called.
    ///
    /// Only a weak reference to the deck is captured; the stream stops once the deck is gone.
    pub(crate) fn device_name_stream(self) -> impl Stream<Item = String> {
        let mut wakeups = {
            let guard = self.inner.lock().unwrap();
            EventStream::new(&guard.name_event)
        };
        let weak = Arc::downgrade(&self.inner);
        async_stream::stream! {
            while wakeups.next().await.is_some() {
                let strong = match weak.upgrade() {
                    Some(strong) => strong,
                    None => break,
                };
                // The guard is a temporary of this statement, so it is released before `yield`.
                let name = strong.lock().unwrap().info.device_name.clone();
                yield name;
            }
        }
    }

    /// Stream that yields a `CommandState` for every command added after this call.
    ///
    /// Only a weak reference to the deck is kept; the stream stops once the deck is gone. A
    /// key whose command has already been removed again is skipped.
    pub(crate) fn added_command_stream(self) -> impl Stream<Item = CommandState> {
        let (tx, mut rx) = futures_channel::mpsc::channel(16);
        self.inner.lock().unwrap().added_command_txs.push(tx);
        let weak = Arc::downgrade(&self.inner);
        drop(self);
        async_stream::stream! {
            while let Some(key) = rx.next().await {
                let inner = match weak.upgrade() {
                    Some(inner) => inner,
                    None => break,
                };
                let deck_state = DeckState { inner };
                if let Some(command_state) = deck_state.command_state(key) {
                    yield command_state;
                }
            }
        }
    }

    /// Stream that yields the key of every command removed after this call.
    pub(crate) fn removed_command_stream(self) -> impl Stream<Item = u8> {
        let (tx, rx) = futures_channel::mpsc::channel(16);
        self.inner.lock().unwrap().removed_command_txs.push(tx);
        rx
    }

    /// Builds the state for a freshly discovered deck: no commands, no subscribers.
    fn from_deck_info(deck_info: DeckInfo) -> Self {
        let fresh = DeckStateInner {
            info: deck_info,
            name_event: Arc::new(Event::new()),
            commands: HashMap::new(),
            added_command_txs: Vec::new(),
            removed_command_txs: Vec::new(),
        };
        DeckState {
            inner: Arc::new(Mutex::new(fresh)),
        }
    }

    /// Stores a new device name and wakes every `device_name_stream` listener (unconditionally,
    /// even when the name is unchanged).
    fn update_device_name(&self, name: String) {
        let mut guard = self.inner.lock().unwrap();
        guard.info.device_name = name;
        guard.name_event.notify(usize::MAX);
    }

    /// Reconciles this deck's commands with the list reported by the daemon.
    ///
    /// New keys are inserted and announced, keys that are no longer reported are removed and
    /// announced, and every key present on both sides gets its command replaced through
    /// `CommandState::update_command`.
    fn update_commands(&self, commands: Vec<CommandInfo>) {
        let incoming_keys = commands.iter().map(|info| info.key).collect::<HashSet<u8>>();
        let mut guard = self.inner.lock().unwrap();
        let known_keys = guard.commands.keys().copied().collect::<HashSet<u8>>();
        let mut pending = commands
            .into_iter()
            .map(|info| (info.key, info.command))
            .collect::<HashMap<u8, Command>>();

        for key in incoming_keys.difference(&known_keys) {
            if let Some(command) = pending.remove(key) {
                guard
                    .commands
                    .insert(*key, CommandState::from_command(*key, command));
                Self::notify_key_subscribers(&mut guard.added_command_txs, *key);
            }
        }

        for key in known_keys.difference(&incoming_keys) {
            guard.commands.remove(key);
            Self::notify_key_subscribers(&mut guard.removed_command_txs, *key);
        }

        // Whatever is left in `pending` belongs to keys that already existed.
        for (key, command) in pending {
            if let Some(existing) = guard.commands.get(&key) {
                existing.update_command(command);
            }
        }
    }

    /// Sends `key` to every subscriber whose channel is still open and forgets subscribers
    /// whose receiver was dropped; send failures are ignored.
    fn notify_key_subscribers(txs: &mut Vec<futures_channel::mpsc::Sender<u8>>, key: u8) {
        let mut live = Vec::new();
        for mut tx in txs.drain(..) {
            if !tx.is_closed() {
                let _ = tx.try_send(key);
                live.push(tx);
            }
        }
        *txs = live;
    }
}
impl CommandState {
    /// Returns the key number this command is bound to.
    pub(crate) fn key(&self) -> u8 {
        self.inner.lock().unwrap().key
    }

    /// Returns a copy of the command currently assigned to the key.
    pub(crate) fn command(&self) -> Command {
        self.inner.lock().unwrap().command.clone()
    }

    /// Stream that yields the command each time `update_command` is called.
    ///
    /// Only a weak reference is captured; the stream stops once the command state is gone.
    pub(crate) fn command_stream(self) -> impl Stream<Item = Command> {
        let mut event_stream = {
            let inner = self.inner.lock().unwrap();
            EventStream::new(&inner.command_event)
        };
        let inner = Arc::downgrade(&self.inner);
        async_stream::stream! {
            while event_stream.next().await.is_some() {
                if let Some(inner) = inner.upgrade() {
                    let command = {
                        let inner = inner.lock().unwrap();
                        inner.command.clone()
                    };
                    yield command;
                } else {
                    break;
                }
            }
        }
    }

    /// Returns the user-assigned name, or the key number rendered as text when no name is set.
    pub(crate) fn name(&self) -> String {
        let inner = self.inner.lock().unwrap();
        // Build the fallback lazily so no string is formatted when a name exists.
        inner
            .name
            .clone()
            .unwrap_or_else(|| inner.key.to_string())
    }

    /// Stream that yields the name each time `update_name` is called.
    ///
    /// Wake-ups that find no name set yield nothing. Only a weak reference is captured; the
    /// stream stops once the command state is gone.
    pub(crate) fn name_stream(self) -> impl Stream<Item = String> {
        let mut event_stream = {
            let inner = self.inner.lock().unwrap();
            EventStream::new(&inner.name_event)
        };
        let inner = Arc::downgrade(&self.inner);
        async_stream::stream! {
            while event_stream.next().await.is_some() {
                if let Some(inner) = inner.upgrade() {
                    let name = {
                        let inner = inner.lock().unwrap();
                        inner.name.clone()
                    };
                    if let Some(name) = name {
                        yield name;
                    }
                } else {
                    break;
                }
            }
        }
    }

    /// Builds the state for a newly reported command; it starts without a name.
    fn from_command(key: u8, command: Command) -> Self {
        CommandState {
            inner: Arc::new(Mutex::new(CommandStateInner {
                key,
                command,
                command_event: Arc::new(Event::new()),
                name: None,
                name_event: Arc::new(Event::new()),
            })),
        }
    }

    /// Replaces the command and wakes every `command_stream` listener (unconditionally).
    fn update_command(&self, command: Command) {
        let mut inner = self.inner.lock().unwrap();
        inner.command = command;
        inner.command_event.notify(usize::MAX);
    }

    /// Sets the name and wakes every `name_stream` listener (unconditionally).
    fn update_name(&self, name: String) {
        let mut inner = self.inner.lock().unwrap();
        inner.name = Some(name);
        inner.name_event.notify(usize::MAX);
    }
}
impl Handle {
pub(crate) fn current() -> Self {
static DAEMON: Lazy<OnDemand<DropLog<Daemon>>> = Lazy::new(|| {
OnDemand::new(|| {
DropLog::new(
Daemon {
tx: connect(),
state: State::new(),
},
"Daemon",
)
})
});
let _arc = (&DAEMON).get();
Handle {
tx: _arc.tx.clone(),
_arc,
}
}
pub(crate) async fn get_scenes(&mut self) -> anyhow::Result<Vec<Scene>> {
let (tx, rx) = futures_channel::oneshot::channel();
let query = serde_json::to_string(&Query::GetScenes)?;
self.tx.send(DBusMessage::Query(tx, query)).await?;
let response = rx.await?;
let response: QueryResponse = serde_json::from_str(&response)?;
// TODO: update OBS related messages to update OBS state
match response {
QueryResponse::Scenes { scenes } => Ok(scenes),
_ => Err(anyhow::anyhow!("Wrong response type")),
}
}
pub(crate) async fn get_scene_items(
&mut self,
scene_name: String,
) -> anyhow::Result<Vec<SceneItem>> {
let (tx, rx) = futures_channel::oneshot::channel();
let query = serde_json::to_string(&Query::GetSceneItems { scene_name })?;
self.tx.send(DBusMessage::Query(tx, query)).await?;
let response = rx.await?;
let response: QueryResponse = serde_json::from_str(&response)?;
// TODO: update OBS related messages to update OBS state
match response {
QueryResponse::SceneItems { items } => Ok(items),
_ => Err(anyhow::anyhow!("Wrong response type")),
}
}
pub(crate) async fn get_scene_item(
&mut self,
scene_name: String,
item_id: i64,
) -> anyhow::Result<SceneItem> {
let (tx, rx) = futures_channel::oneshot::channel();
let query = serde_json::to_string(&Query::GetSceneItem {
scene_name,
item_id,
})?;
self.tx.send(DBusMessage::Query(tx, query)).await?;
let response = rx.await?;
let response: QueryResponse = serde_json::from_str(&response)?;
// TODO: update OBS related messages to update OBS state
match response {
QueryResponse::SceneItem { item } => Ok(item),
_ => Err(anyhow::anyhow!("Wrong response type")),
}
}
pub(crate) async fn test(&mut self, command: &Command) -> anyhow::Result<()> {
let command = serde_json::to_string(command)?;
self.tx.send(DBusMessage::Test(command)).await?;
Ok(())
}
pub(crate) async fn enable_discovery(&mut self) -> anyhow::Result<()> {
self.tx.send(DBusMessage::EnableDiscovery).await?;
Ok(())
}
pub(crate) async fn disable_discovery(&mut self) -> anyhow::Result<()> {
self.tx.send(DBusMessage::DisableDiscovery).await?;
Ok(())
}
pub(crate) fn state(&self) -> State {
self._arc.state.clone()
}
async fn get_decks(&mut self) -> anyhow::Result<Vec<DeckInfo>> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx.send(DBusMessage::GetDecks(tx)).await?;
Ok(rx
.await?
.into_iter()
.map(|(serial_number, device_name, port_name)| DeckInfo {
serial_number,
device_name,
port_name,
})
.collect::<Vec<_>>())
}
pub(crate) async fn connect(&mut self, host: String, port: u16) -> anyhow::Result<ObsState> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx.send(DBusMessage::Connect(host, port, tx)).await?;
let state = rx.await?.parse()?;
self.state().update_obs_state(state);
Ok(state)
}
pub(crate) async fn disconnect(&mut self) -> anyhow::Result<ObsState> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx.send(DBusMessage::Disconnect(tx)).await?;
let state = rx.await?.parse()?;
self.state().update_obs_state(state);
Ok(state)
}
async fn get_state(&mut self) -> anyhow::Result<ObsState> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx.send(DBusMessage::GetState(tx)).await?;
let state = rx.await?.parse()?;
self.state().update_obs_state(state);
Ok(state)
}
pub(crate) async fn login(&mut self, password: String) -> anyhow::Result<ObsState> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx.send(DBusMessage::Login(password, tx)).await?;
let state = rx.await?.parse()?;
self.state().update_obs_state(state);
Ok(state)
}
async fn get_commands(&mut self, serial_number: String) -> anyhow::Result<Vec<CommandInfo>> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx
.send(DBusMessage::GetCommands(serial_number, tx))
.await?;
Ok(rx
.await?
.into_iter()
.map(|(key, command)| {
Ok(CommandInfo {
key,
command: serde_json::from_str(&command)?,
})
})
.collect::<anyhow::Result<Vec<_>>>()?)
}
pub(crate) async fn read_input(&mut self) -> anyhow::Result<Vec<ReadInput>> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx.send(DBusMessage::ReadInput(tx)).await?;
Ok(rx
.await?
.into_iter()
.map(|(key, serial_number)| ReadInput { key, serial_number })
.collect::<Vec<_>>())
}
pub(crate) async fn set_input(
&mut self,
serial_number: String,
key: u8,
command: &Command,
) -> anyhow::Result<()> {
let command = serde_json::to_string(command)?;
self.tx
.send(DBusMessage::SetInput(serial_number.clone(), key, command))
.await?;
cache_commands(serial_number);
Ok(())
}
pub(crate) async fn unset_input(
&mut self,
serial_number: String,
key: u8,
) -> anyhow::Result<()> {
self.tx
.send(DBusMessage::UnsetInput(serial_number.clone(), key))
.await?;
cache_commands(serial_number);
Ok(())
}
pub(crate) async fn set_input_name(
&mut self,
serial_number: String,
key: u8,
name: String,
) -> anyhow::Result<()> {
self.tx
.send(DBusMessage::SetInputName(serial_number.clone(), key, name))
.await?;
cache_input_names(serial_number);
Ok(())
}
async fn get_input_names(&mut self, serial_number: String) -> anyhow::Result<Vec<InputName>> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx
.send(DBusMessage::GetInputNames(serial_number, tx))
.await?;
Ok(rx
.await?
.into_iter()
.map(|(key, name)| InputName { key, name })
.collect::<Vec<_>>())
}
pub(crate) async fn set_deck_name(
&mut self,
serial_number: String,
name: String,
) -> anyhow::Result<()> {
self.tx
.send(DBusMessage::SetDeckName(serial_number.clone(), name))
.await?;
cache_deck_name(serial_number);
Ok(())
}
async fn get_deck_name(&mut self, serial_number: String) -> anyhow::Result<String> {
let (tx, rx) = futures_channel::oneshot::channel();
self.tx
.send(DBusMessage::GetDeckName(serial_number, tx))
.await?;
Ok(rx.await?)
}
}
/// Spawns a local task that fetches the deck list from the daemon and, on success, reconciles
/// the cached state with it. Failures are ignored.
fn cache_decks() {
    glib::MainContext::default().spawn_local(async move {
        let mut daemon = Handle::current();
        let fetched = daemon.get_decks().await;
        if let Ok(decks) = fetched {
            daemon.state().update_decks(decks);
        }
    });
}
/// Spawns a local task that fetches the OBS state from the daemon and, on success, stores it
/// in the cached state. Failures are ignored.
fn cache_obs_state() {
    glib::MainContext::default().spawn_local(async move {
        let mut daemon = Handle::current();
        let fetched = daemon.get_state().await;
        if let Ok(obs_state) = fetched {
            daemon.state().update_obs_state(obs_state);
        }
    });
}
/// Spawns a local task that refreshes the cached commands of the given deck and afterwards
/// schedules a refresh of its key names, whether or not the command fetch succeeded.
fn cache_commands(serial_number: String) {
    glib::MainContext::default().spawn_local(async move {
        {
            // Scoped so the handle is released before the follow-up refresh is scheduled.
            let mut daemon = Handle::current();
            let fetched = daemon.get_commands(serial_number.clone()).await;
            if let Ok(commands) = fetched {
                if let Some(deck_state) = daemon.state().deck_state(&serial_number) {
                    deck_state.update_commands(commands);
                }
            }
        }
        cache_input_names(serial_number);
    });
}
/// Spawns a local task that fetches the key names of the given deck and applies each one to
/// the matching cached command. Names for keys without a cached command are skipped; failures
/// and unknown decks are ignored.
fn cache_input_names(serial_number: String) {
    glib::MainContext::default().spawn_local(async move {
        let mut daemon = Handle::current();
        let names = match daemon.get_input_names(serial_number.clone()).await {
            Ok(names) => names,
            Err(_) => return,
        };
        let deck_state = match daemon.state().deck_state(&serial_number) {
            Some(deck_state) => deck_state,
            None => return,
        };
        for InputName { key, name } in names {
            if let Some(command_state) = deck_state.command_state(key) {
                command_state.update_name(name);
            }
        }
    });
}
/// Spawns a local task that fetches the name of the given deck and stores it in the cached
/// deck state. Failures and unknown decks are ignored.
fn cache_deck_name(serial_number: String) {
    glib::MainContext::default().spawn_local(async move {
        let mut daemon = Handle::current();
        let name = match daemon.get_deck_name(serial_number.clone()).await {
            Ok(name) => name,
            Err(_) => return,
        };
        if let Some(deck_state) = daemon.state().deck_state(&serial_number) {
            deck_state.update_device_name(name);
        }
    });
}
impl<T> DropLog<T> {
    /// Wraps `inner`, emitting a debug log line that mentions `name`.
    fn new(inner: T, name: &'static str) -> Self {
        log::debug!("Constructed {}", name);
        Self { inner, name }
    }
}
/// Gives shared access to the wrapped value.
impl<T> std::ops::Deref for DropLog<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        let wrapped = &self.inner;
        wrapped
    }
}
/// Gives exclusive access to the wrapped value.
impl<T> std::ops::DerefMut for DropLog<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let wrapped = &mut self.inner;
        wrapped
    }
}
/// Emits a debug log line naming the wrapper when it is dropped.
impl<T> Drop for DropLog<T> {
    fn drop(&mut self) {
        let label = self.name;
        log::debug!("Dropping {}", label);
    }
}
impl<T> OnDemand<T> {
    /// Creates an empty slot; `init` is called whenever `get` finds no live value.
    fn new(init: impl Fn() -> T + Send + Sync + 'static) -> Self {
        OnDemand {
            inner: Arc::new(Mutex::new(Weak::new())),
            init: Arc::new(init),
        }
    }

    /// Returns the live shared value, creating it with `init` if none exists.
    ///
    /// The slot stays locked for the whole check-and-create sequence, so concurrent callers can
    /// never each build their own value. As a consequence `init` must not call `get` on the
    /// same `OnDemand`, or it will deadlock.
    fn get(&self) -> Arc<T> {
        // The slot only holds a `Weak`, which is always in a consistent state, so a lock
        // poisoned by a panicking `init` can simply be recovered.
        let mut slot = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(live) = slot.upgrade() {
            return live;
        }
        let fresh = Arc::new((self.init)());
        *slot = Arc::downgrade(&fresh);
        fresh
    }
}
/// Starts a pool of 16 D-Bus connection tasks plus a dispatcher task and returns the sender
/// feeding the dispatcher.
///
/// The dispatcher hands each incoming message to the pool round-robin: it tries a non-blocking
/// send on the current connection, moves on to the next one on failure, and once every
/// connection has refused it falls back to an awaited send on the one it started with. The
/// dispatcher ends when all senders of the returned channel have been dropped.
fn connect() -> futures_channel::mpsc::Sender<DBusMessage> {
    let (tx, mut rx) = futures_channel::mpsc::channel(16);
    log::debug!("Spawning connection pool");
    let mut pool: Vec<_> = (0..16).map(|_| spawn_connection()).collect();

    glib::MainContext::default().spawn(async move {
        let pool_size = pool.len();
        let mut cursor = 0;
        while let Some(mut msg) = rx.next().await {
            let first_tried = cursor;
            loop {
                match pool[cursor].try_send(msg) {
                    Ok(()) => break,
                    Err(rejected) => {
                        // Take the message back and try the next connection.
                        msg = rejected.into_inner();
                        cursor = (cursor + 1) % pool_size;
                        if cursor == first_tried {
                            // Went all the way round: wait for capacity instead.
                            let _ = pool[cursor].send(msg).await;
                            break;
                        }
                    }
                }
            }
            cursor = (cursor + 1) % pool_size;
        }
        log::debug!("Shutting down dbus connection pool");
    });

    tx
}
/// Opens a session-bus connection, builds the daemon proxy and serves messages from `rx` until
/// the channel is closed.
///
/// Every message is translated into the matching proxy call; replies are handed back through
/// the message's oneshot sender (a dropped receiver is ignored).
///
/// # Errors
/// Returns as soon as connecting, building the proxy or any proxy call fails. The message that
/// triggered the failure is dropped together with its reply sender.
async fn process_messages(rx: &mut Receiver<DBusMessage>) -> anyhow::Result<()> {
    let conn = Connection::session().await?;
    let proxy = StreamdeckDaemonProxy::new(&conn).await?;

    loop {
        let msg = match rx.next().await {
            Some(msg) => msg,
            None => break,
        };
        match msg {
            DBusMessage::Query(reply, query) => {
                let _ = reply.send(proxy.query(&query).await?);
            }
            DBusMessage::Test(command) => proxy.test(&command).await?,
            DBusMessage::EnableDiscovery => proxy.enable_discovery().await?,
            DBusMessage::DisableDiscovery => proxy.disable_discovery().await?,
            DBusMessage::GetDecks(reply) => {
                let _ = reply.send(proxy.get_decks().await?);
            }
            DBusMessage::Connect(host, port, reply) => {
                let _ = reply.send(proxy.connect(&host, port).await?);
            }
            DBusMessage::Disconnect(reply) => {
                let _ = reply.send(proxy.disconnect().await?);
            }
            DBusMessage::GetState(reply) => {
                let _ = reply.send(proxy.get_state().await?);
            }
            DBusMessage::Login(password, reply) => {
                let _ = reply.send(proxy.login(&password).await?);
            }
            DBusMessage::GetCommands(serial_number, reply) => {
                let _ = reply.send(proxy.get_commands(&serial_number).await?);
            }
            DBusMessage::ReadInput(reply) => {
                let _ = reply.send(proxy.read_input().await?);
            }
            DBusMessage::SetInput(serial_number, key, command) => {
                proxy.set_input(&serial_number, key, &command).await?
            }
            DBusMessage::UnsetInput(serial_number, key) => {
                proxy.unset_input(&serial_number, key).await?
            }
            DBusMessage::SetInputName(serial_number, key, name) => {
                proxy.set_input_name(&serial_number, key, &name).await?
            }
            DBusMessage::GetInputNames(serial_number, reply) => {
                let _ = reply.send(proxy.get_input_names(&serial_number).await?);
            }
            DBusMessage::SetDeckName(serial_number, name) => {
                proxy.set_deck_name(&serial_number, &name).await?
            }
            DBusMessage::GetDeckName(serial_number, reply) => {
                let _ = reply.send(proxy.get_deck_name(&serial_number).await?);
            }
        }
    }

    Ok(())
}
/// Spawns one D-Bus connection task and returns the sender feeding it.
///
/// The task runs `process_messages` and, whenever it fails, reconnects after a back-off delay.
/// The delay is derived from a decaying "failures per minute" estimate: a failure more than a
/// minute after the previous one resets the estimate to 1, otherwise the old estimate is scaled
/// down by the fraction of a minute that has passed and incremented. The task sleeps for
/// `estimate^1.5` seconds (never less than 0.1 s) before retrying, so repeated failures no
/// longer turn into a busy reconnect loop. The task ends when `process_messages` returns `Ok`,
/// i.e. when its channel is closed.
fn spawn_connection() -> futures_channel::mpsc::Sender<DBusMessage> {
    let (tx, mut rx) = futures_channel::mpsc::channel(16);
    glib::MainContext::default().spawn(async move {
        let one_minute = Duration::from_secs(60);
        let mut last_failure = Instant::now();
        let mut failures_per_minute = 0f64;

        while let Err(e) = process_messages(&mut rx).await {
            log::warn!("Disconnected from dbus: {}", e);

            let now = Instant::now();
            let time_between_failures = now - last_failure;
            last_failure = now;

            failures_per_minute = if time_between_failures > one_minute {
                1f64
            } else {
                // Use fractional seconds so sub-second failure bursts are weighted correctly.
                let failure_ratio =
                    time_between_failures.as_secs_f64() / one_minute.as_secs_f64();
                ((1f64 - failure_ratio) * failures_per_minute) + 1f64
            };

            // `max` (not `min`) keeps a lower bound on the delay; the previous
            // `.min(0.1) as u64` always truncated to zero seconds.
            let delay_secs = failures_per_minute.powf(1.5).max(0.1);
            async_io::Timer::after(Duration::from_secs_f64(delay_secs)).await;
        }
        log::debug!("Shuting down dbus connection");
    });
    tx
}
/// Opaque debug representation: prints just the type name, never the locked contents.
impl std::fmt::Debug for State {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("State")
    }
}
impl EventStream {
fn new(event: &Arc<Event>) -> Self {
EventStream {
listener: Some(event.listen()),
event: Arc::downgrade(event),
}
}
}
/// Yields `()` each time the underlying event is notified and ends once the event has been
/// dropped.
impl Stream for EventStream {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// The event is only weakly referenced: once its owner is gone the stream is finished.
let event = match self.event.upgrade() {
Some(event) => event,
None => return Poll::Ready(None),
};
// Reuse the stored listener, or register a new one if none is stored.
let mut listener = self.listener.take().unwrap_or_else(|| event.listen());
match Pin::new(&mut listener).poll(cx) {
Poll::Ready(()) => {
// A notification arrived. Register a fresh listener and poll it right away; keep
// replacing it while it reports ready, so the listener that is finally stored has
// returned `Pending` for this `cx`.
let mut listener;
while {
listener = event.listen();
Pin::new(&mut listener).poll(cx).is_ready()
} {}
self.listener = Some(listener);
Poll::Ready(Some(()))
}
Poll::Pending => {
// Nothing yet: keep the same listener for the next poll.
self.listener = Some(listener);
Poll::Pending
}
}
}
}