From 2f0a3618d8c1605dfb7ff22594c67ccaa02f0873 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 21 Jul 2023 16:58:31 -0500 Subject: [PATCH] Replace awc with reqwest --- Cargo.lock | 266 +++++++++++++++++++++++++++----------- Cargo.toml | 18 +-- pict-rs.nix | 2 +- pict-rs.toml | 8 +- src/bytes_stream.rs | 10 ++ src/error.rs | 21 ++- src/lib.rs | 42 +++--- src/store/object_store.rs | 118 ++++++++--------- 8 files changed, 296 insertions(+), 189 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18d7d97..b41ff38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,25 +159,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "actix-tls" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fde0cf292f7cdc7f070803cb9a0d45c018441321a78b1042ffbbb81ec333297" -dependencies = [ - "actix-codec", - "actix-rt", - "actix-service", - "actix-utils", - "futures-core", - "http", - "log", - "pin-project-lite", - "tokio-rustls", - "tokio-util", - "webpki-roots", -] - [[package]] name = "actix-utils" version = "3.0.1" @@ -367,40 +348,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "awc" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ef547a81796eb2dfe9b345aba34c2e08391a0502493711395b36dd64052b69" -dependencies = [ - "actix-codec", - "actix-http", - "actix-rt", - "actix-service", - "actix-tls", - "actix-utils", - "ahash 0.7.6", - "base64 0.21.2", - "bytes", - "cfg-if", - "derive_more", - "futures-core", - "futures-util", - "h2", - "http", - "itoa", - "log", - "mime", - "percent-encoding", - "pin-project-lite", - "rand", - "rustls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", -] - [[package]] name = "axum" version = "0.6.18" @@ -972,8 +919,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1126,6 +1075,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1193,6 +1156,12 @@ dependencies = [ "libc", ] +[[package]] +name = "ipnet" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" + [[package]] name = "is-terminal" version = "0.4.9" @@ -1348,6 +1317,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1642,7 +1621,7 @@ dependencies = [ [[package]] name = "pict-rs" -version = "0.5.0-alpha.3" +version = "0.5.0-alpha.4" dependencies = [ "actix-form-data", "actix-rt", @@ -1650,7 +1629,6 @@ dependencies = [ "actix-web", "anyhow", "async-trait", - "awc", "base64 0.21.2", "clap", "color-eyre", @@ -1667,6 +1645,9 @@ dependencies = [ "opentelemetry-otlp", "pin-project-lite", "quick-xml 0.29.0", + "reqwest", + "reqwest-middleware", + "reqwest-tracing", "rusty-s3", "serde", "serde_cbor", @@ -1683,7 +1664,6 @@ dependencies = [ "toml 0.7.6", "tracing", "tracing-actix-web", - "tracing-awc", "tracing-error", "tracing-futures", "tracing-log", @@ -1893,6 +1873,81 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" +[[package]] +name = "reqwest" +version = "0.11.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" +dependencies = [ + "base64 0.21.2", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "mime_guess", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots", + "winreg", +] + +[[package]] +name = "reqwest-middleware" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4531c89d50effe1fac90d095c8b133c20c5c714204feee0bfc3fd158e784209d" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest", + "serde", + "task-local-extensions", + "thiserror", +] + +[[package]] +name = "reqwest-tracing" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8" +dependencies = [ + "anyhow", + "async-trait", + "getrandom", + "matchit", + "opentelemetry", + "reqwest", + "reqwest-middleware", + "task-local-extensions", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "ring" version = "0.16.20" @@ -1968,14 +2023,33 @@ dependencies = [ [[package]] name = "rustls" -version = "0.20.8" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" dependencies = [ "log", "ring", + "rustls-webpki", "sct", - "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64 0.21.2", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" +dependencies = [ + "ring", + "untrusted", ] [[package]] @@ -2242,6 +2316,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "task-local-extensions" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8" +dependencies = [ + "pin-utils", +] + [[package]] name = "thiserror" version = "1.0.43" @@ -2358,13 +2441,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.23.4" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ "rustls", "tokio", - "webpki", ] [[package]] @@ -2580,23 +2662,6 @@ dependencies = [ "syn 2.0.26", ] -[[package]] -name = "tracing-awc" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa1a68fce4d1a7fad459f81ddcafbdd7c6f6bcda5c7e07d5f42db637931fac7" -dependencies = [ - "actix-http", - "actix-service", - "awc", - "bytes", - "futures-core", - "opentelemetry", - "pin-project-lite", - "tracing", - "tracing-opentelemetry", -] - [[package]] name = "tracing-core" version = "0.1.31" @@ -2701,6 +2766,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -2814,6 +2888,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -2843,6 +2929,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -2969,6 +3068,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index 5a1aaa9..44eeea8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.5.0-alpha.3" +version = "0.5.0-alpha.4" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -11,12 +11,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = [] -io-uring = [ - "actix-rt/io-uring", - "actix-server/io-uring", - "tokio-uring", - "sled/io_uring", -] +io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring", "sled/io_uring"] [dependencies] actix-form-data = "0.7.0-beta.4" @@ -25,7 +20,6 @@ actix-server = "2.0.0" actix-web = { version = "4.0.0", default-features = false } anyhow = "1.0" async-trait = "0.1.51" -awc = { version = "3.0.0", default-features = false, features = ["rustls"] } base64 = "0.21.0" clap = { version = "4.0.2", features = ["derive"] } color-eyre = "0.6" @@ -42,6 +36,9 @@ opentelemetry = { version = "0.19", features = ["rt-tokio"] } opentelemetry-otlp = "0.12" pin-project-lite = "0.2.7" quick-xml = { version = "0.29.0", features = ["serialize"] } +reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] } +reqwest-middleware = "0.2.2" +reqwest-tracing = { version = "0.4.5", features = ["opentelemetry_0_19"] } rusty-s3 = "0.4.1" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.2" @@ -79,8 +76,3 @@ uuid = { version = "1", features = ["v4", "serde"] } version = "0.7.5" default-features = false features = ["opentelemetry_0_19"] - -[dependencies.tracing-awc] -version = "0.1.7" -default-features = false -features = ["opentelemetry_0_19"] diff --git a/pict-rs.nix b/pict-rs.nix index f2ad990..e4f1c58 100644 --- a/pict-rs.nix +++ b/pict-rs.nix @@ -12,7 +12,7 @@ rustPlatform.buildRustPackage { pname = "pict-rs"; - version = "0.5.0-alpha.3"; + version = "0.5.0-alpha.4"; src = ./.; cargoLock = { diff --git a/pict-rs.toml b/pict-rs.toml index 978335e..b636438 100644 --- a/pict-rs.toml +++ b/pict-rs.toml @@ -31,13 +31,7 @@ max_file_count = 1 # environment variable: PICTRS__CLIENT__POOL_SIZE # default: 100 # -# This number is multiplied the number of cores available to pict-rs. Running on a 2 core machine -# with the default value will result in 200 pooled connections. Running on a 32 core machine with -# the default value will result in 3200 pooled connections. -# -# This number can be lowered to keep pict-rs within ulimit bounds if you encounter errors related to -# "Too many open files". Alternatively, increasing the ulimit of your system can solve this problem -# as well. +# Sets the maximum number of allowed idle connections per-host. pool_size = 100 ## Optional: time (in seconds) the client will wait for a response before giving up diff --git a/src/bytes_stream.rs b/src/bytes_stream.rs index 25ff6c9..8f13b4f 100644 --- a/src/bytes_stream.rs +++ b/src/bytes_stream.rs @@ -2,8 +2,10 @@ use actix_web::{ body::MessageBody, web::{Bytes, BytesMut}, }; +use futures_util::Stream; use std::{ collections::{vec_deque::IntoIter, VecDeque}, + convert::Infallible, pin::Pin, task::{Context, Poll}, }; @@ -76,3 +78,11 @@ impl MessageBody for BytesStream { Ok(self.into_bytes()) } } + +impl Stream for BytesStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.get_mut().inner.pop_front().map(Ok)) + } +} diff --git a/src/error.rs b/src/error.rs index bb7b3e8..249c7b6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -82,6 +82,15 @@ pub(crate) enum UploadError { #[error("Error in exiftool")] Exiftool(#[from] crate::exiftool::ExifError), + #[error("Error building reqwest client")] + BuildClient(#[source] reqwest::Error), + + #[error("Error making request")] + RequestMiddleware(#[from] reqwest_middleware::Error), + + #[error("Error in request response")] + Request(#[from] reqwest::Error), + #[error("pict-rs is in read-only mode")] ReadOnly, @@ -115,12 +124,6 @@ pub(crate) enum UploadError { #[error("Unable to download image, bad response {0}")] Download(actix_web::http::StatusCode), - #[error("Unable to download image")] - Payload(#[from] awc::error::PayloadError), - - #[error("Unable to send request, {0}")] - SendRequest(String), - #[error("Tried to save an image with an already-taken name")] DuplicateAlias, @@ -140,12 +143,6 @@ pub(crate) enum UploadError { Timeout(#[from] crate::stream::TimeoutError), } -impl From for UploadError { - fn from(e: awc::error::SendRequestError) -> Self { - UploadError::SendRequest(e.to_string()) - } -} - impl From for UploadError { fn from(_: actix_web::error::BlockingError) -> Self { UploadError::Canceled diff --git a/src/lib.rs b/src/lib.rs index 4993061..dbf6a2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,6 @@ use actix_web::{ http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES}, web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, }; -use awc::{Client, Connector}; use formats::InputProcessableFormat; use futures_util::{ stream::{empty, once}, @@ -41,6 +40,8 @@ use futures_util::{ }; use once_cell::sync::{Lazy, OnceCell}; use repo::sled::SledRepo; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_tracing::TracingMiddleware; use rusty_s3::UrlStyle; use std::{ future::ready, @@ -51,7 +52,6 @@ use std::{ }; use tokio::sync::Semaphore; use tracing_actix_web::TracingLogger; -use tracing_awc::Tracing; use tracing_futures::Instrument; use self::{ @@ -470,7 +470,7 @@ struct UrlQuery { /// download an image from a URL #[tracing::instrument(name = "Downloading file", skip(client, repo, store))] async fn download( - client: web::Data, + client: web::Data, repo: web::Data, store: web::Data, query: web::Query, @@ -486,6 +486,7 @@ async fn download( } let stream = res + .bytes_stream() .map_err(Error::from) .limit((CONFIG.media.max_file_size * MEGABYTES) as u64); @@ -1182,15 +1183,17 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error { error } -fn build_client() -> awc::Client { - let connector = Connector::new().limit(CONFIG.client.pool_size); +fn build_client() -> Result { + let client = reqwest::Client::builder() + .user_agent("pict-rs v0.5.0-main") + .use_rustls_tls() + .pool_max_idle_per_host(CONFIG.client.pool_size) + .build() + .map_err(UploadError::BuildClient)?; - Client::builder() - .connector(connector) - .wrap(Tracing) - .add_default_header(("User-Agent", "pict-rs v0.4.1")) - .timeout(Duration::from_secs(CONFIG.client.timeout)) - .finish() + Ok(ClientBuilder::new(client) + .with(TracingMiddleware::default()) + .build()) } fn next_worker_id() -> String { @@ -1209,7 +1212,7 @@ fn configure_endpoints< config: &mut web::ServiceConfig, repo: R, store: S, - client: Client, + client: ClientWithMiddleware, extra_config: F, ) { config @@ -1301,10 +1304,11 @@ where async fn launch_file_store( repo: R, store: FileStore, + client: ClientWithMiddleware, extra_config: F, ) -> std::io::Result<()> { HttpServer::new(move || { - let client = build_client(); + let client = client.clone(); let store = store.clone(); let repo = repo.clone(); @@ -1328,10 +1332,11 @@ async fn launch_object_store< >( repo: R, store_config: ObjectStoreConfig, + client: ClientWithMiddleware, extra_config: F, ) -> std::io::Result<()> { HttpServer::new(move || { - let client = build_client(); + let client = client.clone(); let store = store_config.clone().build(client.clone()); let repo = repo.clone(); @@ -1351,7 +1356,7 @@ async fn launch_object_store< async fn migrate_inner( repo: Repo, - client: Client, + client: ClientWithMiddleware, from: S1, to: config::primitives::Store, skip_missing_files: bool, @@ -1481,6 +1486,7 @@ fn sled_extra_config(sc: &mut web::ServiceConfig) { pub async fn run() -> color_eyre::Result<()> { let repo = Repo::open(CONFIG.repo.clone())?; repo.migrate_from_db(CONFIG.old_db.path.clone()).await?; + let client = build_client()?; match (*OPERATION).clone() { Operation::Run => (), @@ -1489,8 +1495,6 @@ pub async fn run() -> color_eyre::Result<()> { from, to, } => { - let client = build_client(); - match from { config::primitives::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; @@ -1551,7 +1555,7 @@ pub async fn run() -> color_eyre::Result<()> { .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; - launch_file_store(sled_repo, store, sled_extra_config).await?; + launch_file_store(sled_repo, store, client, sled_extra_config).await?; } } } @@ -1592,7 +1596,7 @@ pub async fn run() -> color_eyre::Result<()> { .requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; - launch_object_store(sled_repo, store, sled_extra_config).await?; + launch_object_store(sled_repo, store, client, sled_extra_config).await?; } } } diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 287c2ae..737df93 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -5,16 +5,17 @@ use crate::{ }; use actix_rt::task::JoinError; use actix_web::{ - error::{BlockingError, PayloadError}, + error::BlockingError, http::{ header::{ByteRangeSpec, Range, CONTENT_LENGTH}, StatusCode, }, web::Bytes, }; -use awc::{error::SendRequestError, Client, ClientRequest, ClientResponse, SendClientRequest}; use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::{Stream, StreamExt, TryStreamExt}; +use reqwest::{header::RANGE, Body, Response}; +use reqwest_middleware::{ClientWithMiddleware, RequestBuilder}; use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle}; use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; @@ -46,8 +47,11 @@ pub(crate) enum ObjectError { #[error("IO Error")] IO(#[from] std::io::Error), - #[error("Error making request: {0}")] - SendRequest(String), + #[error("Error making request")] + RequestMiddleware(#[from] reqwest_middleware::Error), + + #[error("Error in request response")] + Request(#[from] reqwest::Error), #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), @@ -66,15 +70,6 @@ pub(crate) enum ObjectError { #[error("Invalid status: {0}\n{1}")] Status(StatusCode, String), - - #[error("Unable to upload image")] - Upload(awc::error::PayloadError), -} - -impl From for ObjectError { - fn from(e: SendRequestError) -> Self { - Self::SendRequest(e.to_string()) - } } impl From for ObjectError { @@ -95,7 +90,7 @@ pub(crate) struct ObjectStore { repo: Repo, bucket: Bucket, credentials: Credentials, - client: Client, + client: ClientWithMiddleware, signature_expiration: Duration, client_timeout: Duration, public_endpoint: Option, @@ -123,7 +118,7 @@ struct InitiateMultipartUploadResponse { } impl ObjectStoreConfig { - pub(crate) fn build(self, client: Client) -> ObjectStore { + pub(crate) fn build(self, client: ClientWithMiddleware) -> ObjectStore { ObjectStore { path_gen: self.path_gen, repo: self.repo, @@ -137,11 +132,8 @@ impl ObjectStoreConfig { } } -fn payload_to_io_error(e: PayloadError) -> std::io::Error { - match e { - PayloadError::Io(io) => io, - otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()), - } +fn payload_to_io_error(e: reqwest::Error) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) } #[tracing::instrument(skip(stream))] @@ -162,15 +154,15 @@ where Ok(buf) } -async fn status_error(mut response: ClientResponse) -> StoreError { - let body = match response.body().await { - Err(e) => return ObjectError::Upload(e).into(), +async fn status_error(response: Response) -> StoreError { + let status = response.status(); + + let body = match response.text().await { + Err(e) => return ObjectError::Request(e).into(), Ok(body) => body, }; - let body = String::from_utf8_lossy(&body).to_string(); - - ObjectError::Status(response.status(), body).into() + ObjectError::Status(status, body).into() } #[async_trait::async_trait(?Send)] @@ -220,7 +212,8 @@ impl Store for ObjectStore { drop(stream); let (req, object_id) = self.put_object_request(content_type).await?; let response = req - .send_body(first_chunk) + .body(Body::wrap_stream(first_chunk)) + .send() .await .map_err(ObjectError::from)?; @@ -234,13 +227,13 @@ impl Store for ObjectStore { let mut first_chunk = Some(first_chunk); let (req, object_id) = self.create_multipart_request(content_type).await?; - let mut response = req.send().await.map_err(ObjectError::from)?; + let response = req.send().await.map_err(ObjectError::from)?; if !response.status().is_success() { return Err(status_error(response).await); } - let body = response.body().await.map_err(ObjectError::Upload)?; + let body = response.bytes().await.map_err(ObjectError::Request)?; let body: InitiateMultipartUploadResponse = quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?; let upload_id = &body.upload_id; @@ -276,7 +269,8 @@ impl Store for ObjectStore { &upload_id2, ) .await? - .send_body(buf) + .body(Body::wrap_stream(buf)) + .send() .await .map_err(ObjectError::from)?; @@ -348,7 +342,7 @@ impl Store for ObjectStore { ) -> Result { let (req, object_id) = self.put_object_request(content_type).await?; - let response = req.send_body(bytes).await.map_err(ObjectError::from)?; + let response = req.body(bytes).send().await.map_err(ObjectError::from)?; if !response.status().is_success() { return Err(status_error(response).await); @@ -381,7 +375,9 @@ impl Store for ObjectStore { return Err(status_error(response).await); } - Ok(Box::pin(response.map_err(payload_to_io_error))) + Ok(Box::pin( + response.bytes_stream().map_err(payload_to_io_error), + )) } #[tracing::instrument(skip(self, writer))] @@ -393,7 +389,7 @@ impl Store for ObjectStore { where Writer: AsyncWrite + Unpin, { - let mut response = self + let response = self .get_object_request(identifier, None, None) .send() .await @@ -406,7 +402,9 @@ impl Store for ObjectStore { )); } - while let Some(res) = response.next().await { + let mut stream = response.bytes_stream(); + + while let Some(res) = stream.next().await { let mut bytes = res.map_err(payload_to_io_error)?; writer.write_all_buf(&mut bytes).await?; } @@ -489,7 +487,7 @@ impl ObjectStore { }) } - async fn head_bucket_request(&self) -> Result { + async fn head_bucket_request(&self) -> Result { let action = self.bucket.head_bucket(Some(&self.credentials)); Ok(self.build_request(action)) @@ -498,7 +496,7 @@ impl ObjectStore { async fn put_object_request( &self, content_type: mime::Mime, - ) -> Result<(ClientRequest, ObjectId), StoreError> { + ) -> Result<(RequestBuilder, ObjectId), StoreError> { let path = self.next_file().await?; let mut action = self.bucket.put_object(Some(&self.credentials), &path); @@ -513,7 +511,7 @@ impl ObjectStore { async fn create_multipart_request( &self, content_type: mime::Mime, - ) -> Result<(ClientRequest, ObjectId), StoreError> { + ) -> Result<(RequestBuilder, ObjectId), StoreError> { let path = self.next_file().await?; let mut action = self @@ -533,7 +531,7 @@ impl ObjectStore { object_id: &ObjectId, part_number: u16, upload_id: &str, - ) -> Result { + ) -> Result { use md5::Digest; let mut action = self.bucket.upload_part( @@ -571,12 +569,12 @@ impl ObjectStore { Ok(self.build_request(action)) } - fn send_complete_multipart_request<'a, I: Iterator>( + async fn send_complete_multipart_request<'a, I: Iterator>( &'a self, object_id: &'a ObjectId, upload_id: &'a str, etags: I, - ) -> SendClientRequest { + ) -> Result { let mut action = self.bucket.complete_multipart_upload( Some(&self.credentials), object_id.as_str(), @@ -590,14 +588,14 @@ impl ObjectStore { let (req, action) = self.build_request_inner(action); - req.send_body(action.body()) + req.body(action.body()).send().await } fn create_abort_multipart_request( &self, object_id: &ObjectId, upload_id: &str, - ) -> ClientRequest { + ) -> RequestBuilder { let action = self.bucket.abort_multipart_upload( Some(&self.credentials), object_id.as_str(), @@ -607,18 +605,18 @@ impl ObjectStore { self.build_request(action) } - fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest { + fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> RequestBuilder { let (req, _) = self.build_request_inner(action); req } - fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) { + fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (RequestBuilder, A) { let method = match A::METHOD { - rusty_s3::Method::Head => awc::http::Method::HEAD, - rusty_s3::Method::Get => awc::http::Method::GET, - rusty_s3::Method::Post => awc::http::Method::POST, - rusty_s3::Method::Put => awc::http::Method::PUT, - rusty_s3::Method::Delete => awc::http::Method::DELETE, + rusty_s3::Method::Head => reqwest::Method::HEAD, + rusty_s3::Method::Get => reqwest::Method::GET, + rusty_s3::Method::Post => reqwest::Method::POST, + rusty_s3::Method::Put => reqwest::Method::PUT, + rusty_s3::Method::Delete => reqwest::Method::DELETE, }; let url = action.sign(self.signature_expiration); @@ -631,7 +629,7 @@ impl ObjectStore { let req = action .headers_mut() .iter() - .fold(req, |req, tup| req.insert_header(tup)); + .fold(req, |req, (name, value)| req.header(name, value)); (req, action) } @@ -641,7 +639,7 @@ impl ObjectStore { identifier: &ObjectId, from_start: Option, len: Option, - ) -> ClientRequest { + ) -> RequestBuilder { let action = self .bucket .get_object(Some(&self.credentials), identifier.as_str()); @@ -651,14 +649,18 @@ impl ObjectStore { let start = from_start.unwrap_or(0); let end = len.map(|len| start + len - 1); - req.insert_header(Range::Bytes(vec![if let Some(end) = end { - ByteRangeSpec::FromTo(start, end) - } else { - ByteRangeSpec::From(start) - }])) + req.header( + RANGE, + Range::Bytes(vec![if let Some(end) = end { + ByteRangeSpec::FromTo(start, end) + } else { + ByteRangeSpec::From(start) + }]) + .to_string(), + ) } - fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest { + fn head_object_request(&self, identifier: &ObjectId) -> RequestBuilder { let action = self .bucket .head_object(Some(&self.credentials), identifier.as_str()); @@ -666,7 +668,7 @@ impl ObjectStore { self.build_request(action) } - fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest { + fn delete_object_request(&self, identifier: &ObjectId) -> RequestBuilder { let action = self .bucket .delete_object(Some(&self.credentials), identifier.as_str());