streamdeck/src/dbus.rs
2021-05-05 18:55:47 -05:00

347 lines
12 KiB
Rust

use crate::{
message::{Command, InputMessage, ManagerMessage, ObsMessage, Query},
store::Store,
};
use dbus::{channel::MatchingReceiver, message::MatchRule, nonblock::SyncConnection};
use dbus_crossroads::{Crossroads, MethodErr};
use std::sync::Arc;
use tokio::sync::{mpsc::Sender, Notify};
/// D-Bus front end of the service.
///
/// A value is produced by `Dbus::build` and consumed by `Dbus::run`; it holds
/// the bus connection plus the handles that the exported methods use to reach
/// the rest of the application.
pub(crate) struct Dbus {
/// Session-bus connection opened in `build`.
connection: Arc<SyncConnection>,
/// Command store used by the `GetCommands`, `SetInput` and `UnsetInput` methods.
store: Store,
/// Channel used by the `ReadInput` method.
input: Sender<InputMessage>,
/// Channel used by the `GetScenes`, `Connect`, `Disconnect`, `GetState` and
/// `Login` methods.
obs: Sender<ObsMessage>,
/// Channel used by the `EnableDiscovery`, `DisableDiscovery` and `GetDecks` methods.
manager: Sender<ManagerMessage>,
}
/// Data attached to the exported D-Bus object.
///
/// `Dbus::run` hands one instance to `Crossroads::insert`; every method handler
/// looks it up again through `Crossroads::data_mut` and clones the handle it
/// needs before entering its async block.
struct DbusState {
/// Command store used by the `GetCommands`, `SetInput` and `UnsetInput` methods.
store: Store,
/// Channel used by the `ReadInput` method.
input: Sender<InputMessage>,
/// Channel used by the `GetScenes`, `Connect`, `Disconnect`, `GetState` and
/// `Login` methods.
obs: Sender<ObsMessage>,
/// Channel used by the `EnableDiscovery`, `DisableDiscovery` and `GetDecks` methods.
manager: Sender<ManagerMessage>,
}
impl Dbus {
pub(crate) async fn build(
shutdown: Arc<Notify>,
store: Store,
input: Sender<InputMessage>,
obs: Sender<ObsMessage>,
manager: Sender<ManagerMessage>,
) -> Result<Self, anyhow::Error> {
let (resource, connection) =
tokio::task::spawn_blocking(move || dbus_tokio::connection::new_session_sync())
.await??;
tokio::spawn(async move {
resource.await;
shutdown.notify_one();
});
connection
.request_name("dog.asonix.git.asonix.Streamdeck", false, true, false)
.await?;
Ok(Dbus {
connection,
store,
input,
obs,
manager,
})
}
pub(crate) async fn run(self) {
let Dbus {
connection,
store,
input,
obs,
manager,
} = self;
let state = DbusState {
store,
input,
obs,
manager,
};
let mut cr = Crossroads::new();
cr.set_async_support(Some((
connection.clone(),
Box::new(|x| {
tokio::spawn(x);
}),
)));
let iface_token = cr.register("dog.asonix.git.asonix.Streamdeck", |b| {
b.method_with_cr_async("GetScenes", (), ("scenes",), |mut ctx, cr, ()| {
log::debug!("GetScenes");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let obs = state.obs.clone();
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
if obs
.send(ObsMessage::Query(Query::GetScenes(tx)))
.await
.is_ok()
{
if let Ok(scenes) = rx.await {
return ctx.reply(Ok((scenes,)));
}
}
ctx.reply(Err(MethodErr::failed("Failed to get scenes")))
}
});
b.method_with_cr_async("EnableDiscovery", (), (), |mut ctx, cr, ()| {
log::debug!("EnableDiscovery");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let manager = state.manager.clone();
async move {
if manager.send(ManagerMessage::EnableDiscovery).await.is_ok() {
return ctx.reply(Ok(()));
}
ctx.reply(Err(MethodErr::failed("Failed to enable discovery")))
}
});
b.method_with_cr_async("DisableDiscovery", (), (), |mut ctx, cr, ()| {
log::debug!("DisableDiscovery");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let manager = state.manager.clone();
async move {
if manager.send(ManagerMessage::DisableDiscovery).await.is_ok() {
return ctx.reply(Ok(()));
}
ctx.reply(Err(MethodErr::failed("Failed to disable discovery")))
}
});
b.method_with_cr_async("GetDecks", (), ("decks",), |mut ctx, cr, ()| {
log::debug!("GetDecks");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let manager = state.manager.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
async move {
if manager.send(ManagerMessage::List(tx)).await.is_ok() {
if let Ok(decks) = rx.await {
return ctx.reply(Ok((decks
.into_iter()
.map(|d| (d.serial_number, d.product_name, d.port_name))
.collect::<Vec<_>>(),)));
}
}
ctx.reply(Err(MethodErr::failed("Failed to fetch decks")))
}
});
b.method_with_cr_async(
"Connect",
("host", "port"),
("state",),
|mut ctx, cr, (host, port): (String, u16)| {
log::debug!("Connect");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let obs = state.obs.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
async move {
if obs.send(ObsMessage::Connect(host, port)).await.is_ok() {
if obs.send(ObsMessage::State(tx)).await.is_ok() {
if let Ok(state) = rx.await {
return ctx.reply(Ok((state.to_string(),)));
}
}
}
ctx.reply(Err(MethodErr::failed("Failed to start connection")))
}
},
);
b.method_with_cr_async("Disconnect", (), ("state",), |mut ctx, cr, ()| {
log::debug!("Disconnect");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let obs = state.obs.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
async move {
if obs.send(ObsMessage::Disconnect).await.is_ok() {
if obs.send(ObsMessage::State(tx)).await.is_ok() {
if let Ok(state) = rx.await {
return ctx.reply(Ok((state.to_string(),)));
}
}
}
ctx.reply(Err(MethodErr::failed("Failed to start disconnection")))
}
});
b.method_with_cr_async("GetState", (), ("state",), |mut ctx, cr, ()| {
log::debug!("GetState");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let obs = state.obs.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
async move {
if obs.send(ObsMessage::State(tx)).await.is_ok() {
if let Ok(state) = rx.await {
return ctx.reply(Ok((state.to_string(),)));
}
}
ctx.reply(Err(MethodErr::failed("Failed to get state")))
}
});
b.method_with_cr_async(
"Login",
("password",),
("state",),
|mut ctx, cr, (password,): (String,)| {
log::debug!("Login");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let obs = state.obs.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
async move {
if obs.send(ObsMessage::Authenticate(password)).await.is_ok() {
if obs.send(ObsMessage::State(tx)).await.is_ok() {
if let Ok(state) = rx.await {
return ctx.reply(Ok((state.to_string(),)));
}
}
}
ctx.reply(Err(MethodErr::failed("Failed to start login")))
}
},
);
b.method_with_cr_async(
"GetCommands",
("serial_number",),
("commands",),
|mut ctx, cr, (serial_number,): (String,)| {
log::debug!("GetCommands");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let store = state.store.clone();
async move {
if let Ok(vec) = store.get_commands(&&serial_number).await {
let vec: Vec<_> = vec
.into_iter()
.filter_map(|(key, cmd)| {
Some((key, serde_json::to_string(&cmd).ok()?))
})
.collect();
return ctx.reply(Ok((vec,)));
}
ctx.reply(Err(MethodErr::failed("Failed to get commands")))
}
},
);
b.method_with_cr_async("ReadInput", (), ("input",), |mut ctx, cr, ()| {
log::debug!("ReadInput");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let input = state.input.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
async move {
if input.send(InputMessage::ReadInput(tx)).await.is_ok() {
if let Ok((serial_number, key)) = rx.await {
return ctx.reply(Ok((vec![(key, serial_number)],)));
}
}
ctx.reply(Err(MethodErr::failed("Failed to fetch key")))
}
});
b.method_with_cr_async(
"SetInput",
("serial_number", "key", "command"),
(),
|mut ctx, cr, (serial_number, key, command): (String, u8, String)| {
log::debug!("SetInput");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let store = state.store.clone();
async move {
if let Ok(command) = serde_json::from_str::<Command>(&command) {
if store.store(&serial_number, key, &command).await.is_ok() {
return ctx.reply(Ok(()));
}
}
ctx.reply(Err(MethodErr::failed("Failed to set mapping")))
}
},
);
b.method_with_cr_async(
"UnsetInput",
("serial_number", "key"),
(),
|mut ctx, cr, (serial_number, key): (String, u8)| {
log::debug!("UnsetInput");
let state: &mut DbusState = cr.data_mut(ctx.path()).unwrap();
let store = state.store.clone();
async move {
if store.unset(&serial_number, key).await.is_ok() {
return ctx.reply(Ok(()));
}
ctx.reply(Err(MethodErr::failed("Failed to set mapping")))
}
},
);
});
cr.insert("/dog/asonix/git/asonix/Streamdeck", &[iface_token], state);
connection.start_receive(
MatchRule::new_method_call(),
Box::new(move |msg, conn| {
cr.handle_message(msg, conn).unwrap();
true
}),
);
}
}