From 323016f994bc7bde857898f09cff011db40f1cb1 Mon Sep 17 00:00:00 2001
From: "Aode (lion)"
Date: Fri, 25 Mar 2022 18:47:50 -0500
Subject: [PATCH] Build out migration path

---
 Cargo.lock              |  20 +--
 Cargo.toml              |   7 +-
 src/config.rs           | 222 +++++++++++++++++++++----------
 src/error.rs            |   1 -
 src/main.rs             | 135 +++++++++----------
 src/migrate.rs          |  27 ++--
 src/migrate/repo.rs     |   0
 src/migrate/s034.rs     |   8 +-
 src/repo.rs             | 285 ++++++++++++++++++++++++++++++++--------
 src/repo/old.rs         | 171 ++++++++++++++++++++++++
 src/repo/sled.rs        | 134 +++++++++++++++----
 src/store.rs            |   1 -
 src/store/file_store.rs |   2 +-
 13 files changed, 755 insertions(+), 258 deletions(-)
 delete mode 100644 src/migrate/repo.rs
 create mode 100644 src/repo/old.rs

diff --git a/Cargo.lock b/Cargo.lock
index a7a97e8..76fc925 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1064,9 +1064,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
 
 [[package]]
 name = "hyper"
-version = "0.14.17"
+version = "0.14.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "043f0e083e9901b6cc658a77d1eb86f4fc650bbb977a4337dd63192826aa85dd"
+checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
 dependencies = [
  "bytes",
  "futures-channel",
@@ -1245,9 +1245,9 @@ dependencies = [
 
 [[package]]
 name = "log"
-version = "0.4.14"
+version = "0.4.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
+checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
 dependencies = [
  "cfg-if",
 ]
@@ -1820,9 +1820,9 @@ dependencies = [
 
 [[package]]
 name = "redox_syscall"
-version = "0.2.11"
+version = "0.2.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
+checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0"
 dependencies = [
  "bitflags",
 ]
@@ -2317,9 +2317,9 @@ dependencies = [
 
 [[package]]
 name = "time"
-version = "0.3.7"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
+checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
 dependencies = [
  "itoa",
  "libc",
@@ -2330,9 +2330,9 @@ dependencies = [
 
 [[package]]
 name = "time-macros"
-version = "0.2.3"
+version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6"
+checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
 
 [[package]]
 name = "tinyvec"
diff --git a/Cargo.toml b/Cargo.toml
index c70255e..478f4d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,8 +10,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [features]
-default = ["object-storage"]
-object-storage = ["reqwest", "rust-s3"]
+default = []
 io-uring = [
     "actix-rt/io-uring",
     "actix-server/io-uring",
@@ -42,11 +41,11 @@ pin-project-lite = "0.2.7"
 reqwest = { version = "0.11.5", default-features = false, features = [
     "rustls-tls",
     "stream",
-], optional = true }
+] }
 rust-s3 = { version = "0.29.0", default-features = false, features = [
     "fail-on-err",
     "with-reqwest",
-], optional = true, git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
+], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sha2 = "0.10.0"
diff --git a/src/config.rs b/src/config.rs
index 8ff247f..1c68b96 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -76,13 +76,6 @@ pub(crate) struct Overrides {
     #[serde(skip_serializing_if = "Option::is_none")]
     max_image_area: Option<usize>,
 
-    #[clap(
-        long,
-        help = "Specify the number of bytes sled is allowed to use for it's cache"
-    )]
-    #[serde(skip_serializing_if = "Option::is_none")]
-    sled_cache_capacity: Option<u64>,
-
     #[clap(
         long,
         help = "Specify the number of events the console subscriber is allowed to buffer"
@@ -106,12 +99,22 @@ pub(crate) struct Overrides {
     opentelemetry_url: Option<Url>,
 
     #[serde(skip_serializing_if = "Option::is_none")]
+    #[clap(
+        short = 'R',
+        long,
+        help = "Set the database implementation. Available options are 'sled'. Default is 'sled'"
+    )]
     repo: Option<Repo>,
 
     #[clap(flatten)]
-    sled_repo: SledRepo,
+    sled: Sled,
 
     #[serde(skip_serializing_if = "Option::is_none")]
+    #[clap(
+        short = 'S',
+        long,
+        help = "Set the image store. Available options are 'object-storage' or 'filesystem'. Default is 'filesystem'"
+    )]
     store: Option<Store>,
 
     #[clap(flatten)]
@@ -125,19 +128,20 @@ impl ObjectStorage {
     pub(crate) fn required(&self) -> Result<RequiredObjectStorage, RequiredError> {
         Ok(RequiredObjectStorage {
             bucket_name: self
-                .s3_store_bucket_name
+                .object_store_bucket_name
                 .as_ref()
                 .cloned()
-                .ok_or(RequiredError)?,
+                .ok_or(RequiredError("object-store-bucket-name"))?,
             region: self
-                .s3_store_region
+                .object_store_region
                 .as_ref()
                 .cloned()
                 .map(Serde::into_inner)
-                .ok_or(RequiredError)?,
-            access_key: self.s3_store_access_key.as_ref().cloned(),
-            security_token: self.s3_store_security_token.as_ref().cloned(),
-            session_token: self.s3_store_session_token.as_ref().cloned(),
+                .ok_or(RequiredError("object-store-region"))?,
+            access_key: self.object_store_access_key.as_ref().cloned(),
+            secret_key: self.object_store_secret_key.as_ref().cloned(),
+            security_token: self.object_store_security_token.as_ref().cloned(),
+            session_token: self.object_store_session_token.as_ref().cloned(),
         })
     }
 }
@@ -153,7 +157,6 @@ impl Overrides {
             && self.max_image_width.is_none()
             && self.max_image_height.is_none()
             && self.max_image_area.is_none()
-            && self.sled_cache_capacity.is_none()
             && self.console_buffer_capacity.is_none()
             && self.api_key.is_none()
             && self.opentelemetry_url.is_none()
@@ -171,37 +174,47 @@ pub(crate) enum Command {
     MigrateRepo { to: Repo },
 }
 
-#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
+pub(crate) enum CommandConfig {
+    Run,
+    MigrateStore {
+        to: Storage,
+    },
+    MigrateRepo {
+        #[allow(dead_code)]
+        to: Repository,
+    },
+}
+
+#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
 #[serde(rename_all = "snake_case")]
 pub(crate) enum Repo {
     Sled,
 }
 
-#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Parser)]
+#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, Parser)]
 #[serde(rename_all = "snake_case")]
-pub(crate) struct SledRepo {
+pub(crate) struct Sled {
     // defaults to {config.path}
     #[clap(long, help = "Path in which pict-rs will create it's 'repo' directory")]
     #[serde(skip_serializing_if = "Option::is_none")]
-    sled_repo_path: Option<PathBuf>,
+    pub(crate) sled_path: Option<PathBuf>,
 
     #[clap(
         long,
         help = "The number of bytes sled is allowed to use for it's in-memory cache"
     )]
     #[serde(skip_serializing_if = "Option::is_none")]
-    sled_repo_cache_capacity: Option<u64>,
+    pub(crate) sled_cache_capacity: Option<u64>,
 }
 
-#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
 #[serde(rename_all = "snake_case")]
 pub(crate) enum Store {
     Filesystem,
-    #[cfg(feature = "object-storage")]
     ObjectStorage,
 }
 
-#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Parser)]
+#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, Parser)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct FilesystemStorage {
     // defaults to {config.path}
@@ -210,51 +223,71 @@ pub(crate) struct FilesystemStorage {
         help = "Path in which pict-rs will create it's 'files' directory"
     )]
     #[serde(skip_serializing_if = "Option::is_none")]
-    filesystem_storage_path: Option<PathBuf>,
+    pub(crate) filesystem_storage_path: Option<PathBuf>,
 }
 
-#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Parser)]
+#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, Parser)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct ObjectStorage {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[clap(long, help = "Name of the bucket in which pict-rs will store images")]
-    s3_store_bucket_name: Option<String>,
+    object_store_bucket_name: Option<String>,
 
     #[serde(skip_serializing_if = "Option::is_none")]
     #[clap(
         long,
         help = "Region in which the bucket exists, can be an http endpoint"
     )]
-    s3_store_region: Option<Serde<s3::Region>>,
+    object_store_region: Option<Serde<s3::Region>>,
 
     #[serde(skip_serializing_if = "Option::is_none")]
     #[clap(long)]
-    s3_store_access_key: Option<String>,
+    object_store_access_key: Option<String>,
 
     #[clap(long)]
     #[serde(skip_serializing_if = "Option::is_none")]
-    s3_store_secret_key: Option<String>,
+    object_store_secret_key: Option<String>,
 
     #[clap(long)]
     #[serde(skip_serializing_if = "Option::is_none")]
-    s3_store_security_token: Option<String>,
+    object_store_security_token: Option<String>,
 
     #[clap(long)]
     #[serde(skip_serializing_if = "Option::is_none")]
-    s3_store_session_token: Option<String>,
+    object_store_session_token: Option<String>,
+}
+
+pub(crate) struct RequiredSledRepo {
+    pub(crate) path: PathBuf,
+    pub(crate) cache_capacity: u64,
 }
 
 pub(crate) struct RequiredObjectStorage {
     pub(crate) bucket_name: String,
     pub(crate) region: s3::Region,
     pub(crate) access_key: Option<String>,
+    pub(crate) secret_key: Option<String>,
     pub(crate) security_token: Option<String>,
     pub(crate) session_token: Option<String>,
 }
 
+pub(crate) struct RequiredFilesystemStorage {
+    pub(crate) path: PathBuf,
+}
+
+pub(crate) enum Storage {
+    ObjectStorage(RequiredObjectStorage),
+    Filesystem(RequiredFilesystemStorage),
+}
+
+pub(crate) enum Repository {
+    Sled(RequiredSledRepo),
+}
+
 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct Config {
+    command: Command,
     skip_validate_imports: bool,
     addr: SocketAddr,
     path: PathBuf,
@@ -264,59 +297,52 @@ pub(crate) struct Config {
     max_image_width: usize,
     max_image_height: usize,
     max_image_area: usize,
-    sled_cache_capacity: u64,
     console_buffer_capacity: Option<usize>,
     api_key: Option<String>,
     opentelemetry_url: Option<Url>,
     repo: Repo,
-    sled_repo: SledRepo,
+    sled: Option<Sled>,
     store: Store,
-    filesystem_storage: FilesystemStorage,
-    object_storage: ObjectStorage,
+    filesystem_storage: Option<FilesystemStorage>,
+    object_storage: Option<ObjectStorage>,
 }
 
 #[derive(serde::Serialize)]
 #[serde(rename_all = "snake_case")]
 pub(crate) struct Defaults {
+    command: Command,
     skip_validate_imports: bool,
     addr: SocketAddr,
     max_file_size: usize,
     max_image_width: usize,
     max_image_height: usize,
     max_image_area: usize,
-    sled_cache_capacity: u64,
     repo: Repo,
-    sled_repo: SledRepoDefaults,
+    sled: SledDefaults,
     store: Store,
-    filesystem_store: FilesystemDefaults,
 }
 
 #[derive(serde::Serialize)]
 #[serde(rename_all = "snake_case")]
-struct SledRepoDefaults {
-    sled_repo_cache_capacity: usize,
+struct SledDefaults {
+    sled_cache_capacity: usize,
 }
 
-#[derive(serde::Serialize)]
-#[serde(rename_all = "snake_case")]
-struct FilesystemDefaults {}
-
 impl Defaults {
     fn new() -> Self {
         Defaults {
+            command: Command::Run,
             skip_validate_imports: false,
            addr: ([0, 0, 0, 0], 8080).into(),
             max_file_size: 40,
             max_image_width: 10_000,
             max_image_height: 10_000,
             max_image_area: 40_000_000,
-            sled_cache_capacity: 1024 * 1024 * 64, // 16 times smaller than sled's default of 1GB
             repo: Repo::Sled,
-            sled_repo: SledRepoDefaults {
-                sled_repo_cache_capacity: 1024 * 1024 * 64,
+            sled: SledDefaults {
+                sled_cache_capacity: 1024 * 1024 * 64,
             },
             store: Store::Filesystem,
-            filesystem_store: FilesystemDefaults {},
         }
     }
 }
@@ -332,8 +358,6 @@ impl Config {
             base_config = base_config.add_source(config::File::from(path));
         };
 
-        // TODO: Command parsing
-
         if !args.overrides.is_default() {
             let merging = config::Config::try_from(&args.overrides)?;
 
@@ -348,20 +372,88 @@ impl Config {
         Ok(config)
     }
 
-    pub(crate) fn store(&self) -> &Store {
-        &self.store
+    pub(crate) fn command(&self) -> anyhow::Result<CommandConfig> {
+        Ok(match &self.command {
+            Command::Run => CommandConfig::Run,
+            Command::MigrateStore { to } => CommandConfig::MigrateStore {
+                to: match to {
+                    Store::ObjectStorage => Storage::ObjectStorage(
+                        self.object_storage
+                            .as_ref()
+                            .cloned()
+                            .unwrap_or_default()
+                            .required()?,
+                    ),
+                    Store::Filesystem => Storage::Filesystem(RequiredFilesystemStorage {
+                        path: self
+                            .filesystem_storage
+                            .as_ref()
+                            .and_then(|f| f.filesystem_storage_path.clone())
+                            .unwrap_or_else(|| {
+                                let mut path = self.path.clone();
+                                path.push("files");
+                                path
+                            }),
+                    }),
+                },
+            },
+            Command::MigrateRepo { to } => CommandConfig::MigrateRepo {
+                to: match to {
+                    Repo::Sled => {
+                        let sled = self.sled.as_ref().cloned().unwrap_or_default();
+
+                        Repository::Sled(RequiredSledRepo {
+                            path: sled.sled_path.unwrap_or_else(|| {
+                                let mut path = self.path.clone();
+                                path.push("sled-repo");
+                                path
+                            }),
+                            cache_capacity: sled.sled_cache_capacity.unwrap_or(1024 * 1024 * 64),
+                        })
+                    }
+                },
+            },
+        })
     }
 
-    pub(crate) fn repo(&self) -> &Repo {
-        &self.repo
+    pub(crate) fn store(&self) -> anyhow::Result<Storage> {
+        Ok(match self.store {
+            Store::Filesystem => Storage::Filesystem(RequiredFilesystemStorage {
+                path: self
+                    .filesystem_storage
+                    .as_ref()
+                    .and_then(|f| f.filesystem_storage_path.clone())
+                    .unwrap_or_else(|| {
+                        let mut path = self.path.clone();
+                        path.push("files");
+                        path
+                    }),
+            }),
+            Store::ObjectStorage => Storage::ObjectStorage(
+                self.object_storage
+                    .as_ref()
+                    .cloned()
+                    .unwrap_or_default()
+                    .required()?,
+            ),
+        })
     }
 
-    pub(crate) fn object_storage(&self) -> Result<RequiredObjectStorage, RequiredError> {
-        self.object_storage.required()
-    }
+    pub(crate) fn repo(&self) -> Repository {
+        match self.repo {
+            Repo::Sled => {
+                let sled = self.sled.as_ref().cloned().unwrap_or_default();
 
-    pub(crate) fn filesystem_storage_path(&self) -> Option<&PathBuf> {
-        self.filesystem_storage.filesystem_storage_path.as_ref()
+                Repository::Sled(RequiredSledRepo {
+                    path: sled.sled_path.unwrap_or_else(|| {
+                        let mut path = self.path.clone();
+                        path.push("sled-repo");
+                        path
+                    }),
+                    cache_capacity: sled.sled_cache_capacity.unwrap_or(1024 * 1024 * 64),
+                })
+            }
+        }
     }
 
     pub(crate) fn bind_address(&self) -> SocketAddr {
@@ -372,10 +464,6 @@ impl Config {
         self.path.clone()
     }
 
-    pub(crate) fn sled_cache_capacity(&self) -> u64 {
-        self.sled_cache_capacity
-    }
-
     pub(crate) fn console_buffer_capacity(&self) -> Option<usize> {
         self.console_buffer_capacity
     }
 
@@ -430,10 +518,10 @@ pub(crate) struct StoreError(String);
 pub(crate) struct RepoError(String);
 
 #[derive(Debug, thiserror::Error)]
-#[error("Missing required fields")]
-pub(crate) struct RequiredError;
+#[error("Missing required {0} field")]
+pub(crate) struct RequiredError(&'static str);
 
-#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, ArgEnum)]
 #[serde(rename_all = "snake_case")]
 pub(crate) enum Format {
     Jpeg,
diff --git a/src/error.rs b/src/error.rs
index deec103..7e02205 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -69,7 +69,6 @@ pub(crate) enum UploadError {
     #[error(transparent)]
     FileStore(#[from] crate::store::file_store::FileError),
 
-    #[cfg(feature = "object-storage")]
     #[error(transparent)]
     ObjectStore(#[from] crate::store::object_store::ObjectError),
 
diff --git a/src/main.rs b/src/main.rs
index 6013799..305df02 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,7 +9,7 @@ use futures_util::{
     stream::{empty, once},
     Stream,
 };
-use once_cell::sync::{Lazy, OnceCell};
+use once_cell::sync::Lazy;
 use std::{
     collections::HashSet,
     future::ready,
@@ -47,18 +47,17 @@ mod tmp_file;
 mod upload_manager;
 mod validate;
 
-use crate::{magick::details_hint, store::file_store::FileStore};
-
 use self::{
     concurrent_processor::CancelSafeProcessor,
-    config::{Config, Format, Migrate},
+    config::{CommandConfig, Config, Format, RequiredFilesystemStorage, RequiredObjectStorage},
     details::Details,
     either::Either,
     error::{Error, UploadError},
     init_tracing::init_tracing,
+    magick::details_hint,
     middleware::{Deadline, Internal},
     migrate::LatestDb,
-    store::Store,
+    store::{file_store::FileStore, object_store::ObjectStore, Store},
     upload_manager::{UploadManager, UploadManagerSession},
 };
 
@@ -67,7 +66,6 @@ const MINUTES: u32 = 60;
 const HOURS: u32 = 60 * MINUTES;
 const DAYS: u32 = 24 * HOURS;
 
-static MIGRATE: OnceCell<Migrate> = OnceCell::new();
 static CONFIG: Lazy<Config> = Lazy::new(|| Config::build().unwrap());
 static PROCESS_SEMAPHORE: Lazy<Semaphore> =
     Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
@@ -694,7 +692,6 @@ fn build_client() -> awc::Client {
         .finish()
 }
 
-#[cfg(feature = "object-storage")]
 fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
     reqwest::Client::builder()
         .user_agent("pict-rs v0.3.0-main")
@@ -839,35 +836,30 @@ async fn migrate_inner<S1>(
     manager: &UploadManager,
     db: &sled::Db,
     from: S1,
-    to: &config::Store,
+    to: &config::Storage,
 ) -> anyhow::Result<()>
 where
     S1: Store,
     Error: From<S1::Error>,
 {
     match to {
-        config::Store::FileStore { path } => {
-            let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
-
-            let to = FileStore::build(path, db)?;
+        config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
+            let to = FileStore::build(path.clone(), db)?;
             manager.restructure(&to).await?;
 
             manager.migrate_store::<S1, FileStore>(from, to).await?;
         }
-        #[cfg(feature = "object-storage")]
-        config::Store::S3Store {
+        config::Storage::ObjectStorage(RequiredObjectStorage {
             bucket_name,
             region,
             access_key,
             secret_key,
             security_token,
             session_token,
-        } => {
-            use store::object_store::ObjectStore;
-
+        }) => {
             let to = ObjectStore::build(
                 bucket_name,
-                (**region).clone(),
+                region.clone(),
                 access_key.clone(),
                 secret_key.clone(),
                 security_token.clone(),
                 session_token.clone(),
@@ -891,75 +883,78 @@ async fn main() -> anyhow::Result<()> {
         CONFIG.console_buffer_capacity(),
     )?;
 
-    let db = LatestDb::exists(CONFIG.data_dir(), CONFIG.sled_cache_capacity()).migrate()?;
+    let db = LatestDb::exists(CONFIG.data_dir()).migrate()?;
+
+    let repo = self::repo::Repo::open(CONFIG.repo())?;
+
+    repo.from_db(db).await?;
 
     let manager = UploadManager::new(db.clone(), CONFIG.format()).await?;
 
-    if let Some(m) = MIGRATE.get() {
-        let from = m.from();
-        let to = m.to();
-
-        match from {
-            config::Store::FileStore { path } => {
-                let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
-
-                let from = FileStore::build(path, &db)?;
-                manager.restructure(&from).await?;
-
-                migrate_inner(&manager, &db, from, to).await?;
-            }
-            #[cfg(feature = "object-storage")]
-            config::Store::S3Store {
-                bucket_name,
-                region,
-                access_key,
-                secret_key,
-                security_token,
-                session_token,
-            } => {
-                let from = crate::store::object_store::ObjectStore::build(
-                    bucket_name,
-                    (**region).clone(),
-                    access_key.clone(),
-                    secret_key.clone(),
-                    security_token.clone(),
-                    session_token.clone(),
-                    &db,
-                    build_reqwest_client()?,
-                )?;
-
-                migrate_inner(&manager, &db, from, to).await?;
-            }
+    match CONFIG.command()? {
+        CommandConfig::Run => (),
+        CommandConfig::MigrateRepo { to: _ } => {
+            unimplemented!("Repo migrations are currently unsupported")
         }
+        CommandConfig::MigrateStore { to } => {
+            let from = CONFIG.store()?;
 
-        return Ok(());
+            match from {
+                config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
+                    let from = FileStore::build(path.clone(), &db)?;
+                    manager.restructure(&from).await?;
+
+                    migrate_inner(&manager, &db, from, &to).await?;
+                }
+                config::Storage::ObjectStorage(RequiredObjectStorage {
+                    bucket_name,
+                    region,
+                    access_key,
+                    secret_key,
+                    security_token,
+                    session_token,
+                }) => {
+                    let from = ObjectStore::build(
+                        &bucket_name,
+                        region,
+                        access_key,
+                        secret_key,
+                        security_token,
+                        session_token,
+                        &db,
+                        build_reqwest_client()?,
+                    )?;
+
+                    migrate_inner(&manager, &db, from, &to).await?;
+                }
+            }
+
+            return Ok(());
+        }
     }
 
-    match CONFIG.store() {
-        config::Store::FileStore { path } => {
-            let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
-
-            let store = FileStore::build(path.clone(), &db)?;
+    match CONFIG.store()? {
+        config::Storage::Filesystem(RequiredFilesystemStorage { path }) => {
+            let store = FileStore::build(path, &db)?;
             manager.restructure(&store).await?;
 
             launch(manager, store).await
         }
-        #[cfg(feature = "object-storage")]
-        config::Store::S3Store {
+        config::Storage::ObjectStorage(RequiredObjectStorage {
             bucket_name,
             region,
             access_key,
             secret_key,
             security_token,
             session_token,
-        } => {
-            let store = crate::store::object_store::ObjectStore::build(
-                bucket_name,
-                (**region).clone(),
-                access_key.clone(),
-                secret_key.clone(),
-                security_token.clone(),
-                session_token.clone(),
+        }) => {
+            let store = ObjectStore::build(
+                &bucket_name,
+                region,
+                access_key,
+                secret_key,
+                security_token,
+                session_token,
                 &db,
                 build_reqwest_client()?,
             )?;
diff --git a/src/migrate.rs b/src/migrate.rs
index 90a391d..8ac65d6 100644
--- a/src/migrate.rs
+++ b/src/migrate.rs
@@ -51,30 +51,21 @@ trait SledTree {
 pub(crate) struct LatestDb {
     root_dir: PathBuf,
     version: DbVersion,
-    cache_capacity: u64,
 }
 
 impl LatestDb {
-    pub(crate) fn exists(root_dir: PathBuf, cache_capacity: u64) -> Self {
-        let version = DbVersion::exists(root_dir.clone(), cache_capacity);
+    pub(crate) fn exists(root_dir: PathBuf) -> Self {
+        let version = DbVersion::exists(root_dir.clone());
 
-        LatestDb {
-            root_dir,
-            version,
-            cache_capacity,
-        }
+        LatestDb { root_dir, version }
     }
 
     pub(crate) fn migrate(self) -> Result<sled::Db, Error> {
-        let LatestDb {
-            root_dir,
-            version,
-            cache_capacity,
-        } = self;
+        let LatestDb { root_dir, version } = self;
 
         loop {
             let root_dir2 = root_dir.clone();
-            let res = std::panic::catch_unwind(move || version.migrate(root_dir2, cache_capacity));
+            let res = std::panic::catch_unwind(move || version.migrate(root_dir2));
 
             if let Ok(res) = res {
                 return res;
@@ -90,17 +81,17 @@ enum DbVersion {
     Sled034,
     Fresh,
 }
 
 impl DbVersion {
-    fn exists(root: PathBuf, cache_capacity: u64) -> Self {
-        if s034::exists(root.clone()) && !s034::migrating(root, cache_capacity) {
+    fn exists(root: PathBuf) -> Self {
+        if s034::exists(root.clone()) && !s034::migrating(root) {
             return DbVersion::Sled034;
         }
 
         DbVersion::Fresh
     }
 
-    fn migrate(self, root: PathBuf, cache_capacity: u64) -> Result<sled::Db, Error> {
+    fn migrate(self, root: PathBuf) -> Result<sled::Db, Error> {
         match self {
-            DbVersion::Sled034 | DbVersion::Fresh => s034::open(root, cache_capacity),
+            DbVersion::Sled034 | DbVersion::Fresh => s034::open(root),
         }
     }
 }
diff --git a/src/migrate/repo.rs b/src/migrate/repo.rs
deleted file mode 100644
index e69de29..0000000
diff --git a/src/migrate/s034.rs b/src/migrate/s034.rs
index 5638b46..5bcad24 100644
--- a/src/migrate/s034.rs
+++ b/src/migrate/s034.rs
@@ -14,8 +14,8 @@ pub(crate) fn exists(mut base: PathBuf) -> bool {
     std::fs::metadata(base).is_ok()
 }
 
-pub(crate) fn migrating(base: PathBuf, cache_capacity: u64) -> bool {
-    if let Ok(db) = open(base, cache_capacity) {
+pub(crate) fn migrating(base: PathBuf) -> bool {
+    if let Ok(db) = open(base) {
         if let Ok(tree) = db.open_tree("migrate") {
             if let Ok(Some(_)) = tree.get("done") {
                 return false;
@@ -26,12 +26,12 @@ pub(crate) fn migrating(base: PathBuf, cache_capacity: u64) -> bool {
     true
 }
 
-pub(crate) fn open(mut base: PathBuf, cache_capacity: u64) -> Result<sled::Db, Error> {
+pub(crate) fn open(mut base: PathBuf) -> Result<sled::Db, Error> {
     base.push("sled");
     base.push(SLED_034);
 
     let db = sled034::Config::default()
-        .cache_capacity(cache_capacity)
+        .cache_capacity(1024 * 1024 * 64)
         .path(base)
         .open()?;
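
NOTE (editor): usage sketch, not part of the original patch. The main.rs changes above split startup into two phases: upgrade the old 0.3-era sled database in place, then copy it into the new repo exactly once. A minimal driver showing the intended call order, using only items introduced in this patch (the wrapper function itself is hypothetical):

    // Sketch: boot sequence assumed by the new main.rs.
    async fn boot() -> anyhow::Result<()> {
        // Open (and, if needed, upgrade) the old sled db.
        let db = LatestDb::exists(CONFIG.data_dir()).migrate()?;
        // Open the new v0.4.0-alpha.1 repo described by the config.
        let repo = self::repo::Repo::open(CONFIG.repo())?;
        // One-time copy of the old trees; a no-op on re-runs thanks to
        // the "repo-migration-01" settings key.
        repo.from_db(db).await?;
        Ok(())
    }
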
diff --git a/src/repo.rs b/src/repo.rs
index 87d751a..13d1b78 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -1,9 +1,17 @@
-use crate::{details::Details, store::Identifier};
+use crate::config::RequiredSledRepo;
+use crate::{config::Repository, details::Details, store::Identifier};
 use futures_util::Stream;
 use uuid::Uuid;
 
+mod old;
 pub(crate) mod sled;
 
+#[derive(Debug)]
+pub(crate) enum Repo {
+    Sled(self::sled::SledRepo),
+}
+
+#[derive(Clone, Debug)]
 pub(crate) struct Alias {
     id: Uuid,
     extension: String,
@@ -15,6 +23,201 @@ pub(crate) struct DeleteToken {
 
 pub(crate) struct AlreadyExists;
 
+#[async_trait::async_trait]
+pub(crate) trait SettingsRepo {
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
+    type Error: std::error::Error;
+
+    async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
+    async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
+    async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
+}
+
+#[async_trait::async_trait]
+pub(crate) trait IdentifierRepo<I> {
+    type Error: std::error::Error;
+
+    async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
+    async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
+
+    async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
+}
+
+#[async_trait::async_trait]
+pub(crate) trait HashRepo<I> {
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
+    type Error: std::error::Error;
+    type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
+
+    async fn hashes(&self) -> Self::Stream;
+
+    async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
+
+    async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
+    async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
+    async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
+
+    async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
+    async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
+
+    async fn relate_variant_identifier(
+        &self,
+        hash: Self::Bytes,
+        variant: String,
+        identifier: I,
+    ) -> Result<(), Self::Error>;
+    async fn variant_identifier(
+        &self,
+        hash: Self::Bytes,
+        variant: String,
+    ) -> Result<Option<I>, Self::Error>;
+
+    async fn relate_motion_identifier(
+        &self,
+        hash: Self::Bytes,
+        identifier: I,
+    ) -> Result<(), Self::Error>;
+    async fn motion_identifier(&self, hash: Self::Bytes) -> Result<Option<I>, Self::Error>;
+
+    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
+}
+
+#[async_trait::async_trait]
+pub(crate) trait AliasRepo {
+    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
+    type Error: std::error::Error;
+
+    async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
+
+    async fn relate_delete_token(
+        &self,
+        alias: Alias,
+        delete_token: DeleteToken,
+    ) -> Result<Result<(), AlreadyExists>, Self::Error>;
+    async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
+
+    async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
+    async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
+
+    async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
+}
+
+impl Repo {
+    pub(crate) fn open(config: Repository) -> anyhow::Result<Self> {
+        match config {
+            Repository::Sled(RequiredSledRepo {
+                mut path,
+                cache_capacity,
+            }) => {
+                path.push("v0.4.0-alpha.1");
+
+                let db = ::sled::Config::new()
+                    .cache_capacity(cache_capacity)
+                    .path(path)
+                    .open()?;
+
+                Ok(Self::Sled(self::sled::SledRepo::new(db)?))
+            }
+        }
+    }
+
+    #[tracing::instrument(skip_all)]
+    pub(crate) async fn from_db(&self, db: ::sled::Db) -> anyhow::Result<()> {
+        if self.has_migrated().await? {
+            return Ok(());
+        }
+
+        let old = self::old::Old::open(db)?;
+
+        for hash in old.hashes() {
+            match self {
+                Self::Sled(repo) => {
+                    if let Err(e) = migrate_hash(repo, &old, hash).await {
+                        tracing::error!("Failed to migrate hash: {}", e);
+                    }
+                }
+            }
+        }
+
+        self.mark_migrated().await?;
+
+        Ok(())
+    }
+
+    async fn has_migrated(&self) -> anyhow::Result<bool> {
+        match self {
+            Self::Sled(repo) => Ok(repo.get(REPO_MIGRATION_O1).await?.is_some()),
+        }
+    }
+
+    async fn mark_migrated(&self) -> anyhow::Result<()> {
+        match self {
+            Self::Sled(repo) => {
+                repo.set(REPO_MIGRATION_O1, b"1".to_vec().into()).await?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+const REPO_MIGRATION_O1: &[u8] = b"repo-migration-01";
+const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress";
+const GENERATOR_KEY: &[u8] = b"last-path";
+
+async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> anyhow::Result<()>
+where
+    T: IdentifierRepo<::sled::IVec>,
+    <T as IdentifierRepo<::sled::IVec>>::Error: Send + Sync + 'static,
+    T: HashRepo<::sled::IVec>,
+    <T as HashRepo<::sled::IVec>>::Error: Send + Sync + 'static,
+    T: AliasRepo,
+    <T as AliasRepo>::Error: Send + Sync + 'static,
+    T: SettingsRepo,
+    <T as SettingsRepo>::Error: Send + Sync + 'static,
+{
+    HashRepo::create(repo, hash.to_vec().into()).await?;
+
+    let main_ident = old.main_identifier(&hash)?;
+
+    HashRepo::relate_identifier(repo, hash.to_vec().into(), main_ident.clone()).await?;
+
+    for alias in old.aliases(&hash) {
+        if let Ok(Ok(())) = AliasRepo::create(repo, alias.clone()).await {
+            let _ = HashRepo::relate_alias(repo, hash.to_vec().into(), alias.clone()).await;
+            let _ = AliasRepo::relate_hash(repo, alias.clone(), hash.to_vec().into()).await;
+
+            if let Ok(Some(delete_token)) = old.delete_token(&alias) {
+                let _ = AliasRepo::relate_delete_token(repo, alias, delete_token).await;
+            }
+        }
+    }
+
+    if let Ok(Some(identifier)) = old.motion_identifier(&hash) {
+        let _ = HashRepo::relate_motion_identifier(repo, hash.to_vec().into(), identifier).await;
+    }
+
+    for (variant, identifier) in old.variants(&hash)? {
+        let _ =
+            HashRepo::relate_variant_identifier(repo, hash.to_vec().into(), variant, identifier)
+                .await;
+    }
+
+    for (identifier, details) in old.details(&hash)? {
+        let _ = IdentifierRepo::relate_details(repo, identifier, details).await;
+    }
+
+    if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS) {
+        SettingsRepo::set(repo, STORE_MIGRATION_PROGRESS, value.to_vec().into()).await?;
+    }
+
+    if let Ok(Some(value)) = old.setting(GENERATOR_KEY) {
+        SettingsRepo::set(repo, GENERATOR_KEY, value.to_vec().into()).await?;
+    }
+
+    Ok(())
+}
+
 impl Alias {
     fn to_bytes(&self) -> Vec<u8> {
         let mut v = self.id.as_bytes().to_vec();
@@ -48,66 +251,38 @@ impl DeleteToken {
     }
 }
 
-#[async_trait::async_trait]
-pub(crate) trait SettingsRepo {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
-    type Error: std::error::Error;
-
-    async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Self::Error>;
-    async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Self::Error>;
-    async fn remove(&self, key: &'static [u8]) -> Result<(), Self::Error>;
+impl std::fmt::Display for Alias {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}{}", self.id, self.extension)
+    }
 }
 
-#[async_trait::async_trait]
-pub(crate) trait IdentifierRepo<I> {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
-    type Error: std::error::Error;
+impl Identifier for Vec<u8> {
+    type Error = std::convert::Infallible;
 
-    async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error>;
-    async fn details(&self, identifier: I) -> Result<Option<Details>, Self::Error>;
+    fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
+    where
+        Self: Sized,
+    {
+        Ok(bytes)
+    }
 
-    async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error>;
-    async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error>;
-
-    async fn cleanup(&self, identifier: I) -> Result<(), Self::Error>;
+    fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
+        Ok(self.clone())
+    }
 }
 
-#[async_trait::async_trait]
-pub(crate) trait HashRepo<I> {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
-    type Error: std::error::Error;
-    type Stream: Stream<Item = Result<Self::Bytes, Self::Error>>;
+impl Identifier for ::sled::IVec {
+    type Error = std::convert::Infallible;
 
-    async fn hashes(&self) -> Self::Stream;
+    fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
+    where
+        Self: Sized,
+    {
+        Ok(bytes.into())
+    }
 
-    async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Self::Error>;
-
-    async fn relate_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
-    async fn remove_alias(&self, hash: Self::Bytes, alias: Alias) -> Result<(), Self::Error>;
-    async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Self::Error>;
-
-    async fn relate_identifier(&self, hash: Self::Bytes, identifier: I) -> Result<(), Self::Error>;
-    async fn identifier(&self, hash: Self::Bytes) -> Result<I, Self::Error>;
-
-    async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error>;
-}
-
-#[async_trait::async_trait]
-pub(crate) trait AliasRepo {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
-    type Error: std::error::Error;
-
-    async fn create(&self, alias: Alias) -> Result<Result<(), AlreadyExists>, Self::Error>;
-
-    async fn relate_delete_token(
-        &self,
-        alias: Alias,
-        delete_token: DeleteToken,
-    ) -> Result<Result<(), AlreadyExists>, Self::Error>;
-    async fn delete_token(&self, alias: Alias) -> Result<DeleteToken, Self::Error>;
-
-    async fn relate_hash(&self, alias: Alias, hash: Self::Bytes) -> Result<(), Self::Error>;
-    async fn hash(&self, alias: Alias) -> Result<Self::Bytes, Self::Error>;
-
-    async fn cleanup(&self, alias: Alias) -> Result<(), Self::Error>;
+    fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
+        Ok(self.to_vec())
+    }
 }
diff --git a/src/repo/old.rs b/src/repo/old.rs
new file mode 100644
index 0000000..a61b402
--- /dev/null
+++ b/src/repo/old.rs
@@ -0,0 +1,171 @@
+// TREE STRUCTURE
+// - Alias Tree
+//   - alias -> hash
+//   - alias / id -> u64(id)
+//   - alias / delete -> delete token
+// - Main Tree
+//   - hash -> filename
+//   - hash 0 u64(id) -> alias
+// - Filename Tree
+//   - filename -> hash
+// - Details Tree
+//   - filename / S::Identifier -> details
+// - Identifier Tree
+//   - filename -> S::Identifier
+//   - filename / variant path -> S::Identifier
+//   - filename / motion -> S::Identifier
+// - Settings Tree
+//   - store-migration-progress -> Path Tree Key
+
+use super::{Alias, DeleteToken, Details};
+use uuid::Uuid;
+
+pub(super) struct Old {
+    alias_tree: ::sled::Tree,
+    filename_tree: ::sled::Tree,
+    main_tree: ::sled::Tree,
+    details_tree: ::sled::Tree,
+    settings_tree: ::sled::Tree,
+    identifier_tree: ::sled::Tree,
+    _db: ::sled::Db,
+}
+
+impl Old {
+    pub(super) fn open(db: sled::Db) -> anyhow::Result<Self> {
+        Ok(Self {
+            alias_tree: db.open_tree("alias")?,
+            filename_tree: db.open_tree("filename")?,
+            main_tree: db.open_tree("main")?,
+            details_tree: db.open_tree("details")?,
+            settings_tree: db.open_tree("settings")?,
+            identifier_tree: db.open_tree("path")?,
+            _db: db,
+        })
+    }
+
+    pub(super) fn setting(&self, key: &[u8]) -> anyhow::Result<Option<sled::IVec>> {
+        Ok(self.settings_tree.get(key)?)
+    }
+
+    pub(super) fn hashes(&self) -> impl std::iter::Iterator<Item = sled::IVec> {
+        self.filename_tree
+            .iter()
+            .values()
+            .filter_map(|res| res.ok())
+    }
+
+    pub(super) fn details(&self, hash: &sled::IVec) -> anyhow::Result<Vec<(sled::IVec, Details)>> {
+        let filename = self
+            .main_tree
+            .get(hash)?
+            .ok_or_else(|| anyhow::anyhow!("missing filename"))?;
+
+        let filename = String::from_utf8_lossy(&filename);
+
+        Ok(self
+            .identifier_tree
+            .scan_prefix(filename.as_bytes())
+            .values()
+            .filter_map(Result::ok)
+            .filter_map(|identifier| {
+                let mut key = filename.as_bytes().to_vec();
+                key.push(b'/');
+                key.extend_from_slice(&identifier);
+
+                let details = self.details_tree.get(key).ok()??;
+                let details = serde_json::from_slice(&details).ok()?;
+
+                Some((identifier, details))
+            })
+            .collect())
+    }
+
+    pub(super) fn main_identifier(&self, hash: &sled::IVec) -> anyhow::Result<sled::IVec> {
+        let filename = self
+            .main_tree
+            .get(hash)?
+            .ok_or_else(|| anyhow::anyhow!("Missing filename"))?;
+
+        self.identifier_tree
+            .get(filename)?
+            .ok_or_else(|| anyhow::anyhow!("Missing identifier"))
+    }
+
+    pub(super) fn variants(&self, hash: &sled::IVec) -> anyhow::Result<Vec<(String, sled::IVec)>> {
+        let filename = self
+            .main_tree
+            .get(hash)?
+            .ok_or_else(|| anyhow::anyhow!("Missing filename"))?;
+
+        let filename_string = String::from_utf8_lossy(&filename);
+
+        let variant_prefix = format!("{}/", filename_string);
+
+        Ok(self
+            .identifier_tree
+            .scan_prefix(&variant_prefix)
+            .filter_map(|res| res.ok())
+            .filter_map(|(key, value)| {
+                let key_str = String::from_utf8_lossy(&key);
+                let variant_path = key_str.trim_start_matches(&variant_prefix);
+                if variant_path == "motion" {
+                    return None;
+                }
+
+                Some((variant_path.to_string(), value))
+            })
+            .collect())
+    }
+
+    pub(super) fn motion_identifier(
+        &self,
+        hash: &sled::IVec,
+    ) -> anyhow::Result<Option<sled::IVec>> {
+        let filename = self
+            .main_tree
+            .get(hash)?
+            .ok_or_else(|| anyhow::anyhow!("Missing filename"))?;
+
+        let filename_string = String::from_utf8_lossy(&filename);
+
+        let motion_key = format!("{}/motion", filename_string);
+
+        Ok(self.identifier_tree.get(motion_key)?)
+    }
+
+    pub(super) fn aliases(&self, hash: &sled::IVec) -> Vec<Alias> {
+        let mut key = hash.to_vec();
+        key.push(0);
+
+        self.main_tree
+            .scan_prefix(key)
+            .values()
+            .filter_map(|res| res.ok())
+            .filter_map(|alias| {
+                let alias_str = String::from_utf8_lossy(&alias);
+
+                let (uuid, ext) = alias_str.split_once('.')?;
+
+                let uuid = uuid.parse::<Uuid>().ok()?;
+
+                Some(Alias {
+                    id: uuid,
+                    extension: ext.to_string(),
+                })
+            })
+            .collect()
+    }
+
+    pub(super) fn delete_token(&self, alias: &Alias) -> anyhow::Result<Option<DeleteToken>> {
+        let key = format!("{}{}/delete", alias.id, alias.extension);
+
+        if let Some(ivec) = self.alias_tree.get(key)? {
+            let token_str = String::from_utf8_lossy(&ivec);
+            if let Ok(uuid) = token_str.parse::<Uuid>() {
+                return Ok(Some(DeleteToken { id: uuid }));
+            }
+        }
+
+        Ok(None)
+    }
+}
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index b543202..b35500b 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -18,7 +18,7 @@ pub(crate) enum Error {
     Sled(#[from] sled::Error),
 
     #[error("Invalid identifier")]
-    Identifier(#[source] Box<dyn std::error::Error + Send>),
+    Identifier(#[source] Box<dyn std::error::Error + Send + Sync>),
 
     #[error("Invalid details json")]
     Details(#[from] serde_json::Error),
@@ -32,11 +32,12 @@ pub(crate) enum Error {
 
 pub(crate) struct SledRepo {
     settings: Tree,
-    identifier_hashes: Tree,
     identifier_details: Tree,
     hashes: Tree,
     hash_aliases: Tree,
     hash_identifiers: Tree,
+    hash_variant_identifiers: Tree,
+    hash_motion_identifiers: Tree,
     aliases: Tree,
     alias_hashes: Tree,
     alias_delete_tokens: Tree,
@@ -47,11 +48,12 @@ impl SledRepo {
     pub(crate) fn new(db: Db) -> Result<Self, Error> {
         Ok(SledRepo {
             settings: db.open_tree("pict-rs-settings-tree")?,
-            identifier_hashes: db.open_tree("pict-rs-identifier-hashes-tree")?,
             identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
             hashes: db.open_tree("pict-rs-hashes-tree")?,
             hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
             hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
+            hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?,
+            hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?,
             aliases: db.open_tree("pict-rs-aliases-tree")?,
             alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
             alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
@@ -87,20 +89,26 @@ impl SettingsRepo for SledRepo {
 fn identifier_bytes<I>(identifier: &I) -> Result<Vec<u8>, Error>
 where
     I: Identifier,
-    I::Error: Send + 'static,
+    I::Error: Send + Sync + 'static,
 {
     identifier
         .to_bytes()
         .map_err(|e| Error::Identifier(Box::new(e)))
 }
 
+fn variant_key(hash: &[u8], variant: &str) -> Result<Vec<u8>, Error> {
+    let mut bytes = hash.to_vec();
+    bytes.push(b'/');
+    bytes.extend_from_slice(variant.as_bytes());
+    Ok(bytes)
+}
+
 #[async_trait::async_trait]
 impl<I> IdentifierRepo<I> for SledRepo
 where
     I: Identifier + 'static,
-    I::Error: Send + 'static,
+    I::Error: Send + Sync + 'static,
 {
-    type Bytes = IVec;
     type Error = Error;
 
     async fn relate_details(&self, identifier: I, details: Details) -> Result<(), Self::Error> {
@@ -128,27 +136,9 @@ where
         }
     }
 
-    async fn relate_hash(&self, identifier: I, hash: Self::Bytes) -> Result<(), Self::Error> {
-        let key = identifier_bytes(&identifier)?;
-
-        b!(self.identifier_hashes, identifier_hashes.insert(key, hash));
-
-        Ok(())
-    }
-
-    async fn hash(&self, identifier: I) -> Result<Self::Bytes, Self::Error> {
-        let key = identifier_bytes(&identifier)?;
-
-        let opt = b!(self.identifier_hashes, identifier_hashes.get(key));
-
-        opt.ok_or(Error::Missing)
-    }
-
     async fn cleanup(&self, identifier: I) -> Result<(), Self::Error> {
         let key = identifier_bytes(&identifier)?;
 
-        let key2 = key.clone();
-        b!(self.identifier_hashes, identifier_hashes.remove(key2));
-
         b!(self.identifier_details, identifier_details.remove(key));
 
         Ok(())
@@ -218,7 +208,7 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
 impl<I> HashRepo<I> for SledRepo
 where
     I: Identifier + 'static,
-    I::Error: Send + 'static,
+    I::Error: Send + Sync + 'static,
 {
     type Bytes = IVec;
     type Error = Error;
@@ -290,6 +280,74 @@ where
         })
     }
 
+    async fn relate_variant_identifier(
+        &self,
+        hash: Self::Bytes,
+        variant: String,
+        identifier: I,
+    ) -> Result<(), Self::Error> {
+        let key = variant_key(&hash, &variant)?;
+        let value = identifier_bytes(&identifier)?;
+
+        b!(
+            self.hash_variant_identifiers,
+            hash_variant_identifiers.insert(key, value)
+        );
+
+        Ok(())
+    }
+
+    async fn variant_identifier(
+        &self,
+        hash: Self::Bytes,
+        variant: String,
+    ) -> Result<Option<I>, Self::Error> {
+        let key = variant_key(&hash, &variant)?;
+
+        let opt = b!(
+            self.hash_variant_identifiers,
+            hash_variant_identifiers.get(key)
+        );
+
+        if let Some(ivec) = opt {
+            Ok(Some(
+                I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?,
+            ))
+        } else {
+            Ok(None)
+        }
+    }
+
+    async fn relate_motion_identifier(
+        &self,
+        hash: Self::Bytes,
+        identifier: I,
+    ) -> Result<(), Self::Error> {
+        let bytes = identifier_bytes(&identifier)?;
+
+        b!(
+            self.hash_motion_identifiers,
+            hash_motion_identifiers.insert(hash, bytes)
+        );
+
+        Ok(())
+    }
+
+    async fn motion_identifier(&self, hash: Self::Bytes) -> Result<Option<I>, Self::Error> {
+        let opt = b!(
+            self.hash_motion_identifiers,
+            hash_motion_identifiers.get(hash)
+        );
+
+        if let Some(ivec) = opt {
+            Ok(Some(
+                I::from_bytes(ivec.to_vec()).map_err(|e| Error::Identifier(Box::new(e)))?,
+            ))
+        } else {
+            Ok(None)
+        }
+    }
+
     async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Self::Error> {
         let hash2 = hash.clone();
         b!(self.hashes, hashes.remove(hash2));
@@ -297,17 +355,39 @@ where
         let hash2 = hash.clone();
         b!(self.hash_identifiers, hash_identifiers.remove(hash2));
 
-        let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
+        let hash2 = hash.clone();
+        b!(
+            self.hash_motion_identifiers,
+            hash_motion_identifiers.remove(hash2)
+        );
 
+        let aliases = HashRepo::<I>::aliases(self, hash.clone()).await?;
+        let hash2 = hash.clone();
         b!(self.hash_aliases, {
             for alias in aliases {
-                let key = hash_alias_key(&hash, &alias);
+                let key = hash_alias_key(&hash2, &alias);
                 let _ = hash_aliases.remove(key);
             }
             Ok(()) as Result<(), Error>
         });
 
+        let variant_keys = b!(self.hash_variant_identifiers, {
+            let v = hash_variant_identifiers
+                .scan_prefix(hash)
+                .keys()
+                .filter_map(Result::ok)
+                .collect::<Vec<_>>();
+
+            Ok(v) as Result<Vec<_>, Error>
+        });
+        b!(self.hash_variant_identifiers, {
+            for key in variant_keys {
+                let _ = hash_variant_identifiers.remove(key);
+            }
+            Ok(()) as Result<(), Error>
+        });
+
         Ok(())
     }
 }
diff --git a/src/store.rs b/src/store.rs
index d9af237..46fad1e 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -5,7 +5,6 @@ use futures_util::stream::Stream;
 use tokio::io::{AsyncRead, AsyncWrite};
 
 pub(crate) mod file_store;
-#[cfg(feature = "object-storage")]
 pub(crate) mod object_store;
 
 pub(crate) trait Identifier: Send + Sync + Clone + Debug {
diff --git a/src/store/file_store.rs b/src/store/file_store.rs
index 1b885df..84b2f0e 100644
--- a/src/store/file_store.rs
+++ b/src/store/file_store.rs
@@ -159,7 +159,7 @@ impl FileStore {
         self.settings_tree
             .insert(GENERATOR_KEY, path.to_be_bytes())?;
 
-        let mut target_path = self.root_dir.join("files");
+        let mut target_path = self.root_dir.clone();
         for dir in path.to_strings() {
             target_path.push(dir)
         }
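
NOTE (editor): usage sketch, not part of the original patch. migrate_hash in src/repo.rs drives the whole copy through the four new traits (SettingsRepo, IdentifierRepo, HashRepo, AliasRepo), so a future repo backend only has to implement those. A minimal caller against SledRepo, with sled::IVec as the identifier type exactly as in migrate_hash; the hash, identifier, and variant name below are illustrative:

    // Sketch: relate a variant identifier and read it back.
    async fn example(repo: &SledRepo) -> anyhow::Result<()> {
        let hash: sled::IVec = b"some-hash".to_vec().into();
        let ident: sled::IVec = b"files/00/01".to_vec().into();

        // Inner Err(AlreadyExists) just means the hash was seen before.
        let _ = HashRepo::<sled::IVec>::create(repo, hash.to_vec().into()).await?;

        HashRepo::relate_variant_identifier(repo, hash.to_vec().into(), "thumb".to_string(), ident)
            .await?;

        let found = HashRepo::<sled::IVec>::variant_identifier(
            repo,
            hash.to_vec().into(),
            "thumb".to_string(),
        )
        .await?;
        assert!(found.is_some());

        Ok(())
    }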