diff --git a/src/stream.rs b/src/stream.rs index 0b2e57d..f03ec31 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -76,10 +76,22 @@ where streem::from_fn(|yielder| async move { let mut stream = rx.into_stream().into_streamer(); + let yield_count = buffer.max(8); + let mut count = 0; + while let Some(res) = stream.next().await { tracing::trace!("from_iterator: looping"); + count += 1; + count = count % yield_count; + yielder.yield_(res).await; + + // every 8 (or buffer-size) items, yield to executor before looping + // improves cooperation + if count == 0 { + tokio::task::yield_now().await; + } } let _ = handle.await;