Use futures-util instead of hand-implemented types

This commit is contained in:
Aode (Lion) 2021-09-05 20:00:31 -05:00
parent eccb7da3e9
commit ac79a9d5cd
7 changed files with 11 additions and 63 deletions

2
Cargo.lock generated
View file

@ -934,7 +934,7 @@ dependencies = [
"awc",
"base64",
"dashmap",
"futures-core",
"futures-util",
"mime",
"num_cpus",
"once_cell",

View file

@ -18,7 +18,7 @@ anyhow = "1.0"
awc = { version = "3.0.0-beta.7", default-features = false }
base64 = "0.13.0"
dashmap = "4.0.2"
futures-core = "0.3.17"
futures-util = "0.3.17"
mime = "0.3.1"
num_cpus = "1.13"
once_cell = "1.4.0"

View file

@ -7,7 +7,7 @@ use actix_web::{
};
use awc::Client;
use dashmap::{mapref::entry::Entry, DashMap};
use futures_core::stream::Stream;
use futures_util::{stream::{LocalBoxStream, once}, Stream};
use once_cell::sync::{Lazy, OnceCell};
use std::{
collections::HashSet,
@ -42,7 +42,6 @@ use self::{
config::{Config, Format},
error::UploadError,
middleware::{Internal, Tracing},
stream::{once, LocalBoxStream},
upload_manager::{Details, UploadManager},
validate::{image_webp, video_mp4},
};

View file

@ -1,9 +1,9 @@
use crate::stream::LocalBoxFuture;
use actix_web::{
dev::{Service, ServiceRequest, Transform},
http::StatusCode,
HttpResponse, ResponseError,
};
use futures_util::future::LocalBoxFuture;
use std::{
future::{ready, Ready},
task::{Context, Poll},

View file

@ -1,7 +1,4 @@
use crate::{
stream::{bytes_stream, LocalBoxStream},
UploadError,
};
use crate::{stream::bytes_stream, UploadError};
use actix_web::{
dev::Payload,
http::{
@ -11,6 +8,7 @@ use actix_web::{
web::Bytes,
FromRequest, HttpRequest,
};
use futures_util::stream::LocalBoxStream;
use std::{future::ready, io};
use tokio::io::{AsyncReadExt, AsyncSeekExt};

View file

@ -1,6 +1,6 @@
use crate::error::UploadError;
use actix_web::web::{Bytes, BytesMut};
use futures_core::stream::Stream;
use futures_util::Stream;
use std::{
future::Future,
pin::Pin,
@ -8,9 +8,6 @@ use std::{
};
use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
pub(crate) type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub(crate) struct Process {
child: tokio::process::Child,
}
@ -23,12 +20,6 @@ pub(crate) struct ProcessRead<I> {
struct BytesFreezer<S>(S);
pub(crate) struct Once<T> {
inner: Option<T>,
}
pub(crate) struct Next<'a, S>(&'a mut S);
impl Process {
fn new(child: tokio::process::Child) -> Self {
Process { child }
@ -92,14 +83,6 @@ pub(crate) fn bytes_stream(
))
}
pub(crate) fn once<T>(input: T) -> Once<T> {
Once { inner: Some(input) }
}
pub(crate) fn next<'a, S>(stream: &'a mut S) -> Next<'a, S> {
Next(stream)
}
impl<I> AsyncRead for ProcessRead<I>
where
I: AsyncRead + Unpin,
@ -139,35 +122,3 @@ where
.map_err(UploadError::from)
}
}
impl<T> Stream for Once<T>
where
T: Future + Unpin,
{
type Item = <T as Future>::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(mut fut) = self.inner.take() {
match Pin::new(&mut fut).poll(cx) {
Poll::Ready(item) => Poll::Ready(Some(item)),
Poll::Pending => {
self.inner = Some(fut);
Poll::Pending
}
}
} else {
Poll::Ready(None)
}
}
}
impl<'a, S> Future for Next<'a, S>
where
S: Stream + Unpin,
{
type Output = Option<<S as Stream>::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll_next(cx)
}
}

View file

@ -2,10 +2,10 @@ use crate::{
config::Format,
error::UploadError,
migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb},
stream::{next, LocalBoxStream},
to_ext,
};
use actix_web::web;
use futures_util::stream::{LocalBoxStream, StreamExt};
use sha2::Digest;
use std::{
path::PathBuf,
@ -547,7 +547,7 @@ impl UploadManager {
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = next(&mut stream).await {
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
@ -582,7 +582,7 @@ impl UploadManager {
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
while let Some(res) = next(&mut stream).await {
while let Some(res) = stream.next().await {
let bytes = res?;
bytes_mut.extend_from_slice(&bytes);
}
@ -954,7 +954,7 @@ where
let fut = async move {
let mut file = tokio::fs::File::create(to1).await?;
while let Some(res) = next(&mut stream).await {
while let Some(res) = stream.next().await {
let mut bytes = res?;
file.write_all_buf(&mut bytes).await?;
}