93 lines
2.5 KiB
Rust
93 lines
2.5 KiB
Rust
//! Types and Traits for consuming Streams
|
|
|
|
use std::pin::Pin;
|
|
|
|
/// Enables consuming any Stream + Unpin
|
|
pub trait IntoStreamer: futures_core::Stream {
|
|
/// Construct a type that consumes the stream
|
|
///
|
|
/// Example
|
|
/// ```rust
|
|
/// use streem::IntoStreamer;
|
|
/// let stream = streem::from_fn(|yielder| async move { yielder.yield_(1).await; });
|
|
/// let pinned = std::pin::pin!(stream);
|
|
/// let streamer = pinned.into_streamer();
|
|
/// ```
|
|
fn into_streamer(self) -> Streamer<Self>
|
|
where
|
|
Self: Sized + Unpin,
|
|
{
|
|
Streamer { inner: self }
|
|
}
|
|
}
|
|
|
|
impl<S> IntoStreamer for S where S: futures_core::Stream {}
|
|
|
|
/// Enables calling async next on a stream
|
|
pub struct Streamer<S> {
|
|
inner: S,
|
|
}
|
|
|
|
impl<S> Streamer<S> {
|
|
/// Receive the next value from the stream
|
|
///
|
|
/// Example:
|
|
/// ```rust
|
|
/// let input_stream = std::pin::pin!(streem::from_fn(|yielder| async move {
|
|
/// # for i in 0..10 {
|
|
/// # yielder.yield_(i).await;
|
|
/// # }
|
|
/// }));
|
|
/// use streem::IntoStreamer;
|
|
///
|
|
/// let mut streamer = input_stream.into_streamer();
|
|
///
|
|
/// # let _ = futures_executor::block_on(async move {
|
|
/// while let Some(item) = streamer.next().await {
|
|
/// println!("{item}");
|
|
/// }
|
|
/// # });
|
|
/// ```
|
|
pub async fn next(&mut self) -> Option<S::Item>
|
|
where
|
|
S: futures_core::Stream + Unpin,
|
|
{
|
|
std::future::poll_fn(|cx| Pin::new(&mut self.inner).poll_next(cx)).await
|
|
}
|
|
|
|
/// Receive a result of the next Ok stream value, or the given Err value
|
|
///
|
|
/// Example:
|
|
/// ```rust
|
|
/// # fn fallible_fn(i: i32) -> Result<i32, String> {
|
|
/// # Ok(i)
|
|
/// # }
|
|
/// #
|
|
/// let input_stream = std::pin::pin!(streem::try_from_fn(|yielder| async move {
|
|
/// # for i in 0..10 {
|
|
/// # let value = fallible_fn(i)?;
|
|
/// #
|
|
/// # yielder.yield_ok(value).await;
|
|
/// # }
|
|
/// #
|
|
/// # Ok(()) as Result<_, String>
|
|
/// }));
|
|
/// use streem::IntoStreamer;
|
|
///
|
|
/// let mut streamer = input_stream.into_streamer();
|
|
///
|
|
/// # let _ = futures_executor::block_on(async move {
|
|
/// while let Some(item) = streamer.try_next().await? {
|
|
/// println!("{item}");
|
|
/// }
|
|
/// # Ok(()) as Result<_, String>
|
|
/// # });
|
|
/// ```
|
|
pub async fn try_next<T, E>(&mut self) -> Result<Option<T>, E>
|
|
where
|
|
S: futures_core::Stream<Item = Result<T, E>> + Unpin,
|
|
{
|
|
self.next().await.transpose()
|
|
}
|
|
}
|