Enable buffering iterator items for iterstream
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Aode (lion) 2022-03-29 16:18:00 -05:00
parent 5adb3fde89
commit 63d66050c8
2 changed files with 18 additions and 13 deletions

View file

@ -227,7 +227,7 @@ impl HashRepo for SledRepo {
.keys() .keys()
.map(|res| res.map_err(Error::from)); .map(|res| res.map_err(Error::from));
Box::pin(from_iterator(iter)) Box::pin(from_iterator(iter, 8))
} }
#[tracing::instrument] #[tracing::instrument]

View file

@ -41,9 +41,10 @@ pub(crate) trait StreamTimeout {
pub(crate) fn from_iterator<I: IntoIterator + Unpin + Send + 'static>( pub(crate) fn from_iterator<I: IntoIterator + Unpin + Send + 'static>(
iterator: I, iterator: I,
buffer: usize,
) -> IterStream<I, I::Item> { ) -> IterStream<I, I::Item> {
IterStream { IterStream {
state: IterStreamState::New { iterator }, state: IterStreamState::New { iterator, buffer },
} }
} }
@ -76,6 +77,7 @@ pin_project_lite::pin_project! {
enum IterStreamState<I, T> { enum IterStreamState<I, T> {
New { New {
iterator: I, iterator: I,
buffer: usize,
}, },
Running { Running {
handle: JoinHandle<()>, handle: JoinHandle<()>,
@ -184,8 +186,8 @@ where
let this = self.as_mut().get_mut(); let this = self.as_mut().get_mut();
match std::mem::replace(&mut this.state, IterStreamState::Pending) { match std::mem::replace(&mut this.state, IterStreamState::Pending) {
IterStreamState::New { iterator } => { IterStreamState::New { iterator, buffer } => {
let (sender, receiver) = tokio::sync::mpsc::channel(1); let (sender, receiver) = tokio::sync::mpsc::channel(buffer);
let mut handle = actix_rt::task::spawn_blocking(move || { let mut handle = actix_rt::task::spawn_blocking(move || {
let iterator = iterator.into_iter(); let iterator = iterator.into_iter();
@ -202,27 +204,30 @@ where
} }
this.state = IterStreamState::Running { handle, receiver }; this.state = IterStreamState::Running { handle, receiver };
self.poll_next(cx)
} }
IterStreamState::Running { IterStreamState::Running {
mut handle, mut handle,
mut receiver, mut receiver,
} => match Pin::new(&mut receiver).poll_recv(cx) { } => match Pin::new(&mut receiver).poll_recv(cx) {
Poll::Ready(Some(item)) => { Poll::Ready(Some(item)) => {
this.state = IterStreamState::Running { handle, receiver };
Poll::Ready(Some(item))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => {
if Pin::new(&mut handle).poll(cx).is_ready() { if Pin::new(&mut handle).poll(cx).is_ready() {
return Poll::Ready(Some(item)); return Poll::Ready(None);
} }
this.state = IterStreamState::Running { handle, receiver }; this.state = IterStreamState::Running { handle, receiver };
}
Poll::Ready(None) => return Poll::Ready(None), Poll::Pending
Poll::Pending => {
this.state = IterStreamState::Running { handle, receiver };
return Poll::Pending;
} }
}, },
IterStreamState::Pending => return Poll::Ready(None), IterStreamState::Pending => panic!("Polled after completion"),
} }
self.poll_next(cx)
} }
} }