asonix/use-bytes-stream #52

Merged
asonix merged 12 commits from asonix/use-bytes-stream into main 2024-02-23 01:36:10 +00:00
4 changed files with 31 additions and 16 deletions
Showing only changes of commit 227e9cc3a7 - Show all commits

View file

@ -8,6 +8,7 @@ use std::{
}; };
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tokio_util::bytes::Buf;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct BytesStream { pub(crate) struct BytesStream {
@ -37,9 +38,19 @@ impl BytesStream {
bs.add_bytes(bytes); bs.add_bytes(bytes);
} }
tracing::debug!(
"BytesStream with {} chunks, avg length {}",
bs.chunks_len(),
bs.len() / bs.chunks_len()
);
Ok(bs) Ok(bs)
} }
pub(crate) fn chunks_len(&self) -> usize {
self.inner.len()
}
pub(crate) fn add_bytes(&mut self, bytes: Bytes) { pub(crate) fn add_bytes(&mut self, bytes: Bytes) {
self.total_len += bytes.len(); self.total_len += bytes.len();
self.inner.push_back(bytes); self.inner.push_back(bytes);
@ -54,10 +65,7 @@ impl BytesStream {
} }
pub(crate) fn into_reader(self) -> BytesReader { pub(crate) fn into_reader(self) -> BytesReader {
BytesReader { BytesReader { inner: self.inner }
index: 0,
inner: self.inner,
}
} }
pub(crate) fn into_io_stream(self) -> IoStream { pub(crate) fn into_io_stream(self) -> IoStream {
@ -70,7 +78,6 @@ pub(crate) struct IoStream {
} }
pub(crate) struct BytesReader { pub(crate) struct BytesReader {
index: usize,
inner: VecDeque<Bytes>, inner: VecDeque<Bytes>,
} }
@ -114,19 +121,20 @@ impl AsyncRead for BytesReader {
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
while buf.remaining() > 0 { while buf.remaining() > 0 {
if let Some(bytes) = self.inner.front() { tracing::trace!("bytes reader: looping");
if self.index == bytes.len() {
if let Some(bytes) = self.inner.front_mut() {
if bytes.is_empty() {
self.inner.pop_front(); self.inner.pop_front();
self.index = 0;
continue; continue;
} }
let upper_bound = (self.index + buf.remaining()).min(bytes.len()); let upper_bound = buf.remaining().min(bytes.len());
let slice = &bytes[self.index..upper_bound]; let slice = &bytes[..upper_bound];
buf.put_slice(slice); buf.put_slice(slice);
self.index += slice.len(); bytes.advance(upper_bound);
} else { } else {
break; break;
} }

View file

@ -127,7 +127,9 @@ async fn process<S: Store + 'static>(
) )
.await? .await?
.into_bytes_stream() .into_bytes_stream()
.instrument(tracing::info_span!("Reading processed image to vec")) .instrument(tracing::info_span!(
"Reading processed image to BytesStream"
))
.await?; .await?;
drop(permit); drop(permit);

View file

@ -1,4 +1,3 @@
use std::{ use std::{
ffi::OsStr, ffi::OsStr,
future::Future, future::Future,
@ -8,7 +7,7 @@ use std::{
}; };
use tokio::{ use tokio::{
io::{AsyncReadExt}, io::AsyncReadExt,
process::{Child, ChildStdin, Command}, process::{Child, ChildStdin, Command},
}; };
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
@ -323,7 +322,7 @@ impl ProcessRead {
let cmd = self.command.clone(); let cmd = self.command.clone();
self.with_stdout(move |stdout| { self.with_stdout(move |stdout| {
BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 16)) BytesStream::try_from_stream(ReaderStream::with_capacity(stdout, 1024 * 64))
}) })
.await? .await?
.map_err(move |e| ProcessError::Read(cmd, e)) .map_err(move |e| ProcessError::Read(cmd, e))

View file

@ -186,6 +186,12 @@ where
} }
} }
tracing::debug!(
"BytesStream with {} chunks, avg length {}",
buf.chunks_len(),
buf.len() / buf.chunks_len()
);
Ok(buf) Ok(buf)
} }
@ -225,7 +231,7 @@ impl Store for ObjectStore {
where where
Reader: AsyncRead + Unpin + 'static, Reader: AsyncRead + Unpin + 'static,
{ {
self.save_stream(ReaderStream::with_capacity(reader, 1024 * 16), content_type) self.save_stream(ReaderStream::with_capacity(reader, 1024 * 64), content_type)
.await .await
} }