diff --git a/src/stream.rs b/src/stream.rs index 9e617aa..c52dc1b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -58,6 +58,22 @@ where }) } +pub(crate) fn map(stream: S, f: F) -> impl Stream +where + S: Stream, + I2: 'static, + F: Fn(I1) -> I2 + Copy, +{ + streem::from_fn(|yielder| async move { + let stream = std::pin::pin!(stream); + let mut streamer = stream.into_streamer(); + + while let Some(res) = streamer.next().await { + yielder.yield_((f)(res)).await; + } + }) +} + #[cfg(not(feature = "io-uring"))] pub(crate) fn map_ok(stream: S, f: F) -> impl Stream> where @@ -66,14 +82,7 @@ where E: 'static, F: Fn(T1) -> T2 + Copy, { - streem::from_fn(|yielder| async move { - let stream = std::pin::pin!(stream); - let mut streamer = stream.into_streamer(); - - while let Some(res) = streamer.next().await { - yielder.yield_(res.map(f)).await; - } - }) + map(stream, move |res| res.map(f)) } pub(crate) fn map_err(stream: S, f: F) -> impl Stream> @@ -83,14 +92,7 @@ where E2: 'static, F: Fn(E1) -> E2 + Copy, { - streem::from_fn(|yielder| async move { - let stream = std::pin::pin!(stream); - let mut streamer = stream.into_streamer(); - - while let Some(res) = streamer.next().await { - yielder.yield_(res.map_err(f)).await; - } - }) + map(stream, move |res| res.map_err(f)) } pub(crate) fn from_err(stream: S) -> impl Stream>