use actix_web::web::Bytes; use futures_core::Stream; use std::{pin::Pin, time::Duration}; use streem::IntoStreamer; use crate::future::WithMetrics; pub(crate) fn take(stream: S, amount: usize) -> impl Stream where S: Stream, S::Item: 'static, { streem::from_fn(|yielder| async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); let mut count = 0; if count == amount { return; } while let Some(item) = streamer.next().await { tracing::trace!("take: looping"); yielder.yield_(item).await; count += 1; if count == amount { break; } } }) } pub(crate) fn metrics(name: &'static str, stream: S) -> impl Stream where S: Stream, S::Item: 'static, { streem::from_fn(|yielder| { async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); while let Some(item) = streamer.next().await { tracing::trace!("metrics: looping"); yielder.yield_(item).await; } } .with_metrics(name) }) } pub(crate) fn make_send(stream: S) -> impl Stream + Send where S: Stream + 'static, S::Item: Send + Sync, { let (tx, rx) = crate::sync::channel(1); let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { tracing::trace!("make send tx: looping"); if tx.send_async(res).await.is_err() { break; } } })); streem::from_fn(|yiedler| async move { let mut stream = rx.into_stream().into_streamer(); while let Some(res) = stream.next().await { tracing::trace!("make send rx: looping"); yiedler.yield_(res).await; } let _ = handle.await; }) } pub(crate) fn from_iterator(iterator: I, buffer: usize) -> impl Stream + Send where I: IntoIterator + Send + 'static, I::Item: Send + Sync, { let (tx, rx) = crate::sync::channel(buffer); let handle = crate::sync::spawn_blocking("blocking-iterator", move || { for value in iterator { if tx.send(value).is_err() { break; } } }); streem::from_fn(|yielder| async move { let mut stream = rx.into_stream().into_streamer(); let yield_count = buffer.max(8); let mut count = 0; while let Some(res) = stream.next().await { tracing::trace!("from_iterator: looping"); count += 1; count %= yield_count; yielder.yield_(res).await; // every 8 (or buffer-size) items, yield to executor before looping // improves cooperation if count == 0 { tokio::task::yield_now().await; } } let _ = handle.await; }) } pub(crate) fn map(stream: S, f: F) -> impl Stream where S: Stream, I2: 'static, F: Fn(I1) -> I2 + Copy, { streem::from_fn(|yielder| async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { tracing::trace!("map: looping"); yielder.yield_((f)(res)).await; } }) } #[cfg(not(feature = "io-uring"))] pub(crate) fn map_ok(stream: S, f: F) -> impl Stream> where S: Stream>, T2: 'static, E: 'static, F: Fn(T1) -> T2 + Copy, { map(stream, move |res| res.map(f)) } pub(crate) fn map_err(stream: S, f: F) -> impl Stream> where S: Stream>, T: 'static, E2: 'static, F: Fn(E1) -> E2 + Copy, { map(stream, move |res| res.map_err(f)) } pub(crate) fn from_err(stream: S) -> impl Stream> where S: Stream>, T: 'static, E1: Into, E2: 'static, { map_err(stream, Into::into) } pub(crate) fn empty() -> impl Stream where T: 'static, { streem::from_fn(|_| std::future::ready(())) } pub(crate) fn once(value: T) -> impl Stream where T: 'static, { streem::from_fn(|yielder| yielder.yield_(value)) } pub(crate) fn timeout( duration: Duration, stream: S, ) -> impl Stream> where S: Stream, S::Item: 'static, { streem::try_from_fn(|yielder| async move { tokio::time::timeout(duration, async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); while let Some(res) = streamer.next().await { tracing::trace!("timeout: looping"); yielder.yield_ok(res).await; } }) .await .map_err(|_| TimeoutError) }) } pub(crate) fn limit(limit: usize, stream: S) -> impl Stream> where S: Stream>, E: From + 'static, { streem::try_from_fn(|yielder| async move { let stream = std::pin::pin!(stream); let mut streamer = stream.into_streamer(); let mut count = 0; while let Some(bytes) = streamer.try_next().await? { tracing::trace!("limit: looping"); count += bytes.len(); if count > limit { return Err(LimitError.into()); } yielder.yield_ok(bytes).await; } Ok(()) }) } pub(crate) type LocalBoxStream<'a, T> = Pin + 'a>>; #[derive(Debug, thiserror::Error)] #[error("Resonse body larger than size limit")] pub(crate) struct LimitError; #[derive(Debug, thiserror::Error)] #[error("Timeout in body")] pub(crate) struct TimeoutError;