From bcf73eb4e47220c84747438e514b780083c76723 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 5 Jan 2024 19:50:10 -0600 Subject: [PATCH] Improve cooperation from from_iterator to prevent task starvation --- src/stream.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/stream.rs b/src/stream.rs index 0b2e57d..f03ec31 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -76,10 +76,22 @@ where streem::from_fn(|yielder| async move { let mut stream = rx.into_stream().into_streamer(); + let yield_count = buffer.max(8); + let mut count = 0; + while let Some(res) = stream.next().await { tracing::trace!("from_iterator: looping"); + count += 1; + count = count % yield_count; + yielder.yield_(res).await; + + // every 8 (or buffer-size) items, yield to executor before looping + // improves cooperation + if count == 0 { + tokio::task::yield_now().await; + } } let _ = handle.await;