pict-rs/src/bytes_stream.rs

146 lines
3.4 KiB
Rust
Raw Normal View History

2024-02-22 22:15:32 +00:00
use actix_web::web::Bytes;
2023-08-23 16:59:42 +00:00
use futures_core::Stream;
use std::{
collections::{vec_deque::IntoIter, VecDeque},
2023-07-21 21:58:31 +00:00
convert::Infallible,
pin::Pin,
task::{Context, Poll},
};
2024-02-22 22:02:33 +00:00
use streem::IntoStreamer;
use tokio::io::AsyncRead;
/// An in-memory queue of [`Bytes`] chunks together with a running byte count.
#[derive(Clone, Debug)]
pub(crate) struct BytesStream {
/// Chunks in the order they were pushed.
inner: VecDeque<Bytes>,
/// Sum of the lengths of the chunks pushed through `add_bytes`.
total_len: usize,
}
impl BytesStream {
pub(crate) fn new() -> Self {
Self {
inner: VecDeque::new(),
total_len: 0,
}
}
2024-02-22 22:10:34 +00:00
#[tracing::instrument(skip(stream))]
2024-02-22 22:02:33 +00:00
pub(crate) async fn try_from_stream<S, E>(stream: S) -> Result<Self, E>
where
S: Stream<Item = Result<Bytes, E>>,
{
let stream = std::pin::pin!(stream);
let mut stream = stream.into_streamer();
let mut bs = Self::new();
while let Some(bytes) = stream.try_next().await? {
2024-02-22 22:10:34 +00:00
tracing::trace!("try_from_stream: looping");
2024-02-22 22:02:33 +00:00
bs.add_bytes(bytes);
}
Ok(bs)
}
pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
self.total_len += bytes.len();
self.inner.push_back(bytes);
}
pub(crate) fn len(&self) -> usize {
self.total_len
}
2024-02-22 22:02:33 +00:00
pub(crate) fn is_empty(&self) -> bool {
2024-02-22 23:09:03 +00:00
self.total_len == 0
2024-02-22 22:02:33 +00:00
}
pub(crate) fn into_reader(self) -> BytesReader {
BytesReader {
index: 0,
inner: self.inner,
}
}
pub(crate) fn into_io_stream(self) -> IoStream {
2024-02-22 22:15:32 +00:00
IoStream { inner: self.inner }
2024-02-22 22:02:33 +00:00
}
}
pub(crate) struct IoStream {
2024-02-22 22:15:32 +00:00
inner: VecDeque<Bytes>,
2024-02-22 22:02:33 +00:00
}
/// Read cursor over a queue of [`Bytes`] chunks, as produced by
/// `BytesStream::into_reader`.
pub(crate) struct BytesReader {
/// Offset of the next unread byte within the front chunk of `inner`.
index: usize,
/// Chunks that have not been fully read yet, front first.
inner: VecDeque<Bytes>,
}
/// Consuming iteration over the stored chunks, front to back.
impl IntoIterator for BytesStream {
    type Item = Bytes;
    type IntoIter = IntoIter<Bytes>;

    fn into_iter(self) -> Self::IntoIter {
        // Only the chunk queue matters here; the byte total is discarded.
        let Self { inner, .. } = self;
        inner.into_iter()
    }
}
2024-02-22 22:15:32 +00:00
impl Stream for BytesStream {
type Item = Result<Bytes, Infallible>;
2024-02-22 22:15:32 +00:00
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
2024-02-22 22:15:32 +00:00
fn size_hint(&self) -> (usize, Option<usize>) {
(self.inner.len(), Some(self.inner.len()))
}
}
2023-07-21 21:58:31 +00:00
2024-02-22 22:15:32 +00:00
impl Stream for IoStream {
type Item = std::io::Result<Bytes>;
2023-07-21 21:58:31 +00:00
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
2024-02-22 22:02:33 +00:00
2024-02-22 22:15:32 +00:00
fn size_hint(&self) -> (usize, Option<usize>) {
(self.inner.len(), Some(self.inner.len()))
2024-02-22 22:02:33 +00:00
}
}
impl AsyncRead for BytesReader {
fn poll_read(
mut self: Pin<&mut Self>,
2024-02-22 22:21:31 +00:00
_: &mut Context<'_>,
2024-02-22 22:02:33 +00:00
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
while buf.remaining() > 0 {
2024-02-22 22:21:31 +00:00
if let Some(bytes) = self.inner.front() {
if self.index == bytes.len() {
self.inner.pop_front();
self.index = 0;
continue;
}
2024-02-22 22:02:33 +00:00
2024-02-22 22:21:31 +00:00
let upper_bound = (self.index + buf.remaining()).min(bytes.len());
2024-02-22 22:02:33 +00:00
2024-02-22 22:21:31 +00:00
let slice = &bytes[self.index..upper_bound];
2024-02-22 22:02:33 +00:00
2024-02-22 22:21:31 +00:00
buf.put_slice(slice);
self.index += slice.len();
} else {
break;
}
2024-02-22 22:02:33 +00:00
}
Poll::Ready(Ok(()))
}
}
impl From<Bytes> for BytesStream {
fn from(value: Bytes) -> Self {
let mut bs = BytesStream::new();
bs.add_bytes(value);
bs
}
}