From f97f90df199e7ccab511cdc07428180388e0697c Mon Sep 17 00:00:00 2001 From: Dominik Nakamura Date: Tue, 12 Jan 2021 15:59:51 +0900 Subject: [PATCH] Allow to shut down the client connection --- src/client/mod.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 8cac439..bafa028 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,6 +2,7 @@ use std::{ collections::HashMap, + future::Future, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -17,6 +18,7 @@ use serde::de::DeserializeOwned; use tokio::{ net::TcpStream, sync::{broadcast, oneshot, Mutex}, + task::JoinHandle, }; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; @@ -73,6 +75,10 @@ pub struct Client { /// Broadcast sender that distributes received events to all current listeners. Events are /// dropped if nobody listens. event_sender: broadcast::Sender, + /// Handle to the background task that receives messages and distributes them to waiting + /// receivers and event listeners. It allows to shut down all the machinery once the client is + /// no longer needed. + handle: Option>, } /// Base stream of a [`WebSocketStream`] if TLS is **disabled**. @@ -153,7 +159,7 @@ impl Client { let (event_sender, _) = broadcast::channel(config.broadcast_capacity); let events_tx = event_sender.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { while let Some(Ok(msg)) = read.next().await { trace!("{}", msg); let res: Result<(), InnerError> = async { @@ -194,6 +200,7 @@ impl Client { id_counter, receivers, event_sender, + handle: Some(handle), }) } @@ -230,6 +237,24 @@ impl Client { .map_err(Error::DeserializeResponse) } + /// Disconnect from obs-websocket and shut down all machinery. + /// + /// This is called automatically when dropping the client but doesn't wait for all background + /// tasks to complete. Therefore, it is recommended to call this manually once the client is + /// no longer needed. + pub fn disconnect(&mut self) -> impl Future { + let handle = self.handle.take().map(|h| { + h.abort(); + h + }); + + async { + if let Some(h) = handle { + h.await.ok(); + } + } + } + /// Login to the OBS websocket if an authentication is required. pub async fn login(&self, password: Option>) -> Result<()> { let auth_required = self.general().get_auth_required().await?; @@ -360,3 +385,11 @@ fn extract_error(value: &mut serde_json::Value) -> Option { } }) } + +impl Drop for Client { + fn drop(&mut self) { + // We simply drop the future as the background task has been aborted but we have no way here + // to wait for it to fully shut down (except spinning up a new tokio runtime). + drop(self.disconnect()); + } +}