From f9f4fc63d670f357c93f24147c2ee3e1278e2d97 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Wed, 27 Oct 2021 23:06:03 -0500 Subject: [PATCH] Optionally support s3-compatible storage (untested) --- Cargo.lock | 605 +++++++++++++++++- Cargo.toml | 8 +- pict-rs.toml | 65 ++ src/config.rs | 168 +++-- src/error.rs | 8 +- src/ffmpeg.rs | 8 +- src/file.rs | 4 +- src/magick.rs | 19 +- src/main.rs | 42 +- src/{migrate/mod.rs => migrate.rs} | 0 src/processor.rs | 2 +- src/serde_str.rs | 73 +++ src/store.rs | 2 + src/store/file_store.rs | 2 +- src/store/file_store/restructure.rs | 4 +- src/store/object_store.rs | 220 +++++++ src/store/object_store/object_id.rs | 26 + .../mod.rs => upload_manager.rs} | 67 +- src/upload_manager/session.rs | 10 +- 19 files changed, 1164 insertions(+), 169 deletions(-) create mode 100644 pict-rs.toml rename src/{migrate/mod.rs => migrate.rs} (100%) create mode 100644 src/serde_str.rs create mode 100644 src/store/object_store.rs create mode 100644 src/store/object_store/object_id.rs rename src/{upload_manager/mod.rs => upload_manager.rs} (93%) diff --git a/Cargo.lock b/Cargo.lock index f863927..3ac19b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,8 +45,8 @@ dependencies = [ "actix-service", "actix-tls", "actix-utils", - "ahash", - "base64", + "ahash 0.7.6", + "base64 0.13.0", "bitflags", "bytes", "bytestring", @@ -113,7 +113,7 @@ dependencies = [ "http", "log", "regex", - "serde", + "serde 1.0.130", ] [[package]] @@ -169,9 +169,9 @@ dependencies = [ "futures-core", "http", "log", - "tokio-rustls", + "tokio-rustls 0.23.0", "tokio-util", - "webpki-roots", + "webpki-roots 0.22.1", ] [[package]] @@ -199,7 +199,7 @@ dependencies = [ "actix-service", "actix-utils", "actix-web-codegen", - "ahash", + "ahash 0.7.6", "bytes", "cfg-if", "derive_more", @@ -215,12 +215,12 @@ dependencies = [ "paste", "pin-project", "regex", - "serde", + "serde 1.0.130", "serde_json", "serde_urlencoded", "smallvec", "socket2", - "time", + "time 0.3.4", "url", ] @@ -236,6 +236,12 @@ dependencies = [ "syn", ] +[[package]] +name = "ahash" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e" + [[package]] name = "ahash" version = "0.7.6" @@ -280,6 +286,12 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "async-stream" version = "0.3.2" @@ -312,6 +324,23 @@ dependencies = [ "syn", ] +[[package]] +name = "attohttpc" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8bda305457262b339322106c776e3fd21df860018e566eb6a5b1aa4b6ae02d" +dependencies = [ + "http", + "log", + "rustls 0.18.1", + "serde 1.0.130", + "serde_json", + "url", + "webpki 0.21.4", + "webpki-roots 0.19.0", + "wildmatch", +] + [[package]] name = "atty" version = "0.2.14" @@ -339,7 +368,7 @@ dependencies = [ "actix-http", "actix-rt", "actix-service", - "base64", + "base64 0.13.0", "bytes", "cfg-if", "derive_more", @@ -350,12 +379,43 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rand", - "rustls", - "serde", + "rustls 0.20.0", + "serde 1.0.130", "serde_json", "serde_urlencoded", ] +[[package]] +name = "aws-creds" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e1c8f64305d3f3096cb247983a3cae16f8c2960129699bcb70639e31180794" +dependencies = [ + "anyhow", + "attohttpc", + "dirs", + "rust-ini 0.17.0", + "serde 1.0.130", + "serde-xml-rs", + "serde_derive", + "url", +] + +[[package]] +name = "aws-region" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2884b8f2aaeb4a4bf80b219b4fe1d340139ca9331679c57e0fd4a24f571a78bd" +dependencies = [ + "anyhow", +] + +[[package]] +name = "base64" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" + [[package]] name = "base64" version = "0.13.0" @@ -416,6 +476,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits 0.2.14", + "time 0.1.43", + "winapi", +] + [[package]] name = "clap" version = "2.33.3" @@ -431,6 +504,22 @@ dependencies = [ "vec_map", ] +[[package]] +name = "config" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1b9d958c2b1368a663f05538fc1b5975adce1e19f435acceae987aceeeb369" +dependencies = [ + "lazy_static", + "nom", + "rust-ini 0.13.0", + "serde 1.0.130", + "serde-hjson", + "serde_json", + "toml", + "yaml-rust", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -488,6 +577,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crypto-mac" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -520,6 +619,35 @@ dependencies = [ "generic-array", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d86534ed367a67548dc68113a0f5db55432fdfbb6e6f9d77704397d95d5780" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + +[[package]] +name = "dlv-list" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68df3f2b690c1b86e65ef7830956aededf3cb0a16f898f79b9a6f421a7b6211b" +dependencies = [ + "rand", +] + [[package]] name = "either" version = "1.6.1" @@ -716,6 +844,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +dependencies = [ + "ahash 0.4.7", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -740,6 +877,22 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hmac" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" +dependencies = [ + "crypto-mac", + "digest", +] + [[package]] name = "http" version = "0.2.5" @@ -798,6 +951,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" +dependencies = [ + "futures-util", + "hyper", + "log", + "rustls 0.19.1", + "tokio", + "tokio-rustls 0.22.0", + "webpki 0.21.4", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -828,7 +996,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -850,6 +1018,12 @@ dependencies = [ "libc", ] +[[package]] +name = "ipnet" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" + [[package]] name = "itertools" version = "0.10.1" @@ -886,12 +1060,31 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" +dependencies = [ + "arrayvec", + "bitflags", + "cfg-if", + "ryu", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.105" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "869d572136620d55835903746bcb5cdc54cb2851fd0aeec53220b4bb65ef3013" +[[package]] +name = "linked-hash-map" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" + [[package]] name = "local-channel" version = "0.1.2" @@ -943,6 +1136,23 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "maybe-async" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.4.1" @@ -964,6 +1174,15 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "minidom" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332592c2149fc7dd40a64fc9ef6f0d65607284b474cef9817d1fc8c7e7b3608e" +dependencies = [ + "quick-xml", +] + [[package]] name = "mio" version = "0.7.14" @@ -992,6 +1211,17 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nom" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" +dependencies = [ + "lexical-core", + "memchr", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1001,6 +1231,34 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits 0.2.14", +] + +[[package]] +name = "num-traits" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31" +dependencies = [ + "num-traits 0.2.14", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -1059,6 +1317,16 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "ordered-multimap" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c672c7ad9ec066e428c00eb917124a06f08db19e2584de982cc34b1f4c12485" +dependencies = [ + "dlv-list", + "hashbrown 0.9.1", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -1126,7 +1394,8 @@ dependencies = [ "anyhow", "async-trait", "awc", - "base64", + "base64 0.13.0", + "config", "dashmap", "futures-util", "mime", @@ -1135,14 +1404,16 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "pin-project-lite", - "serde", + "reqwest", + "rust-s3", + "serde 1.0.130", "serde_json", "sha2", "sled", "storage-path-generator", "structopt", "thiserror", - "time", + "time 0.3.4", "tokio", "tokio-uring", "tokio-util", @@ -1292,6 +1563,15 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26aab6b48e2590e4a64d1ed808749ba06257882b461d01ca71baeb747074a6dd" +dependencies = [ + "memchr", +] + [[package]] name = "quote" version = "1.0.10" @@ -1350,6 +1630,16 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" +dependencies = [ + "getrandom", + "redox_syscall", +] + [[package]] name = "regex" version = "1.5.4" @@ -1385,6 +1675,42 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d2927ca2f685faf0fc620ac4834690d29e7abb153add10f5812eef20b5e280" +dependencies = [ + "base64 0.13.0", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "percent-encoding", + "pin-project-lite", + "rustls 0.19.1", + "serde 1.0.130", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls 0.22.0", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 0.21.1", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -1409,6 +1735,54 @@ dependencies = [ "libc", ] +[[package]] +name = "rust-ini" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e52c148ef37f8c375d49d5a73aa70713125b7f19095948a923f80afdeb22ec2" + +[[package]] +name = "rust-ini" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63471c4aa97a1cf8332a5f97709a79a4234698de6a1f5087faf66f2dae810e22" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2f26775d15f43dc848ef0ec65f83de8775549e486c7a3a576652049a7122d32" +dependencies = [ + "anyhow", + "async-trait", + "aws-creds", + "aws-region", + "base64 0.13.0", + "cfg-if", + "chrono", + "futures", + "hex", + "hmac", + "http", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "reqwest", + "serde 1.0.130", + "serde-xml-rs", + "serde_derive", + "sha2", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rustc_version" version = "0.3.3" @@ -1418,6 +1792,32 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d1126dcf58e93cee7d098dbda643b5f92ed724f1f6a63007c1116eed6700c81" +dependencies = [ + "base64 0.12.3", + "log", + "ring", + "sct 0.6.1", + "webpki 0.21.4", +] + +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64 0.13.0", + "log", + "ring", + "sct 0.6.1", + "webpki 0.21.4", +] + [[package]] name = "rustls" version = "0.20.0" @@ -1426,8 +1826,8 @@ checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" dependencies = [ "log", "ring", - "sct", - "webpki", + "sct 0.7.0", + "webpki 0.22.0", ] [[package]] @@ -1448,6 +1848,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sct" version = "0.7.0" @@ -1476,6 +1886,12 @@ dependencies = [ "pest", ] +[[package]] +name = "serde" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dad3f759919b92c3068c696c15c3d17238234498bbdcc80f2c469606f948ac8" + [[package]] name = "serde" version = "1.0.130" @@ -1485,6 +1901,30 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-hjson" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a4e0ea8a88553209f6cc6cfe8724ecad22e1acf372793c27d995290fe74f8" +dependencies = [ + "lazy_static", + "num-traits 0.1.43", + "regex", + "serde 0.8.23", +] + +[[package]] +name = "serde-xml-rs" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa" +dependencies = [ + "log", + "serde 1.0.130", + "thiserror", + "xml-rs", +] + [[package]] name = "serde_derive" version = "1.0.130" @@ -1504,7 +1944,7 @@ checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ "itoa", "ryu", - "serde", + "serde 1.0.130", ] [[package]] @@ -1516,7 +1956,7 @@ dependencies = [ "form_urlencoded", "itoa", "ryu", - "serde", + "serde 1.0.130", ] [[package]] @@ -1608,6 +2048,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "storage-path-generator" version = "0.1.0" @@ -1647,6 +2093,12 @@ dependencies = [ "syn", ] +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.81" @@ -1710,6 +2162,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "time" version = "0.3.4" @@ -1718,7 +2180,7 @@ checksum = "99beeb0daeac2bd1e86ac2c21caddecb244b39a093594da1a661ec2060c7aedd" dependencies = [ "itoa", "libc", - "serde", + "serde 1.0.130", ] [[package]] @@ -1777,15 +2239,26 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls 0.19.1", + "tokio", + "webpki 0.21.4", +] + [[package]] name = "tokio-rustls" version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d49194a46b06a69f2498a34a595ab4a9c1babd2642ffa3dbccf6c6778d1426f2" dependencies = [ - "rustls", + "rustls 0.20.0", "tokio", - "webpki", + "webpki 0.22.0", ] [[package]] @@ -1826,6 +2299,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +dependencies = [ + "serde 1.0.130", +] + [[package]] name = "tonic" version = "0.5.2" @@ -1834,7 +2316,7 @@ checksum = "796c5e1cd49905e65dd8e700d4cb1dffcbfdb4fc9d017de08c1a537afd83627c" dependencies = [ "async-stream", "async-trait", - "base64", + "base64 0.13.0", "bytes", "futures-core", "futures-util", @@ -1954,7 +2436,7 @@ dependencies = [ "futures-core", "mime", "opentelemetry", - "serde", + "serde 1.0.130", "tracing", "tracing-futures", "tracing-opentelemetry", @@ -2114,6 +2596,7 @@ dependencies = [ "idna", "matches", "percent-encoding", + "serde 1.0.130", ] [[package]] @@ -2123,7 +2606,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ "getrandom", - "serde", + "serde 1.0.130", ] [[package]] @@ -2179,6 +2662,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.78" @@ -2218,6 +2713,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki" version = "0.22.0" @@ -2228,13 +2733,31 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki-roots" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8eff4b7516a57307f9349c64bf34caa34b940b66fed4b2fb3136cb7386e5739" +dependencies = [ + "webpki 0.21.4", +] + +[[package]] +name = "webpki-roots" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +dependencies = [ + "webpki 0.21.4", +] + [[package]] name = "webpki-roots" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c475786c6f47219345717a043a37ec04cb4bc185e28853adcc4fa0a947eba630" dependencies = [ - "webpki", + "webpki 0.22.0", ] [[package]] @@ -2248,6 +2771,12 @@ dependencies = [ "libc", ] +[[package]] +name = "wildmatch" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f44b95f62d34113cf558c93511ac93027e03e9c29a60dd0fd70e6e025c7270a" + [[package]] name = "winapi" version = "0.3.9" @@ -2269,3 +2798,27 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi", +] + +[[package]] +name = "xml-rs" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" + +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] diff --git a/Cargo.toml b/Cargo.toml index ed2e309..a1c8384 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = [] +object-storage = ["reqwest", "rust-s3"] io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring", "sled/io_uring"] [dependencies] @@ -22,6 +23,7 @@ anyhow = "1.0" async-trait = "0.1.51" awc = { version = "3.0.0-beta.9", default-features = false, features = ["rustls"] } base64 = "0.13.0" +config = "0.11.0" dashmap = "4.0.2" futures-util = "0.3.17" mime = "0.3.1" @@ -30,6 +32,8 @@ once_cell = "1.4.0" opentelemetry = { version = "0.16", features = ["rt-tokio"] } opentelemetry-otlp = "0.9" pin-project-lite = "0.2.7" +reqwest = { version = "0.11.5", default-features = false, features = ["stream"], optional = true} +rust-s3 = { version = "0.27.0", default-features = false, features = ["tokio-rustls-tls"], optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.9.0" @@ -47,8 +51,8 @@ tracing-futures = "0.2.4" tracing-log = "0.1.2" tracing-opentelemetry = "0.16" tracing-subscriber = { version = "0.3.0", features = ["env-filter", "fmt", "tracing-log"] } -url = "2.2" -uuid = { version = "0.8.2", features = ["v4", "serde"]} +url = { version = "2.2", features = ["serde"] } +uuid = { version = "0.8.2", features = ["v4", "serde"] } [dependencies.tracing-actix-web] version = "0.5.0-beta.1" diff --git a/pict-rs.toml b/pict-rs.toml new file mode 100644 index 0000000..cf129b2 --- /dev/null +++ b/pict-rs.toml @@ -0,0 +1,65 @@ +## Required: path to store pict-rs database +path = './data' + +## Optional: pict-rs binding address +# default: 0.0.0.0:8080 +addr = '0.0.0.0:8080' + +## Optional: format to transcode all uploaded images +# valid options: 'jpeg', 'png', 'webp' +# default: empty +# +# Not specifying image-format means images will be stored in their original format +# This does not affect gif or mp4 uploads +image-format = 'jpeg' + +## Optional: permitted image processing filters +# valid options: 'identity', 'thumbnail', 'resize', 'crop', 'blur' +# default: empty +# +# Not specifying filters implies all filters are permitted +filters = [ + 'identity', + 'thumbnail', + 'resize', + 'crop', + 'blur', +] + +## Optional: image bounds +# default: 40 +max-file-size = 40 # in Megabytes +# default: 10,000 +max-image-width = 10000 # in Pixels +# default: 10,000 +max-image-height = 10000 # in Pixels +# default: 40,000 +max-image-area = 40000 # in Pixels + +## Optional: +# default: false +skip-validate-imports = false + +## Optional: url for exporting otlp traces +# default: empty +# +# Not specifying opentelemetry-url means no traces will be exported +opentelemetry-url = 'http://localhost:4317/' + +## Optional: store definition +# default: empty +# +# Not specifying a store will default to using a file-store in the same directory provided by the `path` field defined above +[store] +type = 'file-store' +path = './data' + +## Example s3 store +# [store] +# type = 's3-store' +# bucket-name = 'rust-s3' +# region = 'eu-central-1' # can also be endpoint of local s3 store +# access-key = 'ACCESS_KEY' +# secret-key = 'SECRET_KEY' +# security-token = 'SECURITY_TOKEN' +# session-token = 'SESSION-TOKEN' diff --git a/src/config.rs b/src/config.rs index f3b07e1..040155b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,21 @@ use std::{collections::HashSet, net::SocketAddr, path::PathBuf}; +use structopt::StructOpt; use url::Url; use crate::magick::ValidInputType; -#[derive(Clone, Debug, structopt::StructOpt)] -pub(crate) struct Config { +#[derive(Clone, Debug, StructOpt)] +pub(crate) struct Args { + #[structopt(short, long, help = "Path to the pict-rs configuration file")] + config_file: Option, + + #[structopt(flatten)] + overrides: Overrides, +} + +#[derive(Clone, Debug, serde::Serialize, structopt::StructOpt)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct Overrides { #[structopt( short, long, @@ -12,27 +23,15 @@ pub(crate) struct Config { )] skip_validate_imports: bool, - #[structopt( - short, - long, - env = "PICTRS_ADDR", - default_value = "0.0.0.0:8080", - help = "The address and port the server binds to." - )] - addr: SocketAddr, + #[structopt(short, long, help = "The address and port the server binds to.")] + addr: Option, + + #[structopt(short, long, help = "The path to the data directory, e.g. data/")] + path: Option, #[structopt( short, long, - env = "PICTRS_PATH", - help = "The path to the data directory, e.g. data/" - )] - path: PathBuf, - - #[structopt( - short, - long, - env = "PICTRS_FORMAT", help = "An optional image format to convert all uploaded files into, supports 'jpg', 'png', and 'webp'" )] image_format: Option, @@ -40,7 +39,6 @@ pub(crate) struct Config { #[structopt( short, long, - env = "PICTRS_ALLOWED_FILTERS", help = "An optional list of filters to permit, supports 'identity', 'thumbnail', 'resize', 'crop', and 'blur'" )] filters: Option>, @@ -48,31 +46,21 @@ pub(crate) struct Config { #[structopt( short, long, - env = "PICTRS_MAX_FILE_SIZE", - help = "Specify the maximum allowed uploaded file size (in Megabytes)", - default_value = "40" + help = "Specify the maximum allowed uploaded file size (in Megabytes)" )] - max_file_size: usize, + max_file_size: Option, + + #[structopt(long, help = "Specify the maximum width in pixels allowed on an image")] + max_image_width: Option, + + #[structopt(long, help = "Specify the maximum width in pixels allowed on an image")] + max_image_height: Option, + + #[structopt(long, help = "Specify the maximum area in pixels allowed in an image")] + max_image_area: Option, #[structopt( long, - env = "PICTRS_MAX_IMAGE_WIDTH", - help = "Specify the maximum width in pixels allowed on an image", - default_value = "10000" - )] - max_image_width: usize, - - #[structopt( - long, - env = "PICTRS_MAX_IMAGE_HEIGHT", - help = "Specify the maximum width in pixels allowed on an image", - default_value = "10000" - )] - max_image_height: usize, - - #[structopt( - long, - env = "PICTRS_API_KEY", help = "An optional string to be checked on requests to privileged endpoints" )] api_key: Option, @@ -80,13 +68,96 @@ pub(crate) struct Config { #[structopt( short, long, - env = "PICTRS_OPENTELEMETRY_URL", help = "Enable OpenTelemetry Tracing exports to the given OpenTelemetry collector" )] opentelemetry_url: Option, + + #[structopt(subcommand)] + store: Option, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, structopt::StructOpt)] +#[serde(rename_all = "kebab-case")] +#[serde(tag = "type")] +pub(crate) enum Store { + FileStore { + // defaults to {config.path} + path: Option, + }, + #[cfg(feature = "object-storage")] + S3Store { + bucket_name: String, + region: crate::serde_str::Serde, + access_key: Option, + secret_key: Option, + security_token: Option, + session_token: Option, + }, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct Config { + skip_validate_imports: bool, + addr: SocketAddr, + path: PathBuf, + image_format: Option, + filters: Option>, + max_file_size: usize, + max_image_width: usize, + max_image_height: usize, + max_image_area: usize, + api_key: Option, + opentelemetry_url: Option, + store: Store, +} + +#[derive(serde::Serialize)] +#[serde(rename_all = "kebab-case")] +pub(crate) struct Defaults { + skip_validate_imports: bool, + addr: SocketAddr, + max_file_size: usize, + max_image_width: usize, + max_image_height: usize, + max_image_area: usize, + store: Store, +} + +impl Defaults { + fn new() -> Self { + Defaults { + skip_validate_imports: false, + addr: ([0, 0, 0, 0], 8080).into(), + max_file_size: 40, + max_image_width: 10_000, + max_image_height: 10_000, + max_image_area: 40_000, + store: Store::FileStore { path: None }, + } + } } impl Config { + pub(crate) fn build() -> anyhow::Result { + let args = Args::from_args(); + let mut base_config = config::Config::try_from(&Defaults::new())?; + + if let Some(path) = args.config_file { + base_config.merge(config::File::from(path))?; + }; + + base_config.merge(config::Config::try_from(&args.overrides)?)?; + + base_config.merge(config::Environment::with_prefix("PICTRS"))?; + + Ok(base_config.try_into()?) + } + + pub(crate) fn store(&self) -> &Store { + &self.store + } + pub(crate) fn bind_address(&self) -> SocketAddr { self.addr } @@ -96,7 +167,7 @@ impl Config { } pub(crate) fn format(&self) -> Option { - self.image_format.clone() + self.image_format } pub(crate) fn allowed_filters(&self) -> Option> { @@ -119,6 +190,10 @@ impl Config { self.max_image_height } + pub(crate) fn max_area(&self) -> usize { + self.max_image_area + } + pub(crate) fn api_key(&self) -> Option<&str> { self.api_key.as_deref() } @@ -132,7 +207,8 @@ impl Config { #[error("Invalid format supplied, {0}")] pub(crate) struct FormatError(String); -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "kebab-case")] pub(crate) enum Format { Jpeg, Png, @@ -140,7 +216,7 @@ pub(crate) enum Format { } impl Format { - pub(crate) fn to_magick_format(&self) -> &'static str { + pub(crate) fn as_magick_format(&self) -> &'static str { match self { Format::Jpeg => "JPEG", Format::Png => "PNG", @@ -148,7 +224,7 @@ impl Format { } } - pub(crate) fn to_hint(&self) -> Option { + pub(crate) fn as_hint(&self) -> Option { match self { Format::Jpeg => Some(ValidInputType::Jpeg), Format::Png => Some(ValidInputType::Png), diff --git a/src/error.rs b/src/error.rs index 279b87d..69abbe7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,13 +8,13 @@ pub(crate) struct Error { impl std::fmt::Debug for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}\n", self.kind) + writeln!(f, "{}", self.kind) } } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}\n", self.kind)?; + writeln!(f, "{}", self.kind)?; std::fmt::Display::fmt(&self.context, f) } } @@ -72,6 +72,10 @@ pub(crate) enum UploadError { #[error(transparent)] FileStore(#[from] crate::store::file_store::FileError), + #[cfg(feature = "object-storage")] + #[error(transparent)] + ObjectStore(#[from] crate::store::object_store::ObjectError), + #[error("Provided process path is invalid")] ParsePath, diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index ec226ac..1ec9c6a 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -70,7 +70,7 @@ pub(crate) async fn to_mp4_bytes( "ffmpeg", &[ "-i", - &input_file_str, + input_file_str, "-pix_fmt", "yuv420p", "-vf", @@ -80,7 +80,7 @@ pub(crate) async fn to_mp4_bytes( "h264", "-f", "mp4", - &output_file_str, + output_file_str, ], )?; @@ -123,14 +123,14 @@ where "ffmpeg", &[ "-i", - &input_file_str, + input_file_str, "-vframes", "1", "-codec", format.as_codec(), "-f", format.as_format(), - &output_file_str, + output_file_str, ], )?; diff --git a/src/file.rs b/src/file.rs index a51344b..fb997b8 100644 --- a/src/file.rs +++ b/src/file.rs @@ -317,14 +317,14 @@ mod io_uring { let max_size = (size - cursor).min(65_536); let buf = Vec::with_capacity(max_size.try_into().unwrap()); - let (res, mut buf): (_, Vec) = self.read_at(buf, cursor).await; + let (res, buf): (_, Vec) = self.read_at(buf, cursor).await; let n: usize = res?; if n == 0 { return Err(std::io::ErrorKind::UnexpectedEof.into()); } - writer.write_all(&mut buf[0..n]).await?; + writer.write_all(&buf[0..n]).await?; let n: u64 = n.try_into().unwrap(); cursor += n; diff --git a/src/magick.rs b/src/magick.rs index e015f3f..327052e 100644 --- a/src/magick.rs +++ b/src/magick.rs @@ -37,7 +37,7 @@ pub(crate) enum ValidInputType { } impl ValidInputType { - fn to_str(&self) -> &'static str { + fn as_str(&self) -> &'static str { match self { Self::Mp4 => "MP4", Self::Gif => "GIF", @@ -47,7 +47,7 @@ impl ValidInputType { } } - pub(crate) fn to_ext(&self) -> &'static str { + pub(crate) fn as_ext(&self) -> &'static str { match self { Self::Mp4 => ".mp4", Self::Gif => ".gif", @@ -93,7 +93,7 @@ pub(crate) fn convert_bytes_read( "convert", "-", "-strip", - format!("{}:-", format.to_magick_format()).as_str(), + format!("{}:-", format.as_magick_format()).as_str(), ], )?; @@ -118,7 +118,7 @@ pub(crate) async fn details_bytes( } let last_arg = if let Some(expected_format) = hint { - format!("{}:-", expected_format.to_str()) + format!("{}:-", expected_format.as_str()) } else { "-".to_owned() }; @@ -160,7 +160,7 @@ where } let last_arg = if let Some(expected_format) = hint { - format!("{}:-", expected_format.to_str()) + format!("{}:-", expected_format.as_str()) } else { "-".to_owned() }; @@ -183,7 +183,7 @@ where pub(crate) async fn details_file(path_str: &str) -> Result { let process = Process::run( "magick", - &["identify", "-ping", "-format", "%w %h | %m\n", &path_str], + &["identify", "-ping", "-format", "%w %h | %m\n", path_str], )?; let mut reader = process.read().unwrap(); @@ -262,7 +262,7 @@ pub(crate) fn process_image_store_read( ) -> std::io::Result { let command = "magick"; let convert_args = ["convert", "-"]; - let last_arg = format!("{}:-", format.to_magick_format()); + let last_arg = format!("{}:-", format.as_magick_format()); let process = Process::spawn( Command::new(command) @@ -277,7 +277,10 @@ pub(crate) fn process_image_store_read( impl Details { #[instrument(name = "Validating input type")] fn validate_input(&self) -> Result { - if self.width > crate::CONFIG.max_width() || self.height > crate::CONFIG.max_height() { + if self.width > crate::CONFIG.max_width() + || self.height > crate::CONFIG.max_height() + || self.width * self.height > crate::CONFIG.max_area() + { return Err(UploadError::Dimensions.into()); } diff --git a/src/main.rs b/src/main.rs index 71e7ebf..e0d516c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,6 @@ use std::{ task::{Context, Poll}, time::SystemTime, }; -use structopt::StructOpt; use tokio::{io::AsyncReadExt, sync::Semaphore}; use tracing::{debug, error, info, instrument, Span}; use tracing_actix_web::TracingLogger; @@ -37,6 +36,7 @@ mod migrate; mod process; mod processor; mod range; +mod serde_str; mod store; mod tmp_file; mod upload_manager; @@ -61,7 +61,7 @@ const MINUTES: u32 = 60; const HOURS: u32 = 60 * MINUTES; const DAYS: u32 = 24 * HOURS; -static CONFIG: Lazy = Lazy::new(Config::from_args); +static CONFIG: Lazy = Lazy::new(|| Config::build().unwrap()); static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))); @@ -397,7 +397,7 @@ where drop(permit); - let details = Details::from_bytes(bytes.clone(), format.to_hint()).await?; + let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?; let save_span = tracing::info_span!( parent: None, @@ -835,10 +835,38 @@ async fn main() -> anyhow::Result<()> { let root_dir = CONFIG.data_dir(); let db = LatestDb::exists(root_dir.clone()).migrate()?; - let store = FileStore::build(root_dir, &db)?; + match CONFIG.store() { + config::Store::FileStore { path } => { + let path = path.to_owned().unwrap_or_else(|| root_dir.clone()); - let manager = UploadManager::new(store, db, CONFIG.format()).await?; + let store = FileStore::build(path, &db)?; - manager.restructure().await?; - launch(manager).await + let manager = UploadManager::new(store, db, CONFIG.format()).await?; + + manager.restructure().await?; + launch(manager).await + } + #[cfg(feature = "object-storage")] + config::Store::S3Store { + bucket_name, + region, + access_key, + secret_key, + security_token, + session_token, + } => { + let store = crate::store::object_store::ObjectStore::build( + bucket_name, + (**region).clone(), + access_key.clone(), + secret_key.clone(), + security_token.clone(), + session_token.clone(), + &db, + )?; + + let manager = UploadManager::new(store, db, CONFIG.format()).await?; + launch(manager).await + } + } } diff --git a/src/migrate/mod.rs b/src/migrate.rs similarity index 100% rename from src/migrate/mod.rs rename to src/migrate.rs diff --git a/src/processor.rs b/src/processor.rs index 6a67aa1..feb3c09 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -41,7 +41,7 @@ pub(crate) fn build_chain( } let (path, args) = - args.into_iter() + args.iter() .fold(Ok((PathBuf::default(), vec![])), |inner, (name, value)| { if let Ok(inner) = inner { parse!(inner, Identity, name, value); diff --git a/src/serde_str.rs b/src/serde_str.rs new file mode 100644 index 0000000..53c6d1b --- /dev/null +++ b/src/serde_str.rs @@ -0,0 +1,73 @@ +use std::{ + ops::{Deref, DerefMut}, + str::FromStr, +}; + +#[derive(Clone, Debug)] +pub(crate) struct Serde { + inner: T, +} + +impl Serde { + pub(crate) fn new(inner: T) -> Self { + Serde { inner } + } +} + +impl Deref for Serde { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Serde { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl FromStr for Serde +where + T: FromStr, +{ + type Err = T::Err; + + fn from_str(s: &str) -> Result { + Ok(Serde { + inner: T::from_str(s)?, + }) + } +} + +impl serde::Serialize for Serde +where + T: std::fmt::Display, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let s = self.inner.to_string(); + serde::Serialize::serialize(s.as_str(), serializer) + } +} + +impl<'de, T> serde::Deserialize<'de> for Serde +where + T: std::str::FromStr, + ::Err: std::fmt::Display, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s: String = serde::Deserialize::deserialize(deserializer)?; + let inner = s + .parse::() + .map_err(|e| serde::de::Error::custom(e.to_string()))?; + + Ok(Serde { inner }) + } +} diff --git a/src/store.rs b/src/store.rs index 2edb581..8b20a0f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -5,6 +5,8 @@ use futures_util::stream::Stream; use tokio::io::{AsyncRead, AsyncWrite}; pub(crate) mod file_store; +#[cfg(feature = "object-storage")] +pub(crate) mod object_store; pub(crate) trait Identifier: Send + Sync + Clone + Debug { type Error: std::error::Error; diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 7f5ba4b..6766244 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -18,7 +18,7 @@ pub(crate) use file_id::FileId; // - last-path -> last generated path // - fs-restructure-01-complete -> bool -const GENERATOR_KEY: &'static [u8] = b"last-path"; +const GENERATOR_KEY: &[u8] = b"last-path"; #[derive(Debug, thiserror::Error)] pub(crate) enum FileError { diff --git a/src/store/file_store/restructure.rs b/src/store/file_store/restructure.rs index 1ff70f9..c7dcf35 100644 --- a/src/store/file_store/restructure.rs +++ b/src/store/file_store/restructure.rs @@ -5,8 +5,8 @@ use crate::{ }; use std::path::{Path, PathBuf}; -const RESTRUCTURE_COMPLETE: &'static [u8] = b"fs-restructure-01-complete"; -const DETAILS: &'static [u8] = b"details"; +const RESTRUCTURE_COMPLETE: &[u8] = b"fs-restructure-01-complete"; +const DETAILS: &[u8] = b"details"; impl UploadManager { #[tracing::instrument(skip(self))] diff --git a/src/store/object_store.rs b/src/store/object_store.rs new file mode 100644 index 0000000..067ea5a --- /dev/null +++ b/src/store/object_store.rs @@ -0,0 +1,220 @@ +use crate::store::Store; +use actix_web::web::Bytes; +use futures_util::stream::{Stream, StreamExt}; +use s3::{ + command::Command, creds::Credentials, request::Reqwest, request_trait::Request, Bucket, Region, +}; +use std::{ + pin::Pin, + string::FromUtf8Error, + task::{Context, Poll}, +}; +use storage_path_generator::{Generator, Path}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use uuid::Uuid; + +mod object_id; +pub(crate) use object_id::ObjectId; + +// - Settings Tree +// - last-path -> last generated path + +const GENERATOR_KEY: &[u8] = b"last-path"; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum ObjectError { + #[error(transparent)] + PathGenerator(#[from] storage_path_generator::PathError), + + #[error(transparent)] + Sled(#[from] sled::Error), + + #[error(transparent)] + Utf8(#[from] FromUtf8Error), + + #[error("Invalid length")] + Length, + + #[error("Storage error: {0}")] + Anyhow(#[from] anyhow::Error), +} + +#[derive(Debug, Clone)] +pub(crate) struct ObjectStore { + path_gen: Generator, + settings_tree: sled::Tree, + bucket: Bucket, +} + +pin_project_lite::pin_project! { + struct IoError { + #[pin] + inner: S, + } +} + +#[async_trait::async_trait(?Send)] +impl Store for ObjectStore { + type Error = ObjectError; + type Identifier = ObjectId; + type Stream = Pin>>>; + + async fn save_async_read( + &self, + reader: &mut Reader, + ) -> Result + where + Reader: AsyncRead + Unpin, + { + let path = self.next_file()?; + + self.bucket.put_object_stream(reader, &path).await?; + + Ok(ObjectId::from_string(path)) + } + + async fn save_bytes(&self, bytes: Bytes) -> Result { + let path = self.next_file()?; + + self.bucket.put_object(&path, &bytes).await?; + + Ok(ObjectId::from_string(path)) + } + + async fn to_stream( + &self, + identifier: &Self::Identifier, + from_start: Option, + len: Option, + ) -> Result { + let path = identifier.as_str(); + + let start = from_start.unwrap_or(0); + let end = len.map(|len| start + len); + + let request = Reqwest::new(&self.bucket, path, Command::GetObjectRange { start, end }); + + let response = request.response().await?; + + Ok(Box::pin(io_error(response.bytes_stream()))) + } + + async fn read_into( + &self, + identifier: &Self::Identifier, + writer: &mut Writer, + ) -> Result<(), std::io::Error> + where + Writer: AsyncWrite + Unpin, + { + let mut stream = self + .to_stream(identifier, None, None) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + while let Some(res) = stream.next().await { + let mut bytes = res?; + + writer.write_all_buf(&mut bytes).await?; + } + + Ok(()) + } + + async fn len(&self, identifier: &Self::Identifier) -> Result { + let path = identifier.as_str(); + + let (head, _) = self.bucket.head_object(path).await?; + let length = head.content_length.ok_or(ObjectError::Length)?; + + Ok(length as u64) + } + + async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> { + let path = identifier.as_str(); + + self.bucket.delete_object(path).await?; + Ok(()) + } +} + +impl ObjectStore { + pub(crate) fn build( + bucket_name: &str, + region: Region, + access_key: Option, + secret_key: Option, + security_token: Option, + session_token: Option, + db: &sled::Db, + ) -> Result { + let settings_tree = db.open_tree("settings")?; + + let path_gen = init_generator(&settings_tree)?; + + Ok(ObjectStore { + path_gen, + settings_tree, + bucket: Bucket::new( + bucket_name, + region, + Credentials { + access_key, + secret_key, + security_token, + session_token, + }, + )?, + }) + } + + fn next_directory(&self) -> Result { + let path = self.path_gen.next(); + + self.settings_tree + .insert(GENERATOR_KEY, path.to_be_bytes())?; + + Ok(path) + } + + fn next_file(&self) -> Result { + let filename = Uuid::new_v4().to_string(); + let path = self.next_directory()?.to_strings().join("/"); + + Ok(format!("/{}/{}", path, filename)) + } +} + +fn init_generator(settings: &sled::Tree) -> Result { + if let Some(ivec) = settings.get(GENERATOR_KEY)? { + Ok(Generator::from_existing( + storage_path_generator::Path::from_be_bytes(ivec.to_vec())?, + )) + } else { + Ok(Generator::new()) + } +} + +fn io_error(stream: S) -> impl Stream> +where + S: Stream>, + E: Into>, +{ + IoError { inner: stream } +} + +impl Stream for IoError +where + S: Stream>, + E: Into>, +{ + type Item = std::io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.as_mut().project(); + + this.inner.poll_next(cx).map(|opt| { + opt.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))) + }) + } +} diff --git a/src/store/object_store/object_id.rs b/src/store/object_store/object_id.rs new file mode 100644 index 0000000..6b3bb32 --- /dev/null +++ b/src/store/object_store/object_id.rs @@ -0,0 +1,26 @@ +use crate::store::{object_store::ObjectError, Identifier}; + +#[derive(Debug, Clone)] +pub(crate) struct ObjectId(String); + +impl Identifier for ObjectId { + type Error = ObjectError; + + fn to_bytes(&self) -> Result, Self::Error> { + Ok(self.0.as_bytes().to_vec()) + } + + fn from_bytes(bytes: Vec) -> Result { + Ok(ObjectId(String::from_utf8(bytes)?)) + } +} + +impl ObjectId { + pub(super) fn from_string(string: String) -> Self { + ObjectId(string) + } + + pub(super) fn as_str(&self) -> &str { + &self.0 + } +} diff --git a/src/upload_manager/mod.rs b/src/upload_manager.rs similarity index 93% rename from src/upload_manager/mod.rs rename to src/upload_manager.rs index 24757bc..8f74863 100644 --- a/src/upload_manager/mod.rs +++ b/src/upload_manager.rs @@ -4,15 +4,12 @@ use crate::{ ffmpeg::{InputFormat, ThumbnailFormat}, magick::{details_hint, ValidInputType}, migrate::{alias_id_key, alias_key, alias_key_bounds}, + serde_str::Serde, store::{Identifier, Store}, }; use actix_web::web; use sha2::Digest; -use std::{ - ops::{Deref, DerefMut}, - string::FromUtf8Error, - sync::Arc, -}; +use std::{string::FromUtf8Error, sync::Arc}; use tracing::{debug, error, info, instrument, warn, Span}; use tracing_futures::Instrument; @@ -58,11 +55,6 @@ pub(crate) struct UploadManagerInner { db: sled::Db, } -#[derive(Clone, Debug)] -pub(crate) struct Serde { - inner: T, -} - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct Details { width: usize, @@ -482,7 +474,7 @@ where let identifier = S::Identifier::from_bytes(identifier.to_vec())?; debug!("Deleting {:?}", identifier); if let Err(e) = self.store.remove(&identifier).await { - errors.push(e.into()); + errors.push(e); } } @@ -554,12 +546,6 @@ where } } -impl Serde { - pub(crate) fn new(inner: T) -> Self { - Serde { inner } - } -} - impl Details { fn is_motion(&self) -> bool { self.content_type.type_() == "video" @@ -608,7 +594,7 @@ impl Details { } pub(crate) fn content_type(&self) -> mime::Mime { - self.content_type.inner.clone() + (*self.content_type).clone() } pub(crate) fn system_time(&self) -> std::time::SystemTime { @@ -643,57 +629,12 @@ fn delete_key(alias: &str) -> String { format!("{}/delete", alias) } -impl Deref for Serde { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Serde { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - impl std::fmt::Debug for UploadManager { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("UploadManager").finish() } } -impl serde::Serialize for Serde -where - T: std::fmt::Display, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let s = self.inner.to_string(); - serde::Serialize::serialize(s.as_str(), serializer) - } -} - -impl<'de, T> serde::Deserialize<'de> for Serde -where - T: std::str::FromStr, - ::Err: std::fmt::Display, -{ - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let s: String = serde::Deserialize::deserialize(deserializer)?; - let inner = s - .parse::() - .map_err(|e| serde::de::Error::custom(e.to_string()))?; - - Ok(Serde { inner }) - } -} - impl std::fmt::Debug for FilenameIVec { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{:?}", String::from_utf8(self.inner.to_vec())) diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index 1f368a5..6ea4a8e 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -157,7 +157,7 @@ where debug!("Validating bytes"); let (content_type, validated_reader) = crate::validate::validate_image_bytes( bytes_mut.freeze(), - self.manager.inner.format.clone(), + self.manager.inner.format, validate, ) .await?; @@ -199,7 +199,7 @@ where debug!("Validating bytes"); let (input_type, validated_reader) = crate::validate::validate_image_bytes( bytes_mut.freeze(), - self.manager.inner.format.clone(), + self.manager.inner.format, true, ) .await?; @@ -236,11 +236,11 @@ where if dup.exists() { debug!("Duplicate exists, removing file"); - self.manager.store.remove(&identifier).await?; + self.manager.store.remove(identifier).await?; return Ok(()); } - self.manager.store_identifier(name, &identifier).await?; + self.manager.store_identifier(name, identifier).await?; Ok(()) } @@ -412,5 +412,5 @@ where } fn file_name(name: Uuid, input_type: ValidInputType) -> String { - format!("{}{}", name, input_type.to_ext()) + format!("{}{}", name, input_type.as_ext()) }