diff --git a/Cargo.lock b/Cargo.lock index 8258e3f..c7b8043 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -254,6 +254,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.14" @@ -328,7 +343,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -339,7 +354,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -534,6 +549,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets 0.52.5", +] + [[package]] name = "clap" version = "4.5.7" @@ -565,7 +593,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -673,6 +701,22 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -759,7 +803,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -782,7 +826,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 2.0.68", ] [[package]] @@ -824,7 +868,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -836,7 +880,7 @@ dependencies = [ "diesel_table_macro_syntax", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -845,7 +889,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" dependencies = [ - "syn", + "syn 2.0.68", ] [[package]] @@ -859,6 +903,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "either" version = "1.12.0" @@ -937,6 +987,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -978,7 +1043,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -999,6 +1064,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1283,6 +1349,7 @@ dependencies = [ "hyper 1.3.1", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -1321,6 +1388,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.5.0" @@ -1656,12 +1746,48 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "futures", + "humantime", + "hyper 1.3.1", + "itertools", + "md-5", + "parking_lot 0.12.3", + "percent-encoding", + "quick-xml", + "rand", + "reqwest", + "ring", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "opentelemetry" version = "0.23.0" @@ -1868,6 +1994,7 @@ dependencies = [ "metrics-exporter-prometheus", "mime", "nanorand", + "object_store", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", @@ -1879,7 +2006,6 @@ dependencies = [ "rustls", "rustls-channel-resolver", "rustls-pemfile", - "rusty-s3", "serde", "serde-tuple-vec-map", "serde_json", @@ -1924,7 +2050,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2033,7 +2159,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2062,14 +2188,61 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.30.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" dependencies = [ "memchr", "serde", ] +[[package]] +name = "quinn" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ceeeeabace7857413798eb1ffa1e9c905a9946a57d81fb69b4b71c4d8eb3ad" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" +dependencies = [ + "bytes", + "rand", + "ring", + "rustc-hash", + "rustls", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46" +dependencies = [ + "libc", + "once_cell", + "socket2 0.5.7", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -2188,7 +2361,7 @@ dependencies = [ "quote", "refinery-core", "regex", - "syn", + "syn 2.0.68", ] [[package]] @@ -2251,6 +2424,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", "http-body-util", @@ -2264,7 +2438,9 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "quinn", "rustls", + "rustls-native-certs", "rustls-pemfile", "rustls-pki-types", "serde", @@ -2356,6 +2532,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.0" @@ -2390,6 +2572,19 @@ dependencies = [ "rustls", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -2423,25 +2618,6 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" -[[package]] -name = "rusty-s3" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31aa883f1b986a5249641e574ca0e11ac4fb9970b009c6fbb96fedaf4fa78db8" -dependencies = [ - "base64 0.21.7", - "hmac", - "md-5", - "percent-encoding", - "quick-xml", - "serde", - "serde_json", - "sha2", - "time", - "url", - "zeroize", -] - [[package]] name = "ryu" version = "1.0.18" @@ -2457,6 +2633,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scoped-futures" version = "0.1.3" @@ -2473,6 +2658,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +dependencies = [ + "bitflags 2.5.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.23" @@ -2505,7 +2713,7 @@ checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2639,6 +2847,28 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "socket2" version = "0.4.10" @@ -2708,6 +2938,17 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d0208408ba0c3df17ed26eb06992cb1a1268d41b2c0e12e65203fbe3972cee5" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.68" @@ -2748,7 +2989,7 @@ checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2825,7 +3066,7 @@ checksum = "8d9ef545650e79f30233c0003bcc2504d7efac6dad25fca40744de773fe2049c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2866,7 +3107,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3087,7 +3328,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3308,7 +3549,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.68", "wasm-bindgen-shared", ] @@ -3342,7 +3583,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3437,6 +3678,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -3633,7 +3883,7 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3653,5 +3903,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] diff --git a/Cargo.toml b/Cargo.toml index 7169298..b418032 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ metrics = "0.23.0" metrics-exporter-prometheus = { version = "0.15.0", default-features = false, features = ["http-listener"] } mime = "0.3.17" nanorand = { version = "0.7.0", optional = true } -# object_store = { version = "0.10.1", features = ["aws"] } +object_store = { version = "0.10.1", features = ["aws"] } opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] } opentelemetry = "0.23.0" opentelemetry-otlp = "0.16.0" @@ -57,7 +57,6 @@ rustls = { version = "0.23.10", default-features = false, features = ["logging", rustls-channel-resolver = "0.3.0" # pinned to rustls rustls-pemfile = "2.1.2" -rusty-s3 = "0.5.0" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" serde-tuple-vec-map = "1.0.1" diff --git a/deny.toml b/deny.toml index 11ab22d..1bde664 100644 --- a/deny.toml +++ b/deny.toml @@ -220,13 +220,11 @@ skip = [ "matchit", "parking_lot", "parking_lot_core", - "quick-xml", "regex-automata", "regex-syntax", "siphasher", "syn", "sync_wrapper", - "untrusted", # Ignore duplicates for systems we don't target "redox_syscall", diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 020b9e9..ed42305 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -201,9 +201,9 @@ pub(super) struct FilesystemDefaults { #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "snake_case")] pub(super) struct ObjectStorageDefaults { - signature_duration: u64, + pub(super) signature_duration: u64, - client_timeout: u64, + pub(super) client_timeout: u64, } impl Default for ServerDefaults { diff --git a/src/config/file.rs b/src/config/file.rs index 87d0809..fd4d00e 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -95,6 +95,27 @@ pub(crate) struct ObjectStorage { pub(crate) public_endpoint: Option, } +impl From for ObjectStorage { + fn from(value: crate::config::primitives::ObjectStorage) -> Self { + let defaults = crate::config::defaults::ObjectStorageDefaults::default(); + + Self { + endpoint: value.endpoint, + use_path_style: value.use_path_style, + bucket_name: value.bucket_name, + region: value.region, + access_key: value.access_key, + secret_key: value.secret_key, + session_token: value.session_token, + signature_duration: value + .signature_duration + .unwrap_or(defaults.signature_duration), + client_timeout: value.client_timeout.unwrap_or(defaults.client_timeout), + public_endpoint: value.public_endpoint, + } + } +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] diff --git a/src/error_code.rs b/src/error_code.rs index 5e3f2ef..f11b919 100644 --- a/src/error_code.rs +++ b/src/error_code.rs @@ -45,9 +45,6 @@ impl ErrorCode { pub(crate) const OBJECT_IO_ERROR: ErrorCode = ErrorCode { code: "object-io-error", }; - pub(crate) const PARSE_OBJECT_ID_ERROR: ErrorCode = ErrorCode { - code: "parse-object-id-error", - }; pub(crate) const PANIC: ErrorCode = ErrorCode { code: "panic" }; pub(crate) const ALREADY_CLAIMED: ErrorCode = ErrorCode { code: "already-claimed", diff --git a/src/lib.rs b/src/lib.rs index 4a516e0..766a69f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,6 @@ use metrics_exporter_prometheus::PrometheusBuilder; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; use rustls_channel_resolver::ChannelSender; -use rusty_s3::UrlStyle; use std::{ marker::PhantomData, path::Path, @@ -1833,36 +1832,8 @@ where migrate_store(from, to, skip_missing_files, concurrency).await? } - config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { - endpoint, - bucket_name, - use_path_style, - region, - access_key, - secret_key, - session_token, - signature_duration, - client_timeout, - public_endpoint, - }) => { - let store = ObjectStore::build( - endpoint.clone(), - bucket_name, - if use_path_style { - UrlStyle::Path - } else { - UrlStyle::VirtualHost - }, - region, - access_key, - secret_key, - session_token, - signature_duration.unwrap_or(15), - client_timeout.unwrap_or(30), - public_endpoint, - ) - .await? - .build(client.clone()); + config::primitives::Store::ObjectStorage(object_config) => { + let store = ObjectStore::new(object_config.into()).await?; let to = State { config, @@ -2087,38 +2058,8 @@ impl PictRsConfiguration { ) .await?; } - config::primitives::Store::ObjectStorage( - config::primitives::ObjectStorage { - endpoint, - bucket_name, - use_path_style, - region, - access_key, - secret_key, - session_token, - signature_duration, - client_timeout, - public_endpoint, - }, - ) => { - let from = ObjectStore::build( - endpoint, - bucket_name, - if use_path_style { - UrlStyle::Path - } else { - UrlStyle::VirtualHost - }, - region, - access_key, - secret_key, - session_token, - signature_duration.unwrap_or(15), - client_timeout.unwrap_or(30), - public_endpoint, - ) - .await? - .build(client.clone()); + config::primitives::Store::ObjectStorage(object_config) => { + let from = ObjectStore::new(object_config.into()).await?; migrate_inner( config, @@ -2187,38 +2128,10 @@ impl PictRsConfiguration { } } } - config::Store::ObjectStorage(config::ObjectStorage { - endpoint, - bucket_name, - use_path_style, - region, - access_key, - secret_key, - session_token, - signature_duration, - client_timeout, - public_endpoint, - }) => { + config::Store::ObjectStorage(object_config) => { let arc_repo = repo.to_arc(); - let store = ObjectStore::build( - endpoint, - bucket_name, - if use_path_style { - UrlStyle::Path - } else { - UrlStyle::VirtualHost - }, - region, - access_key, - secret_key, - session_token, - signature_duration, - client_timeout, - public_endpoint, - ) - .await? - .build(client.clone()); + let store = ObjectStore::new(object_config).await?; let state = State { tmp_dir: tmp_dir.clone(), diff --git a/src/store.rs b/src/store.rs index 9da3bb9..e37d64e 100644 --- a/src/store.rs +++ b/src/store.rs @@ -71,10 +71,8 @@ impl From for StoreError { impl From for StoreError { fn from(value: crate::store::object_store::ObjectError) -> Self { match value { - e @ crate::store::object_store::ObjectError::Status( - reqwest::StatusCode::NOT_FOUND, - _, - _, + e @ crate::store::object_store::ObjectError::Request( + ::object_store::Error::NotFound { .. }, ) => Self::ObjectNotFound(e), e => Self::ObjectStore(e), } diff --git a/src/store/object_store.rs b/src/store/object_store.rs index e0252dc..febb650 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,27 +1,17 @@ use crate::{ bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, store::Store, - stream::LocalBoxStream, sync::DropHandle, + stream::LocalBoxStream, }; -use actix_web::{ - error::BlockingError, - http::header::{ByteRangeSpec, Range}, - rt::task::JoinError, - web::Bytes, -}; -use base64::{prelude::BASE64_STANDARD, Engine}; +use actix_web::{rt::task::JoinError, web::Bytes}; use futures_core::Stream; -use reqwest::{ - header::{CONTENT_LENGTH, RANGE}, - Body, Response, StatusCode, +use object_store::{ + aws::{AmazonS3, AmazonS3Builder}, + path::Path, + Attribute, AttributeValue, Attributes, GetOptions, ObjectStore as ObjectStoreTrait, PutMode, + PutMultipartOpts, PutOptions, PutPayload, PutPayloadMut, }; -use reqwest_middleware::{ClientWithMiddleware, RequestBuilder}; -use rusty_s3::{ - actions::{CreateMultipartUpload, S3Action}, - Bucket, BucketError, Credentials, UrlStyle, -}; -use std::{string::FromUtf8Error, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use streem::IntoStreamer; -use tracing::Instrument; use url::Url; use super::StoreError; @@ -30,72 +20,24 @@ const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880); #[derive(Debug, thiserror::Error)] pub(crate) enum ObjectError { - #[error("Failed to generate request")] - S3(#[from] BucketError), - #[error("IO Error")] IO(#[from] std::io::Error), - #[error("Error making request")] - RequestMiddleware(#[from] reqwest_middleware::Error), - #[error("Error in request response")] - Request(#[from] reqwest::Error), + Request(#[from] object_store::Error), - #[error("Failed to parse string")] - Utf8(#[from] FromUtf8Error), - - #[error("Failed to parse xml")] - Xml(#[source] XmlError), - - #[error("Invalid length")] - Length, - - #[error("Invalid etag response")] - Etag, + #[error("Failed to build object store client")] + BuildClient(#[source] object_store::Error), #[error("Task cancelled")] Canceled, - - #[error("Invalid status {0} for {2:?} - {1}")] - Status(StatusCode, String, Option>), -} - -#[derive(Debug)] -pub(crate) struct XmlError { - inner: Box, -} - -impl XmlError { - fn new(e: E) -> Self { - XmlError { inner: Box::new(e) } - } -} - -impl std::fmt::Display for XmlError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.inner.fmt(f) - } -} - -impl std::error::Error for XmlError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.inner.source() - } } impl ObjectError { pub(super) const fn error_code(&self) -> ErrorCode { match self { - Self::S3(_) - | Self::RequestMiddleware(_) - | Self::Request(_) - | Self::Xml(_) - | Self::Length - | Self::Etag - | Self::Status(_, _, _) => ErrorCode::OBJECT_REQUEST_ERROR, + Self::BuildClient(_) | Self::Request(_) => ErrorCode::OBJECT_REQUEST_ERROR, Self::IO(_) => ErrorCode::OBJECT_IO_ERROR, - Self::Utf8(_) => ErrorCode::PARSE_OBJECT_ID_ERROR, Self::Canceled => ErrorCode::PANIC, } } @@ -107,48 +49,12 @@ impl From for ObjectError { } } -impl From for ObjectError { - fn from(_: BlockingError) -> Self { - Self::Canceled - } -} - #[derive(Clone)] pub(crate) struct ObjectStore { - bucket: Bucket, - credentials: Credentials, - client: ClientWithMiddleware, - signature_expiration: Duration, - client_timeout: Duration, + s3_client: Arc, public_endpoint: Option, } -#[derive(Clone)] -pub(crate) struct ObjectStoreConfig { - bucket: Bucket, - credentials: Credentials, - signature_expiration: u64, - client_timeout: u64, - public_endpoint: Option, -} - -impl ObjectStoreConfig { - pub(crate) fn build(self, client: ClientWithMiddleware) -> ObjectStore { - ObjectStore { - bucket: self.bucket, - credentials: self.credentials, - client, - signature_expiration: Duration::from_secs(self.signature_expiration), - client_timeout: Duration::from_secs(self.client_timeout), - public_endpoint: self.public_endpoint, - } - } -} - -fn payload_to_io_error(e: reqwest::Error) -> std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) -} - #[tracing::instrument(level = "debug", skip(stream))] async fn read_chunk(stream: &mut S) -> Result where @@ -171,38 +77,44 @@ where tracing::debug!( "BytesStream with {} chunks, avg length {}", buf.chunks_len(), - buf.len() / buf.chunks_len() + buf.len() / buf.chunks_len().max(1) ); Ok(buf) } -async fn status_error(response: Response, object: Option>) -> StoreError { - let status = response.status(); +impl From for PutPayload { + fn from(value: BytesStream) -> Self { + let mut payload = PutPayloadMut::new(); - let body = match response.text().await { - Err(e) => return ObjectError::Request(e).into(), - Ok(body) => body, - }; + for bytes in value { + payload.push(bytes); + } - ObjectError::Status(status, body, object).into() + payload.freeze() + } } impl Store for ObjectStore { async fn health_check(&self) -> Result<(), StoreError> { - let response = self - .head_bucket_request() - .await? - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_HEAD_BUCKET_REQUEST) - .await - .map_err(ObjectError::from)?; + let res = self + .s3_client + .put_opts( + &Path::from("health-check"), + PutPayload::new(), + PutOptions { + mode: PutMode::Overwrite, + ..Default::default() + }, + ) + .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) + .await; - if !response.status().is_success() { - return Err(status_error(response, None).await); + match res { + Ok(_) => Ok(()), + Err(object_store::Error::NotModified { .. }) => Ok(()), + Err(e) => Err(ObjectError::Request(e).into()), } - - Ok(()) } #[tracing::instrument(skip_all)] @@ -215,68 +127,97 @@ impl Store for ObjectStore { where S: Stream>, { - match self - .start_upload( - crate::stream::error_injector(stream), - content_type.clone(), - extension, - ) - .await? - { - UploadState::Single(first_chunk) => { - let (req, object_id) = self - .put_object_request(first_chunk.len(), content_type, extension) - .await?; + let mut stream = std::pin::pin!(stream); + let first_chunk = read_chunk(&mut stream).await?; - let response = req - .body(Body::wrap_stream(first_chunk.into_io_stream())) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) + let object_id: Arc = Arc::from(self.next_file(extension)); + let path = Path::from(object_id.as_ref()); + + let mut attributes = Attributes::new(); + attributes.insert( + Attribute::ContentType, + AttributeValue::from(content_type.to_string()), + ); + + if first_chunk.len() < CHUNK_SIZE { + self.s3_client + .put_opts( + &path, + first_chunk.into(), + PutOptions { + attributes, + ..Default::default() + }, + ) + .with_metrics(crate::init_metrics::OBJECT_STORAGE_PUT_OBJECT_REQUEST) + .await + .map_err(ObjectError::from)?; + + return Ok(object_id); + } + + let mut multipart = self + .s3_client + .put_multipart_opts( + &path, + PutMultipartOpts { + attributes, + ..Default::default() + }, + ) + .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + + let res = async { + let mut first_chunk = Some(first_chunk); + let mut complete = false; + + let mut futures = Vec::new(); + + while !complete { + let buf = if let Some(chunk) = first_chunk.take() { + chunk + } else { + read_chunk(&mut stream).await? + }; + + complete = buf.len() < CHUNK_SIZE; + + futures.push(crate::sync::abort_on_drop(crate::sync::spawn( + "put-multipart-part", + multipart.put_part(buf.into()).with_metrics( + crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, + ), + ))); + } + + for future in futures { + future + .await + .map_err(ObjectError::from)? + .map_err(ObjectError::from)?; + } + + multipart + .complete() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_COMPLETE_MULTIPART_REQUEST) + .await + .map_err(ObjectError::from)?; + + Ok(()) + } + .await; + + match res { + Ok(()) => Ok(object_id), + Err(e) => { + multipart + .abort() + .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) .await .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - return Ok(object_id); - } - UploadState::Multi(object_id, upload_id, futures) => { - // hack-ish: use async block as Result boundary - let res = async { - let mut etags = Vec::new(); - - for future in futures { - etags.push(future.await.map_err(ObjectError::from)??); - } - - let response = self - .send_complete_multipart_request( - &object_id, - &upload_id, - etags.iter().map(|s| s.as_ref()), - ) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - Ok(()) as Result<(), StoreError> - } - .await; - - if let Err(e) = res { - self.create_abort_multipart_request(&object_id, &upload_id) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - return Err(e); - } - - Ok(object_id) + Err(e) } } } @@ -299,429 +240,112 @@ impl Store for ObjectStore { from_start: Option, len: Option, ) -> Result>, StoreError> { - let response = self - .get_object_request(identifier, from_start, len) - .send() + let from_start = from_start.map(|u| u as usize); + let len = len.map(|u| u as usize); + + let range = match (from_start, len) { + (Some(start), Some(length)) => Some((start..start + length).into()), + (Some(start), None) => Some((start..).into()), + (None, Some(length)) => Some((..length).into()), + (None, None) => None, + }; + + let path = Path::from(identifier.as_ref()); + + let get_result = self + .s3_client + .get_opts( + &path, + GetOptions { + range, + ..Default::default() + }, + ) .with_metrics(crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST) .await .map_err(ObjectError::from)?; - if !response.status().is_success() { - return Err(status_error(response, Some(identifier.clone())).await); - } - - Ok(Box::pin(crate::stream::error_injector( - crate::stream::metrics( - crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST_STREAM, - crate::stream::map_err(response.bytes_stream(), payload_to_io_error), - ), + Ok(Box::pin(crate::stream::metrics( + crate::init_metrics::OBJECT_STORAGE_GET_OBJECT_REQUEST_STREAM, + crate::stream::map_err(get_result.into_stream(), |e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + }), ))) } #[tracing::instrument(skip(self))] async fn len(&self, identifier: &Arc) -> Result { - let response = self - .head_object_request(identifier) - .send() + let path = Path::from(identifier.as_ref()); + + let object_meta = self + .s3_client + .head(&path) .with_metrics(crate::init_metrics::OBJECT_STORAGE_HEAD_OBJECT_REQUEST) .await .map_err(ObjectError::from)?; - if !response.status().is_success() { - return Err(status_error(response, Some(identifier.clone())).await); - } - - let length = response - .headers() - .get(CONTENT_LENGTH) - .ok_or(ObjectError::Length)? - .to_str() - .map_err(|_| ObjectError::Length)? - .parse::() - .map_err(|_| ObjectError::Length)?; - - Ok(length) + Ok(object_meta.size as _) } #[tracing::instrument(skip(self))] async fn remove(&self, identifier: &Arc) -> Result<(), StoreError> { - let response = self - .delete_object_request(identifier) - .send() + let path = Path::from(identifier.as_ref()); + + self.s3_client + .delete(&path) .with_metrics(crate::init_metrics::OBJECT_STORAGE_DELETE_OBJECT_REQUEST) .await .map_err(ObjectError::from)?; - if !response.status().is_success() { - return Err(status_error(response, Some(identifier.clone())).await); - } - Ok(()) } } -enum UploadState { - Single(BytesStream), - Multi( - Arc, - String, - Vec>>, - ), -} - impl ObjectStore { #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(access_key, secret_key, session_token))] - pub(crate) async fn build( - endpoint: Url, - bucket_name: String, - url_style: UrlStyle, - region: String, - access_key: String, - secret_key: String, - session_token: Option, - signature_expiration: u64, - client_timeout: u64, - public_endpoint: Option, - ) -> Result { - Ok(ObjectStoreConfig { - bucket: Bucket::new(endpoint, url_style, bucket_name, region) - .map_err(ObjectError::from)?, - credentials: if let Some(token) = session_token { - Credentials::new_with_token(access_key, secret_key, token) - } else { - Credentials::new(access_key, secret_key) - }, - signature_expiration, + pub(crate) async fn new( + crate::config::ObjectStorage { + endpoint, + bucket_name, + use_path_style, + region, + access_key, + secret_key, + session_token, client_timeout, public_endpoint, - }) - } + signature_duration: _, + }: crate::config::ObjectStorage, + ) -> Result { + let https = endpoint.scheme() == "https"; - #[tracing::instrument(skip_all)] - async fn start_upload( - &self, - stream: S, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result - where - S: Stream>, - { - let mut stream = std::pin::pin!(stream); + let client_options = object_store::ClientOptions::new() + .with_timeout(Duration::from_secs(client_timeout)) + .with_allow_http(!https); - let first_chunk = read_chunk(&mut stream).await?; + let builder = AmazonS3Builder::new() + .with_endpoint(endpoint) + .with_bucket_name(bucket_name) + .with_virtual_hosted_style_request(!use_path_style) + .with_region(region) + .with_access_key_id(access_key) + .with_secret_access_key(secret_key) + .with_allow_http(!https) + .with_client_options(client_options); - if first_chunk.len() < CHUNK_SIZE { - return Ok(UploadState::Single(first_chunk)); - } - - let mut first_chunk = Some(first_chunk); - - let (req, object_id) = self - .create_multipart_request(content_type, extension) - .await?; - let response = req - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_CREATE_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - let body = response.text().await.map_err(ObjectError::Request)?; - let body = CreateMultipartUpload::parse_response(&body) - .map_err(XmlError::new) - .map_err(ObjectError::Xml)?; - let upload_id = body.upload_id(); - - // hack-ish: use async block as Result boundary - let res = async { - let mut complete = false; - let mut part_number = 0; - let mut futures = Vec::new(); - - while !complete { - tracing::trace!("save_stream: looping"); - - part_number += 1; - - let buf = if let Some(buf) = first_chunk.take() { - buf - } else { - read_chunk(&mut stream).await? - }; - - complete = buf.len() < CHUNK_SIZE; - - let this = self.clone(); - - let object_id2 = object_id.clone(); - let upload_id2 = upload_id.to_string(); - let handle = crate::sync::abort_on_drop(crate::sync::spawn( - "upload-multipart-part", - async move { - let response = this - .create_upload_part_request( - buf.clone(), - &object_id2, - part_number, - &upload_id2, - ) - .await? - .body(Body::wrap_stream(buf.into_io_stream())) - .send() - .with_metrics( - crate::init_metrics::OBJECT_STORAGE_CREATE_UPLOAD_PART_REQUEST, - ) - .await - .map_err(ObjectError::from)?; - - if !response.status().is_success() { - return Err(status_error(response, None).await); - } - - let etag = response - .headers() - .get("etag") - .ok_or(ObjectError::Etag)? - .to_str() - .map_err(|_| ObjectError::Etag)? - .to_string(); - - // early-drop response to close its tracing spans - drop(response); - - Ok(etag) as Result - } - .instrument(tracing::Span::current()), - )); - - futures.push(handle); - } - - Ok(futures) - } - .await; - - match res { - Ok(futures) => Ok(UploadState::Multi( - object_id, - upload_id.to_string(), - futures, - )), - Err(e) => { - self.create_abort_multipart_request(&object_id, upload_id) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_ABORT_MULTIPART_REQUEST) - .await - .map_err(ObjectError::from)?; - Err(e) - } - } - } - - async fn head_bucket_request(&self) -> Result { - let action = self.bucket.head_bucket(Some(&self.credentials)); - - Ok(self.build_request(action)) - } - - async fn put_object_request( - &self, - length: usize, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result<(RequestBuilder, Arc), StoreError> { - let path = self.next_file(extension); - - let mut action = self.bucket.put_object(Some(&self.credentials), &path); - - action - .headers_mut() - .insert("content-type", content_type.as_ref()); - action - .headers_mut() - .insert("content-length", length.to_string()); - - Ok((self.build_request(action), Arc::from(path))) - } - - async fn create_multipart_request( - &self, - content_type: mime::Mime, - extension: Option<&str>, - ) -> Result<(RequestBuilder, Arc), StoreError> { - let path = self.next_file(extension); - - let mut action = self - .bucket - .create_multipart_upload(Some(&self.credentials), &path); - - action - .headers_mut() - .insert("content-type", content_type.as_ref()); - - Ok((self.build_request(action), Arc::from(path))) - } - - async fn create_upload_part_request( - &self, - buf: BytesStream, - object_id: &Arc, - part_number: u16, - upload_id: &str, - ) -> Result { - use md5::Digest; - - let mut action = self.bucket.upload_part( - Some(&self.credentials), - object_id.as_ref(), - part_number, - upload_id, - ); - - let length = buf.len(); - - let hashing_span = tracing::debug_span!("Hashing request body"); - let hash_string = crate::sync::spawn_blocking("hash-buf", move || { - let guard = hashing_span.enter(); - let mut hasher = md5::Md5::new(); - for bytes in buf { - hasher.update(&bytes); - } - let hash = hasher.finalize(); - let hash_string = BASE64_STANDARD.encode(hash); - drop(guard); - hash_string - }) - .await - .map_err(ObjectError::from)?; - - action - .headers_mut() - .insert("content-type", "application/octet-stream"); - action.headers_mut().insert("content-md5", hash_string); - action - .headers_mut() - .insert("content-length", length.to_string()); - - Ok(self.build_request(action)) - } - - async fn send_complete_multipart_request<'a, I: Iterator>( - &'a self, - object_id: &'a Arc, - upload_id: &'a str, - etags: I, - ) -> Result { - let mut action = self.bucket.complete_multipart_upload( - Some(&self.credentials), - object_id.as_ref(), - upload_id, - etags, - ); - - action - .headers_mut() - .insert("content-type", "application/octet-stream"); - - let (req, action) = self.build_request_inner(action); - - let body: Vec = action.body().into(); - - req.header(CONTENT_LENGTH, body.len()) - .body(body) - .send() - .with_metrics(crate::init_metrics::OBJECT_STORAGE_COMPLETE_MULTIPART_REQUEST) - .await - } - - fn create_abort_multipart_request( - &self, - object_id: &Arc, - upload_id: &str, - ) -> RequestBuilder { - let action = self.bucket.abort_multipart_upload( - Some(&self.credentials), - object_id.as_ref(), - upload_id, - ); - - self.build_request(action) - } - - fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> RequestBuilder { - let (req, _) = self.build_request_inner(action); - req - } - - fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (RequestBuilder, A) { - let method = match A::METHOD { - rusty_s3::Method::Head => reqwest::Method::HEAD, - rusty_s3::Method::Get => reqwest::Method::GET, - rusty_s3::Method::Post => reqwest::Method::POST, - rusty_s3::Method::Put => reqwest::Method::PUT, - rusty_s3::Method::Delete => reqwest::Method::DELETE, + let builder = if let Some(token) = session_token { + builder.with_token(token) + } else { + builder }; - let url = action.sign(self.signature_expiration); + let s3_client = builder.build().map_err(ObjectError::BuildClient)?; - let req = self - .client - .request(method, url.as_str()) - .timeout(self.client_timeout); - - let req = action - .headers_mut() - .iter() - .fold(req, |req, (name, value)| req.header(name, value)); - - (req, action) - } - - fn get_object_request( - &self, - identifier: &Arc, - from_start: Option, - len: Option, - ) -> RequestBuilder { - let action = self - .bucket - .get_object(Some(&self.credentials), identifier.as_ref()); - - let req = self.build_request(action); - - let start = from_start.unwrap_or(0); - let end = len.map(|len| start + len - 1); - - req.header( - RANGE, - Range::Bytes(vec![if let Some(end) = end { - ByteRangeSpec::FromTo(start, end) - } else { - ByteRangeSpec::From(start) - }]) - .to_string(), - ) - } - - fn head_object_request(&self, identifier: &Arc) -> RequestBuilder { - let action = self - .bucket - .head_object(Some(&self.credentials), identifier.as_ref()); - - self.build_request(action) - } - - fn delete_object_request(&self, identifier: &Arc) -> RequestBuilder { - let action = self - .bucket - .delete_object(Some(&self.credentials), identifier.as_ref()); - - self.build_request(action) + Ok(ObjectStore { + s3_client: Arc::new(s3_client), + public_endpoint, + }) } fn next_file(&self, extension: Option<&str>) -> String { @@ -731,9 +355,6 @@ impl ObjectStore { impl std::fmt::Debug for ObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ObjectStore") - .field("bucket", &self.bucket.name()) - .field("region", &self.bucket.region()) - .finish() + f.debug_struct("ObjectStore").finish() } }