diff --git a/examples/events.rs b/examples/events.rs index c5d6aa1..d010935 100644 --- a/examples/events.rs +++ b/examples/events.rs @@ -15,7 +15,7 @@ async fn main() -> Result<()> { client.login(env::var("OBS_PASSWORD").ok()).await?; - let events = client.events(); + let events = client.events()?; pin_mut!(events); while let Some(event) = events.next().await { diff --git a/src/client/mod.rs b/src/client/mod.rs index d844249..3e1e9af 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,7 @@ //! The client to the obs-websocket API and main entry point. +#[cfg(feature = "events")] +use std::sync::Weak; use std::{ collections::HashMap, future::Future, @@ -82,7 +84,7 @@ pub struct Client { /// Broadcast sender that distributes received events to all current listeners. Events are /// dropped if nobody listens. #[cfg(feature = "events")] - event_sender: broadcast::Sender, + event_sender: Weak>, /// Handle to the background task that receives messages and distributes them to waiting /// receivers and event listeners. It allows to shut down all the machinery once the client is /// no longer needed. @@ -165,7 +167,9 @@ impl Client { let (event_sender, _) = broadcast::channel(config.broadcast_capacity.unwrap_or(DEFAULT_CAPACITY)); #[cfg(feature = "events")] - let events_tx = event_sender.clone(); + let event_sender = Arc::new(event_sender); + #[cfg(feature = "events")] + let events_tx = Arc::clone(&event_sender); let handle = tokio::spawn(async move { while let Some(Ok(msg)) = read.next().await { @@ -228,7 +232,7 @@ impl Client { id_counter, receivers, #[cfg(feature = "events")] - event_sender, + event_sender: Arc::downgrade(&event_sender), handle: Some(handle), }) } @@ -331,14 +335,24 @@ impl Client { /// /// **Note**: To be able to iterate over the stream you have to pin it with /// [`futures_util::pin_mut`] for example. + /// + /// # Errors + /// + /// Getting a new stream of events fails with [`Error::Disconnected`] if the client is + /// disconnected from obs-websocket. That can happen either by manually disconnecting, stopping + /// obs-websocket or closing OBS. #[cfg(feature = "events")] - pub fn events(&self) -> impl Stream { - let mut receiver = self.event_sender.subscribe(); + pub fn events(&self) -> Result> { + if let Some(sender) = &self.event_sender.upgrade() { + let mut receiver = sender.subscribe(); - async_stream::stream! { - while let Ok(event) = receiver.recv().await { - yield event; - } + Ok(async_stream::stream! { + while let Ok(event) = receiver.recv().await { + yield event; + } + }) + } else { + Err(crate::Error::Disconnected) } } diff --git a/src/lib.rs b/src/lib.rs index 2b7aeff..82e53e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,4 +76,8 @@ pub enum Error { /// Unknown flags were found while trying to parse bitflags. #[error("value {0} contains unknown flags")] UnknownFlags(u8), + /// Tried to interact with obs-websocket while not connected (for example trying to get a new + /// event stream). + #[error("currently not connected to obs-websocket")] + Disconnected, } diff --git a/tests/media_control.rs b/tests/media_control.rs index 0597e1b..a7e3968 100644 --- a/tests/media_control.rs +++ b/tests/media_control.rs @@ -12,7 +12,7 @@ mod common; #[tokio::test] async fn main() -> Result<()> { let client = common::new_client().await?; - let events = client.events(); + let events = client.events()?; let client = client.media_control(); pin_mut!(events); diff --git a/tests/recording.rs b/tests/recording.rs index 75d3793..8ae08b7 100644 --- a/tests/recording.rs +++ b/tests/recording.rs @@ -14,7 +14,7 @@ mod common; #[tokio::test] async fn main() -> Result<()> { let client = common::new_client().await?; - let events = client.events(); + let events = client.events()?; let client = client.recording(); pin_mut!(events); diff --git a/tests/replay_buffer.rs b/tests/replay_buffer.rs index 219697e..b5da8be 100644 --- a/tests/replay_buffer.rs +++ b/tests/replay_buffer.rs @@ -13,7 +13,7 @@ mod common; #[tokio::test] async fn main() -> Result<()> { let client = common::new_client().await?; - let events = client.events(); + let events = client.events()?; let client = client.replay_buffer(); pin_mut!(events);