Close all event streams on disconnect

This commit is contained in:
Dominik Nakamura 2021-03-11 20:42:45 +09:00
parent 1eb2881d83
commit bf54702c0c
No known key found for this signature in database
GPG key ID: E4C6A749B2491910
6 changed files with 31 additions and 13 deletions

View file

@ -15,7 +15,7 @@ async fn main() -> Result<()> {
client.login(env::var("OBS_PASSWORD").ok()).await?;
let events = client.events();
let events = client.events()?;
pin_mut!(events);
while let Some(event) = events.next().await {

View file

@ -1,5 +1,7 @@
//! The client to the obs-websocket API and main entry point.
#[cfg(feature = "events")]
use std::sync::Weak;
use std::{
collections::HashMap,
future::Future,
@ -82,7 +84,7 @@ pub struct Client {
/// Broadcast sender that distributes received events to all current listeners. Events are
/// dropped if nobody listens.
#[cfg(feature = "events")]
event_sender: broadcast::Sender<Event>,
event_sender: Weak<broadcast::Sender<Event>>,
/// Handle to the background task that receives messages and distributes them to waiting
/// receivers and event listeners. It allows to shut down all the machinery once the client is
/// no longer needed.
@ -165,7 +167,9 @@ impl Client {
let (event_sender, _) =
broadcast::channel(config.broadcast_capacity.unwrap_or(DEFAULT_CAPACITY));
#[cfg(feature = "events")]
let events_tx = event_sender.clone();
let event_sender = Arc::new(event_sender);
#[cfg(feature = "events")]
let events_tx = Arc::clone(&event_sender);
let handle = tokio::spawn(async move {
while let Some(Ok(msg)) = read.next().await {
@ -228,7 +232,7 @@ impl Client {
id_counter,
receivers,
#[cfg(feature = "events")]
event_sender,
event_sender: Arc::downgrade(&event_sender),
handle: Some(handle),
})
}
@ -331,14 +335,24 @@ impl Client {
///
/// **Note**: To be able to iterate over the stream you have to pin it with
/// [`futures_util::pin_mut`] for example.
///
/// # Errors
///
/// Getting a new stream of events fails with [`Error::Disconnected`] if the client is
/// disconnected from obs-websocket. That can happen either by manually disconnecting, stopping
/// obs-websocket or closing OBS.
#[cfg(feature = "events")]
pub fn events(&self) -> impl Stream<Item = Event> {
let mut receiver = self.event_sender.subscribe();
pub fn events(&self) -> Result<impl Stream<Item = Event>> {
if let Some(sender) = &self.event_sender.upgrade() {
let mut receiver = sender.subscribe();
async_stream::stream! {
while let Ok(event) = receiver.recv().await {
yield event;
}
Ok(async_stream::stream! {
while let Ok(event) = receiver.recv().await {
yield event;
}
})
} else {
Err(crate::Error::Disconnected)
}
}

View file

@ -76,4 +76,8 @@ pub enum Error {
/// Unknown flags were found while trying to parse bitflags.
#[error("value {0} contains unknown flags")]
UnknownFlags(u8),
/// Tried to interact with obs-websocket while not connected (for example trying to get a new
/// event stream).
#[error("currently not connected to obs-websocket")]
Disconnected,
}

View file

@ -12,7 +12,7 @@ mod common;
#[tokio::test]
async fn main() -> Result<()> {
let client = common::new_client().await?;
let events = client.events();
let events = client.events()?;
let client = client.media_control();
pin_mut!(events);

View file

@ -14,7 +14,7 @@ mod common;
#[tokio::test]
async fn main() -> Result<()> {
let client = common::new_client().await?;
let events = client.events();
let events = client.events()?;
let client = client.recording();
pin_mut!(events);

View file

@ -13,7 +13,7 @@ mod common;
#[tokio::test]
async fn main() -> Result<()> {
let client = common::new_client().await?;
let events = client.events();
let events = client.events()?;
let client = client.replay_buffer();
pin_mut!(events);