From 29cab025c609a6025d087d87086e4faeb6527ddb Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 27 Sep 2022 23:19:52 -0500 Subject: [PATCH] Improve 0.3 migration code, repo traces --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/repo.rs | 99 ++++++++++++++++++++------------- src/repo/sled.rs | 52 ++++++++--------- src/store/file_store/file_id.rs | 2 +- 5 files changed, 94 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a8e531..01b5c84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -965,6 +965,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.12.1" @@ -1568,6 +1574,7 @@ dependencies = [ "console-subscriber", "dashmap", "futures-util", + "hex", "md-5", "mime", "num_cpus", diff --git a/Cargo.toml b/Cargo.toml index 58d6a6b..249326c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ config = "0.13.0" console-subscriber = "0.1" dashmap = "5.1.0" futures-util = "0.3.17" +hex = "0.4.3" md-5 = "0.10.5" mime = "0.3.1" num_cpus = "1.13" diff --git a/src/repo.rs b/src/repo.rs index aa75244..47fdcb6 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -6,6 +6,7 @@ use crate::{ }; use futures_util::Stream; use std::{fmt::Debug, path::PathBuf}; +use tracing::Instrument; use uuid::Uuid; mod old; @@ -59,7 +60,7 @@ pub(crate) trait FullRepo: + Clone + Debug { - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn identifier_from_alias( &self, alias: &Alias, @@ -68,13 +69,13 @@ pub(crate) trait FullRepo: self.identifier(hash).await } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn aliases_from_alias(&self, alias: &Alias) -> Result, Error> { let hash = self.hash(alias).await?; self.aliases(hash).await } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn still_identifier_from_alias( &self, alias: &Alias, @@ -89,7 +90,7 @@ pub(crate) trait FullRepo: } } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn check_cached(&self, alias: &Alias) -> Result<(), Error> { let aliases = CachedRepo::update(self, alias).await?; @@ -461,22 +462,35 @@ impl Repo { } } - #[tracing::instrument(skip_all)] pub(crate) async fn from_db(&self, path: PathBuf) -> color_eyre::Result<()> { if self.has_migrated().await? { return Ok(()); } if let Some(old) = self::old::Old::open(path)? { - tracing::warn!("Migrating Database from 0.3 layout to 0.4 layout"); + let span = tracing::warn_span!("Migrating Database from 0.3 layout to 0.4 layout"); - for hash in old.hashes() { - match self { - Self::Sled(repo) => { - if let Err(e) = migrate_hash(repo, &old, hash).await { - tracing::error!("Failed to migrate hash: {}", e); + match self { + Self::Sled(repo) => { + async { + for hash in old.hashes() { + if let Err(e) = migrate_hash(repo, &old, hash).await { + tracing::error!("Failed to migrate hash: {}", format!("{:?}", e)); + } + } + + if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) { + tracing::warn!("Setting STORE_MIGRATION_PROGRESS"); + let _ = repo.set(STORE_MIGRATION_PROGRESS, value).await; + } + + if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) { + tracing::warn!("Setting GENERATOR_KEY"); + let _ = repo.set(GENERATOR_KEY, value).await; } } + .instrument(span) + .await; } } } @@ -486,25 +500,33 @@ impl Repo { Ok(()) } - #[tracing::instrument(skip_all)] pub(crate) async fn migrate_identifiers(&self) -> color_eyre::Result<()> { if self.has_migrated_identifiers().await? { return Ok(()); } - tracing::warn!("Migrating File Identifiers from 0.3 format to 0.4 format"); + let span = tracing::warn_span!("Migrating File Identifiers from 0.3 format to 0.4 format"); match self { Self::Sled(repo) => { - use futures_util::StreamExt; - let mut hashes = repo.hashes().await; + async { + use futures_util::StreamExt; + let mut hashes = repo.hashes().await; - while let Some(res) = hashes.next().await { - let hash = res?; - if let Err(e) = migrate_identifiers_for_hash(repo, hash).await { - tracing::error!("Failed to migrate identifiers for hash: {}", e); + while let Some(res) = hashes.next().await { + let hash = res?; + if let Err(e) = migrate_identifiers_for_hash(repo, hash).await { + tracing::error!( + "Failed to migrate identifiers for hash: {}", + format!("{:?}", e) + ); + } } + + Ok(()) as color_eyre::Result<()> } + .instrument(span) + .await?; } } @@ -551,7 +573,7 @@ const REPO_MIGRATION_02: &str = "repo-migration-02"; const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; const GENERATOR_KEY: &str = "last-path"; -#[tracing::instrument] +#[tracing::instrument(skip(repo, hash), fields(hash = hex::encode(&hash)))] async fn migrate_identifiers_for_hash(repo: &T, hash: ::sled::IVec) -> color_eyre::Result<()> where T: FullRepo, @@ -584,7 +606,7 @@ where Ok(()) } -#[tracing::instrument] +#[tracing::instrument(skip(repo))] async fn migrate_identifier_details( repo: &T, old: &FileId, @@ -593,6 +615,11 @@ async fn migrate_identifier_details( where T: FullRepo, { + if old == new { + tracing::warn!("Old FileId and new FileId are identical"); + return Ok(()); + } + if let Some(details) = repo.details(old).await? { repo.relate_details(new, &details).await?; IdentifierRepo::cleanup(repo, old).await?; @@ -601,25 +628,26 @@ where Ok(()) } -#[tracing::instrument(skip(old))] +#[tracing::instrument(skip(repo, old, hash), fields(hash = hex::encode(&hash)))] async fn migrate_hash(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()> where T: IdentifierRepo + HashRepo + AliasRepo + SettingsRepo + Debug, { - if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() { - tracing::debug!("Duplicate hash detected"); + let new_hash: T::Bytes = hash.to_vec().into(); + let main_ident = old.main_identifier(&hash)?.to_vec(); + + if HashRepo::create(repo, new_hash.clone()).await?.is_err() { + tracing::warn!("Duplicate hash detected"); return Ok(()); } - let main_ident = old.main_identifier(&hash)?.to_vec(); - - repo.relate_identifier(hash.to_vec().into(), &main_ident) + repo.relate_identifier(new_hash.clone(), &main_ident) .await?; for alias in old.aliases(&hash) { if let Ok(Ok(())) = AliasRepo::create(repo, &alias).await { - let _ = repo.relate_alias(hash.to_vec().into(), &alias).await; - let _ = repo.relate_hash(&alias, hash.to_vec().into()).await; + let _ = repo.relate_alias(new_hash.clone(), &alias).await; + let _ = repo.relate_hash(&alias, new_hash.clone()).await; if let Ok(Some(delete_token)) = old.delete_token(&alias) { let _ = repo.relate_delete_token(&alias, &delete_token).await; @@ -629,7 +657,7 @@ where if let Ok(Some(identifier)) = old.motion_identifier(&hash) { let _ = repo - .relate_motion_identifier(hash.to_vec().into(), &identifier.to_vec()) + .relate_motion_identifier(new_hash.clone(), &identifier.to_vec()) .await; } @@ -637,7 +665,7 @@ where let variant = variant_path.to_string_lossy().to_string(); let _ = repo - .relate_variant_identifier(hash.to_vec().into(), variant, &identifier.to_vec()) + .relate_variant_identifier(new_hash.clone(), variant, &identifier.to_vec()) .await; } @@ -645,15 +673,6 @@ where let _ = repo.relate_details(&identifier.to_vec(), &details).await; } - if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) { - repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into()) - .await?; - } - - if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) { - repo.set(GENERATOR_KEY, value.to_vec().into()).await?; - } - Ok(()) } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index c2d0472..ba69220 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -170,7 +170,7 @@ fn insert_cache_inverse( #[async_trait::async_trait(?Send)] impl CachedRepo for SledRepo { - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> { let now = DateTime::now(); let now_bytes = serde_json::to_vec(&now)?; @@ -186,7 +186,7 @@ impl CachedRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn update(&self, alias: &Alias) -> Result, Error> { let now = DateTime::now(); let now_bytes = serde_json::to_vec(&now)?; @@ -475,14 +475,14 @@ impl SettingsRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn get(&self, key: &'static str) -> Result, Error> { let opt = b!(self.settings, settings.get(key)); Ok(opt) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn remove(&self, key: &'static str) -> Result<(), Error> { b!(self.settings, settings.remove(key)); @@ -505,7 +505,7 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option { #[async_trait::async_trait(?Send)] impl IdentifierRepo for SledRepo { - #[tracing::instrument] + #[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn relate_details( &self, identifier: &I, @@ -522,7 +522,7 @@ impl IdentifierRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn details(&self, identifier: &I) -> Result, Error> { let key = identifier.to_bytes()?; @@ -535,7 +535,7 @@ impl IdentifierRepo for SledRepo { } } - #[tracing::instrument] + #[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))] async fn cleanup(&self, identifier: &I) -> Result<(), Error> { let key = identifier.to_bytes()?; @@ -568,7 +568,7 @@ impl HashRepo for SledRepo { Box::pin(from_iterator(iter, 8)) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn create(&self, hash: Self::Bytes) -> Result, Error> { let res = b!(self.hashes, { let hash2 = hash.clone(); @@ -578,7 +578,7 @@ impl HashRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { let key = hash_alias_key(&hash, alias); let value = alias.to_bytes(); @@ -588,7 +588,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> { let key = hash_alias_key(&hash, alias); @@ -597,7 +597,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn aliases(&self, hash: Self::Bytes) -> Result, Error> { let v = b!(self.hash_aliases, { Ok(hash_aliases @@ -611,7 +611,7 @@ impl HashRepo for SledRepo { Ok(v) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] async fn relate_identifier( &self, hash: Self::Bytes, @@ -624,7 +624,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn identifier(&self, hash: Self::Bytes) -> Result { let opt = b!(self.hash_identifiers, hash_identifiers.get(hash)); @@ -633,7 +633,7 @@ impl HashRepo for SledRepo { .and_then(|ivec| I::from_bytes(ivec.to_vec())) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] async fn relate_variant_identifier( &self, hash: Self::Bytes, @@ -651,7 +651,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn variant_identifier( &self, hash: Self::Bytes, @@ -671,7 +671,7 @@ impl HashRepo for SledRepo { } } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn variants( &self, hash: Self::Bytes, @@ -693,7 +693,7 @@ impl HashRepo for SledRepo { Ok(vec) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> { let key = variant_key(&hash, &variant); @@ -705,7 +705,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))] async fn relate_motion_identifier( &self, hash: Self::Bytes, @@ -721,7 +721,7 @@ impl HashRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn motion_identifier( &self, hash: Self::Bytes, @@ -738,7 +738,7 @@ impl HashRepo for SledRepo { } } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> { let hash2 = hash.clone(); b!(self.hashes, hashes.remove(hash2)); @@ -785,7 +785,7 @@ impl HashRepo for SledRepo { #[async_trait::async_trait(?Send)] impl AliasRepo for SledRepo { - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn create(&self, alias: &Alias) -> Result, Error> { let bytes = alias.to_bytes(); let bytes2 = bytes.clone(); @@ -798,7 +798,7 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn relate_delete_token( &self, alias: &Alias, @@ -815,7 +815,7 @@ impl AliasRepo for SledRepo { Ok(res.map_err(|_| AlreadyExists)) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn delete_token(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); @@ -826,7 +826,7 @@ impl AliasRepo for SledRepo { .map_err(Error::from) } - #[tracing::instrument] + #[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))] async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> { let key = alias.to_bytes(); @@ -835,7 +835,7 @@ impl AliasRepo for SledRepo { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn hash(&self, alias: &Alias) -> Result { let key = alias.to_bytes(); @@ -844,7 +844,7 @@ impl AliasRepo for SledRepo { opt.ok_or(SledError::Missing).map_err(Error::from) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn cleanup(&self, alias: &Alias) -> Result<(), Error> { let key = alias.to_bytes(); diff --git a/src/store/file_store/file_id.rs b/src/store/file_store/file_id.rs index dab2eb2..dfb776f 100644 --- a/src/store/file_store/file_id.rs +++ b/src/store/file_store/file_id.rs @@ -7,7 +7,7 @@ use crate::{ }; use std::path::PathBuf; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct FileId(PathBuf); impl Identifier for FileId {