Improve 0.3 migration code, repo traces

This commit is contained in:
asonix 2022-09-27 23:19:52 -05:00
parent bd1390186d
commit 29cab025c6
5 changed files with 94 additions and 67 deletions

7
Cargo.lock generated
View file

@ -965,6 +965,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.12.1"
@ -1568,6 +1574,7 @@ dependencies = [
"console-subscriber",
"dashmap",
"futures-util",
"hex",
"md-5",
"mime",
"num_cpus",

View file

@ -33,6 +33,7 @@ config = "0.13.0"
console-subscriber = "0.1"
dashmap = "5.1.0"
futures-util = "0.3.17"
hex = "0.4.3"
md-5 = "0.10.5"
mime = "0.3.1"
num_cpus = "1.13"

View file

@ -6,6 +6,7 @@ use crate::{
};
use futures_util::Stream;
use std::{fmt::Debug, path::PathBuf};
use tracing::Instrument;
use uuid::Uuid;
mod old;
@ -59,7 +60,7 @@ pub(crate) trait FullRepo:
+ Clone
+ Debug
{
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
@ -68,13 +69,13 @@ pub(crate) trait FullRepo:
self.identifier(hash).await
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn aliases_from_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
let hash = self.hash(alias).await?;
self.aliases(hash).await
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn still_identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
@ -89,7 +90,7 @@ pub(crate) trait FullRepo:
}
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
let aliases = CachedRepo::update(self, alias).await?;
@ -461,22 +462,35 @@ impl Repo {
}
}
#[tracing::instrument(skip_all)]
pub(crate) async fn from_db(&self, path: PathBuf) -> color_eyre::Result<()> {
if self.has_migrated().await? {
return Ok(());
}
if let Some(old) = self::old::Old::open(path)? {
tracing::warn!("Migrating Database from 0.3 layout to 0.4 layout");
let span = tracing::warn_span!("Migrating Database from 0.3 layout to 0.4 layout");
for hash in old.hashes() {
match self {
Self::Sled(repo) => {
if let Err(e) = migrate_hash(repo, &old, hash).await {
tracing::error!("Failed to migrate hash: {}", e);
match self {
Self::Sled(repo) => {
async {
for hash in old.hashes() {
if let Err(e) = migrate_hash(repo, &old, hash).await {
tracing::error!("Failed to migrate hash: {}", format!("{:?}", e));
}
}
if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) {
tracing::warn!("Setting STORE_MIGRATION_PROGRESS");
let _ = repo.set(STORE_MIGRATION_PROGRESS, value).await;
}
if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) {
tracing::warn!("Setting GENERATOR_KEY");
let _ = repo.set(GENERATOR_KEY, value).await;
}
}
.instrument(span)
.await;
}
}
}
@ -486,25 +500,33 @@ impl Repo {
Ok(())
}
#[tracing::instrument(skip_all)]
pub(crate) async fn migrate_identifiers(&self) -> color_eyre::Result<()> {
if self.has_migrated_identifiers().await? {
return Ok(());
}
tracing::warn!("Migrating File Identifiers from 0.3 format to 0.4 format");
let span = tracing::warn_span!("Migrating File Identifiers from 0.3 format to 0.4 format");
match self {
Self::Sled(repo) => {
use futures_util::StreamExt;
let mut hashes = repo.hashes().await;
async {
use futures_util::StreamExt;
let mut hashes = repo.hashes().await;
while let Some(res) = hashes.next().await {
let hash = res?;
if let Err(e) = migrate_identifiers_for_hash(repo, hash).await {
tracing::error!("Failed to migrate identifiers for hash: {}", e);
while let Some(res) = hashes.next().await {
let hash = res?;
if let Err(e) = migrate_identifiers_for_hash(repo, hash).await {
tracing::error!(
"Failed to migrate identifiers for hash: {}",
format!("{:?}", e)
);
}
}
Ok(()) as color_eyre::Result<()>
}
.instrument(span)
.await?;
}
}
@ -551,7 +573,7 @@ const REPO_MIGRATION_02: &str = "repo-migration-02";
const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
const GENERATOR_KEY: &str = "last-path";
#[tracing::instrument]
#[tracing::instrument(skip(repo, hash), fields(hash = hex::encode(&hash)))]
async fn migrate_identifiers_for_hash<T>(repo: &T, hash: ::sled::IVec) -> color_eyre::Result<()>
where
T: FullRepo,
@ -584,7 +606,7 @@ where
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(repo))]
async fn migrate_identifier_details<T>(
repo: &T,
old: &FileId,
@ -593,6 +615,11 @@ async fn migrate_identifier_details<T>(
where
T: FullRepo,
{
if old == new {
tracing::warn!("Old FileId and new FileId are identical");
return Ok(());
}
if let Some(details) = repo.details(old).await? {
repo.relate_details(new, &details).await?;
IdentifierRepo::cleanup(repo, old).await?;
@ -601,25 +628,26 @@ where
Ok(())
}
#[tracing::instrument(skip(old))]
#[tracing::instrument(skip(repo, old, hash), fields(hash = hex::encode(&hash)))]
async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()>
where
T: IdentifierRepo + HashRepo + AliasRepo + SettingsRepo + Debug,
{
if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() {
tracing::debug!("Duplicate hash detected");
let new_hash: T::Bytes = hash.to_vec().into();
let main_ident = old.main_identifier(&hash)?.to_vec();
if HashRepo::create(repo, new_hash.clone()).await?.is_err() {
tracing::warn!("Duplicate hash detected");
return Ok(());
}
let main_ident = old.main_identifier(&hash)?.to_vec();
repo.relate_identifier(hash.to_vec().into(), &main_ident)
repo.relate_identifier(new_hash.clone(), &main_ident)
.await?;
for alias in old.aliases(&hash) {
if let Ok(Ok(())) = AliasRepo::create(repo, &alias).await {
let _ = repo.relate_alias(hash.to_vec().into(), &alias).await;
let _ = repo.relate_hash(&alias, hash.to_vec().into()).await;
let _ = repo.relate_alias(new_hash.clone(), &alias).await;
let _ = repo.relate_hash(&alias, new_hash.clone()).await;
if let Ok(Some(delete_token)) = old.delete_token(&alias) {
let _ = repo.relate_delete_token(&alias, &delete_token).await;
@ -629,7 +657,7 @@ where
if let Ok(Some(identifier)) = old.motion_identifier(&hash) {
let _ = repo
.relate_motion_identifier(hash.to_vec().into(), &identifier.to_vec())
.relate_motion_identifier(new_hash.clone(), &identifier.to_vec())
.await;
}
@ -637,7 +665,7 @@ where
let variant = variant_path.to_string_lossy().to_string();
let _ = repo
.relate_variant_identifier(hash.to_vec().into(), variant, &identifier.to_vec())
.relate_variant_identifier(new_hash.clone(), variant, &identifier.to_vec())
.await;
}
@ -645,15 +673,6 @@ where
let _ = repo.relate_details(&identifier.to_vec(), &details).await;
}
if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) {
repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into())
.await?;
}
if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) {
repo.set(GENERATOR_KEY, value.to_vec().into()).await?;
}
Ok(())
}

View file

@ -170,7 +170,7 @@ fn insert_cache_inverse(
#[async_trait::async_trait(?Send)]
impl CachedRepo for SledRepo {
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
let now = DateTime::now();
let now_bytes = serde_json::to_vec(&now)?;
@ -186,7 +186,7 @@ impl CachedRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
let now = DateTime::now();
let now_bytes = serde_json::to_vec(&now)?;
@ -475,14 +475,14 @@ impl SettingsRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error> {
let opt = b!(self.settings, settings.get(key));
Ok(opt)
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn remove(&self, key: &'static str) -> Result<(), Error> {
b!(self.settings, settings.remove(key));
@ -505,7 +505,7 @@ fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
#[async_trait::async_trait(?Send)]
impl IdentifierRepo for SledRepo {
#[tracing::instrument]
#[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
@ -522,7 +522,7 @@ impl IdentifierRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error> {
let key = identifier.to_bytes()?;
@ -535,7 +535,7 @@ impl IdentifierRepo for SledRepo {
}
}
#[tracing::instrument]
#[tracing::instrument(skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error> {
let key = identifier.to_bytes()?;
@ -568,7 +568,7 @@ impl HashRepo for SledRepo {
Box::pin(from_iterator(iter, 8))
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error> {
let res = b!(self.hashes, {
let hash2 = hash.clone();
@ -578,7 +578,7 @@ impl HashRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
let key = hash_alias_key(&hash, alias);
let value = alias.to_bytes();
@ -588,7 +588,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
let key = hash_alias_key(&hash, alias);
@ -597,7 +597,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error> {
let v = b!(self.hash_aliases, {
Ok(hash_aliases
@ -611,7 +611,7 @@ impl HashRepo for SledRepo {
Ok(v)
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
@ -624,7 +624,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
@ -633,7 +633,7 @@ impl HashRepo for SledRepo {
.and_then(|ivec| I::from_bytes(ivec.to_vec()))
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
@ -651,7 +651,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
@ -671,7 +671,7 @@ impl HashRepo for SledRepo {
}
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
@ -693,7 +693,7 @@ impl HashRepo for SledRepo {
Ok(vec)
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> {
let key = variant_key(&hash, &variant);
@ -705,7 +705,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
@ -721,7 +721,7 @@ impl HashRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
@ -738,7 +738,7 @@ impl HashRepo for SledRepo {
}
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> {
let hash2 = hash.clone();
b!(self.hashes, hashes.remove(hash2));
@ -785,7 +785,7 @@ impl HashRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
@ -798,7 +798,7 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn relate_delete_token(
&self,
alias: &Alias,
@ -815,7 +815,7 @@ impl AliasRepo for SledRepo {
Ok(res.map_err(|_| AlreadyExists))
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error> {
let key = alias.to_bytes();
@ -826,7 +826,7 @@ impl AliasRepo for SledRepo {
.map_err(Error::from)
}
#[tracing::instrument]
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> {
let key = alias.to_bytes();
@ -835,7 +835,7 @@ impl AliasRepo for SledRepo {
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error> {
let key = alias.to_bytes();
@ -844,7 +844,7 @@ impl AliasRepo for SledRepo {
opt.ok_or(SledError::Missing).map_err(Error::from)
}
#[tracing::instrument]
#[tracing::instrument(skip(self))]
async fn cleanup(&self, alias: &Alias) -> Result<(), Error> {
let key = alias.to_bytes();

View file

@ -7,7 +7,7 @@ use crate::{
};
use std::path::PathBuf;
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct FileId(PathBuf);
impl Identifier for FileId {