From 63d66050c8350af5204d66e8850179900815fe1d Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Tue, 29 Mar 2022 16:18:00 -0500 Subject: [PATCH] Enable buffering iterator items for iterstream --- src/repo/sled.rs | 2 +- src/stream.rs | 29 +++++++++++++++++------------ 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/repo/sled.rs b/src/repo/sled.rs index b65b562..b34d008 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -227,7 +227,7 @@ impl HashRepo for SledRepo { .keys() .map(|res| res.map_err(Error::from)); - Box::pin(from_iterator(iter)) + Box::pin(from_iterator(iter, 8)) } #[tracing::instrument] diff --git a/src/stream.rs b/src/stream.rs index 7ded6bd..bd14aef 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -41,9 +41,10 @@ pub(crate) trait StreamTimeout { pub(crate) fn from_iterator( iterator: I, + buffer: usize, ) -> IterStream { IterStream { - state: IterStreamState::New { iterator }, + state: IterStreamState::New { iterator, buffer }, } } @@ -76,6 +77,7 @@ pin_project_lite::pin_project! { enum IterStreamState { New { iterator: I, + buffer: usize, }, Running { handle: JoinHandle<()>, @@ -184,8 +186,8 @@ where let this = self.as_mut().get_mut(); match std::mem::replace(&mut this.state, IterStreamState::Pending) { - IterStreamState::New { iterator } => { - let (sender, receiver) = tokio::sync::mpsc::channel(1); + IterStreamState::New { iterator, buffer } => { + let (sender, receiver) = tokio::sync::mpsc::channel(buffer); let mut handle = actix_rt::task::spawn_blocking(move || { let iterator = iterator.into_iter(); @@ -202,27 +204,30 @@ where } this.state = IterStreamState::Running { handle, receiver }; + + self.poll_next(cx) } IterStreamState::Running { mut handle, mut receiver, } => match Pin::new(&mut receiver).poll_recv(cx) { Poll::Ready(Some(item)) => { + this.state = IterStreamState::Running { handle, receiver }; + + Poll::Ready(Some(item)) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => { if Pin::new(&mut handle).poll(cx).is_ready() { - return Poll::Ready(Some(item)); + return Poll::Ready(None); } this.state = IterStreamState::Running { handle, receiver }; - } - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => { - this.state = IterStreamState::Running { handle, receiver }; - return Poll::Pending; + + Poll::Pending } }, - IterStreamState::Pending => return Poll::Ready(None), + IterStreamState::Pending => panic!("Polled after completion"), } - - self.poll_next(cx) } }