streaming/src/streamer.rs
2023-09-09 16:34:16 -04:00

93 lines
2.5 KiB
Rust

//! Types and Traits for consuming Streams
use std::pin::Pin;
/// Enables consuming any Stream + Unpin
pub trait IntoStreamer: futures_core::Stream {
/// Construct a type that consumes the stream
///
/// Example
/// ```rust
/// use streem::IntoStreamer;
/// let stream = streem::from_fn(|yielder| async move { yielder.yield_(1).await; });
/// let pinned = std::pin::pin!(stream);
/// let streamer = pinned.into_streamer();
/// ```
fn into_streamer(self) -> Streamer<Self>
where
Self: Sized + Unpin,
{
Streamer { inner: self }
}
}
impl<S> IntoStreamer for S where S: futures_core::Stream {}
/// Enables calling async next on a stream
pub struct Streamer<S> {
inner: S,
}
impl<S> Streamer<S> {
/// Receive the next value from the stream
///
/// Example:
/// ```rust
/// let input_stream = std::pin::pin!(streem::from_fn(|yielder| async move {
/// # for i in 0..10 {
/// # yielder.yield_(i).await;
/// # }
/// }));
/// use streem::IntoStreamer;
///
/// let mut streamer = input_stream.into_streamer();
///
/// # let _ = futures_executor::block_on(async move {
/// while let Some(item) = streamer.next().await {
/// println!("{item}");
/// }
/// # });
/// ```
pub async fn next(&mut self) -> Option<S::Item>
where
S: futures_core::Stream + Unpin,
{
std::future::poll_fn(|cx| Pin::new(&mut self.inner).poll_next(cx)).await
}
/// Receive a result of the next Ok stream value, or the given Err value
///
/// Example:
/// ```rust
/// # fn fallible_fn(i: i32) -> Result<i32, String> {
/// # Ok(i)
/// # }
/// #
/// let input_stream = std::pin::pin!(streem::try_from_fn(|yielder| async move {
/// # for i in 0..10 {
/// # let value = fallible_fn(i)?;
/// #
/// # yielder.yield_ok(value).await;
/// # }
/// #
/// # Ok(()) as Result<_, String>
/// }));
/// use streem::IntoStreamer;
///
/// let mut streamer = input_stream.into_streamer();
///
/// # let _ = futures_executor::block_on(async move {
/// while let Some(item) = streamer.try_next().await? {
/// println!("{item}");
/// }
/// # Ok(()) as Result<_, String>
/// # });
/// ```
pub async fn try_next<T, E>(&mut self) -> Result<Option<T>, E>
where
S: futures_core::Stream<Item = Result<T, E>> + Unpin,
{
self.next().await.transpose()
}
}