Remove direct dep on futures-util
Some checks are pending
continuous-integration/drone/push Build is running

This commit is contained in:
asonix 2023-08-23 19:10:10 -05:00
parent 8f50a15b25
commit 95637fdfe5
8 changed files with 58 additions and 20 deletions

1
Cargo.lock generated
View file

@ -1744,7 +1744,6 @@ dependencies = [
"dashmap",
"flume",
"futures-core",
"futures-util",
"hex",
"md-5",
"metrics",

View file

@ -28,7 +28,6 @@ console-subscriber = "0.1"
dashmap = "5.1.0"
flume = "0.11.0"
futures-core = "0.3"
futures-util = { version = "0.3.17", default-features = false }
hex = "0.4.3"
md-5 = "0.10.5"
metrics = "0.21.1"

View file

@ -2,10 +2,10 @@ use crate::{
error::Error,
repo::{ArcRepo, UploadId},
store::Store,
stream::StreamMap,
};
use actix_web::web::Bytes;
use futures_core::Stream;
use futures_util::TryStreamExt;
use mime::APPLICATION_OCTET_STREAM;
use tracing::{Instrument, Span};
@ -58,7 +58,8 @@ where
.create_upload(self.upload_id.expect("Upload id exists"))
.await?;
let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let stream =
stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
// use octet-stream, we don't know the upload's real type yet
let identifier = store.save_stream(stream, APPLICATION_OCTET_STREAM).await?;

View file

@ -6,10 +6,13 @@ pub(crate) use tokio_file::File;
#[cfg(not(feature = "io-uring"))]
mod tokio_file {
use crate::{store::file_store::FileError, stream::IntoStreamer, Either};
use crate::{
store::file_store::FileError,
stream::{IntoStreamer, StreamMap},
Either,
};
use actix_web::web::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::TryStreamExt;
use std::{io::SeekFrom, path::Path};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead};
@ -97,7 +100,7 @@ mod tokio_file {
(None, None) => Either::right(self.inner),
};
Ok(FramedRead::new(obj, BytesCodec::new()).map_ok(BytesMut::freeze))
Ok(FramedRead::new(obj, BytesCodec::new()).map(|res| res.map(BytesMut::freeze)))
}
}
}

View file

@ -35,7 +35,6 @@ use actix_web::{
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
};
use futures_core::Stream;
use futures_util::{StreamExt, TryStreamExt};
use metrics_exporter_prometheus::PrometheusBuilder;
use middleware::Metrics;
use once_cell::sync::Lazy;
@ -69,7 +68,7 @@ use self::{
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
serde_str::Serde,
store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store},
stream::{empty, once, StreamLimit, StreamTimeout},
stream::{empty, once, StreamLimit, StreamMap, StreamTimeout},
};
pub use self::config::{ConfigSource, PictRsConfiguration};
@ -154,7 +153,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
let span = tracing::info_span!("file-upload", ?filename);
let stream = stream.map_err(Error::from);
let stream = stream.map(|res| res.map_err(Error::from));
Box::pin(
async move {
@ -213,7 +212,7 @@ impl<S: Store + 'static> FormData for Import<S> {
let span = tracing::info_span!("file-import", ?filename);
let stream = stream.map_err(Error::from);
let stream = stream.map(|res| res.map_err(Error::from));
Box::pin(
async move {
@ -350,7 +349,7 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
let span = tracing::info_span!("file-proxy", ?filename);
let stream = stream.map_err(Error::from);
let stream = stream.map(|res| res.map_err(Error::from));
Box::pin(
async move {
@ -521,7 +520,7 @@ async fn download_stream(
let stream = res
.bytes_stream()
.map_err(Error::from)
.map(|res| res.map_err(Error::from))
.limit((config.media.max_file_size * MEGABYTES) as u64);
Ok(stream)
@ -1231,7 +1230,7 @@ async fn ranged_file_resp<S: Store + 'static>(
Either::left(Either::left(
range::chop_store(range, store, &identifier, len)
.await?
.map_err(Error::from),
.map(|res| res.map_err(Error::from)),
)),
)
} else {
@ -1248,7 +1247,7 @@ async fn ranged_file_resp<S: Store + 'static>(
let stream = store
.to_stream(&identifier, None, None)
.await?
.map_err(Error::from);
.map(|res| res.map_err(Error::from));
if not_found {
(HttpResponse::NotFound(), Either::right(stream))

View file

@ -8,8 +8,8 @@ use crate::{
repo::{Alias, ArcRepo, UploadId, UploadResult},
serde_str::Serde,
store::{Identifier, Store},
stream::StreamMap,
};
use futures_util::TryStreamExt;
use std::path::PathBuf;
pub(super) fn perform<'a, S>(
@ -92,7 +92,7 @@ where
let stream = store2
.to_stream(&ident, None, None)
.await?
.map_err(Error::from);
.map(|res| res.map_err(Error::from));
let session =
crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?;

View file

@ -2,7 +2,7 @@ use crate::{
bytes_stream::BytesStream,
repo::{Repo, SettingsRepo},
store::Store,
stream::IntoStreamer,
stream::{IntoStreamer, StreamMap},
};
use actix_rt::task::JoinError;
use actix_web::{
@ -15,7 +15,6 @@ use actix_web::{
};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_core::Stream;
use futures_util::TryStreamExt;
use reqwest::{header::RANGE, Body, Response};
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
@ -382,7 +381,9 @@ impl Store for ObjectStore {
}
Ok(Box::pin(
response.bytes_stream().map_err(payload_to_io_error),
response
.bytes_stream()
.map(|res| res.map_err(payload_to_io_error)),
))
}

View file

@ -13,6 +13,42 @@ use std::{
time::Duration,
};
pin_project_lite::pin_project! {
pub(crate) struct Map<S, F> {
#[pin]
stream: S,
func: F,
}
}
pub(crate) trait StreamMap: Stream {
fn map<F, U>(self, func: F) -> Map<Self, F>
where
F: FnMut(Self::Item) -> U,
Self: Sized,
{
Map { stream: self, func }
}
}
impl<T> StreamMap for T where T: Stream {}
impl<S, F, U> Stream for Map<S, F>
where
S: Stream,
F: FnMut(S::Item) -> U,
{
type Item = U;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let value = std::task::ready!(this.stream.poll_next(cx));
Poll::Ready(value.map(this.func))
}
}
pub(crate) struct Empty<T>(PhantomData<T>);
impl<T> Stream for Empty<T> {