Allow to shut down the client connection

This commit is contained in:
Dominik Nakamura 2021-01-12 15:59:51 +09:00
parent 737a9b4205
commit f97f90df19
No known key found for this signature in database
GPG key ID: E4C6A749B2491910

View file

@ -2,6 +2,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
future::Future,
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc,
@ -17,6 +18,7 @@ use serde::de::DeserializeOwned;
use tokio::{ use tokio::{
net::TcpStream, net::TcpStream,
sync::{broadcast, oneshot, Mutex}, sync::{broadcast, oneshot, Mutex},
task::JoinHandle,
}; };
use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
@ -73,6 +75,10 @@ pub struct Client {
/// Broadcast sender that distributes received events to all current listeners. Events are /// Broadcast sender that distributes received events to all current listeners. Events are
/// dropped if nobody listens. /// dropped if nobody listens.
event_sender: broadcast::Sender<Event>, event_sender: broadcast::Sender<Event>,
/// Handle to the background task that receives messages and distributes them to waiting
/// receivers and event listeners. It allows to shut down all the machinery once the client is
/// no longer needed.
handle: Option<JoinHandle<()>>,
} }
/// Base stream of a [`WebSocketStream`] if TLS is **disabled**. /// Base stream of a [`WebSocketStream`] if TLS is **disabled**.
@ -153,7 +159,7 @@ impl Client {
let (event_sender, _) = broadcast::channel(config.broadcast_capacity); let (event_sender, _) = broadcast::channel(config.broadcast_capacity);
let events_tx = event_sender.clone(); let events_tx = event_sender.clone();
tokio::spawn(async move { let handle = tokio::spawn(async move {
while let Some(Ok(msg)) = read.next().await { while let Some(Ok(msg)) = read.next().await {
trace!("{}", msg); trace!("{}", msg);
let res: Result<(), InnerError> = async { let res: Result<(), InnerError> = async {
@ -194,6 +200,7 @@ impl Client {
id_counter, id_counter,
receivers, receivers,
event_sender, event_sender,
handle: Some(handle),
}) })
} }
@ -230,6 +237,24 @@ impl Client {
.map_err(Error::DeserializeResponse) .map_err(Error::DeserializeResponse)
} }
/// Disconnect from obs-websocket and shut down all machinery.
///
/// This is called automatically when dropping the client but doesn't wait for all background
/// tasks to complete. Therefore, it is recommended to call this manually once the client is
/// no longer needed.
pub fn disconnect(&mut self) -> impl Future {
let handle = self.handle.take().map(|h| {
h.abort();
h
});
async {
if let Some(h) = handle {
h.await.ok();
}
}
}
/// Login to the OBS websocket if an authentication is required. /// Login to the OBS websocket if an authentication is required.
pub async fn login(&self, password: Option<impl AsRef<str>>) -> Result<()> { pub async fn login(&self, password: Option<impl AsRef<str>>) -> Result<()> {
let auth_required = self.general().get_auth_required().await?; let auth_required = self.general().get_auth_required().await?;
@ -360,3 +385,11 @@ fn extract_error(value: &mut serde_json::Value) -> Option<String> {
} }
}) })
} }
impl Drop for Client {
fn drop(&mut self) {
// We simply drop the future as the background task has been aborted but we have no way here
// to wait for it to fully shut down (except spinning up a new tokio runtime).
drop(self.disconnect());
}
}