From 2961aae6e345150fdc353f52b715af30f8ab12e6 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 16 Jul 2023 22:07:42 -0500 Subject: [PATCH] Port migration changes from 0.4.1 --- src/details.rs | 2 +- src/discover.rs | 2 +- src/lib.rs | 378 +------------------------------------ src/migrate_store.rs | 440 +++++++++++++++++++++++++++++++++++++++++++ src/repo.rs | 42 +++++ src/repo/sled.rs | 41 +++- 6 files changed, 529 insertions(+), 376 deletions(-) create mode 100644 src/migrate_store.rs diff --git a/src/details.rs b/src/details.rs index bd4e945..1a2b15b 100644 --- a/src/details.rs +++ b/src/details.rs @@ -42,7 +42,7 @@ impl Details { Ok(Details::from_parts(format, width, height, frames)) } - pub(crate) async fn from_store( + pub(crate) async fn from_store( store: &S, identifier: &S::Identifier, ) -> Result { diff --git a/src/discover.rs b/src/discover.rs index 5142eae..eb8c557 100644 --- a/src/discover.rs +++ b/src/discover.rs @@ -54,7 +54,7 @@ pub(crate) async fn discover_store_lite( identifier: &S::Identifier, ) -> Result where - S: Store + 'static, + S: Store, { if let Some(discovery) = ffmpeg::discover_stream_lite(store.to_stream(identifier, None, None).await?).await? diff --git a/src/lib.rs b/src/lib.rs index 7825067..86ef82e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ mod ingest; mod init_tracing; mod magick; mod middleware; +mod migrate_store; mod process; mod processor; mod queue; @@ -46,7 +47,7 @@ use std::{ path::Path, path::PathBuf, sync::atomic::{AtomicU64, Ordering}, - time::{Duration, Instant, SystemTime}, + time::{Duration, SystemTime}, }; use tokio::sync::Semaphore; use tracing_actix_web::TracingLogger; @@ -62,6 +63,7 @@ use self::{ ingest::Session, init_tracing::init_tracing, middleware::{Deadline, Internal}, + migrate_store::migrate_store, queue::queue_generate, repo::{ Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, SettingsRepo, @@ -1297,7 +1299,7 @@ async fn launch_object_store< } async fn migrate_inner( - repo: &Repo, + repo: Repo, client: Client, from: S1, to: config::primitives::Store, @@ -1441,7 +1443,7 @@ pub async fn run() -> color_eyre::Result<()> { match from { config::primitives::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; - migrate_inner(&repo, client, from, to, skip_missing_files).await?; + migrate_inner(repo, client, from, to, skip_missing_files).await?; } config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { endpoint, @@ -1475,7 +1477,7 @@ pub async fn run() -> color_eyre::Result<()> { .await? 
.build(client.clone()); - migrate_inner(&repo, client, from, to, skip_missing_files).await?; + migrate_inner(repo, client, from, to, skip_missing_files).await?; } } @@ -1545,371 +1547,3 @@ pub async fn run() -> color_eyre::Result<()> { Ok(()) } - -const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; -const STORE_MIGRATION_MOTION: &str = "store-migration-motion"; -const STORE_MIGRATION_VARIANT: &str = "store-migration-variant"; - -async fn migrate_store( - repo: &R, - from: S1, - to: S2, - skip_missing_files: bool, -) -> Result<(), Error> -where - S1: Store + Clone + 'static, - S2: Store + Clone, - R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo, -{ - tracing::warn!("Running checks"); - - if let Err(e) = from.health_check().await { - tracing::warn!("Old store is not configured correctly"); - return Err(e.into()); - } - if let Err(e) = to.health_check().await { - tracing::warn!("New store is not configured correctly"); - return Err(e.into()); - } - - tracing::warn!("Checks complete, migrating store"); - - let mut failure_count = 0; - - while let Err(e) = do_migrate_store(repo, from.clone(), to.clone(), skip_missing_files).await { - tracing::error!("Migration failed with {}", format!("{e:?}")); - - failure_count += 1; - - if failure_count >= 50 { - tracing::error!("Exceeded 50 errors"); - return Err(e); - } else { - tracing::warn!("Retrying migration +{failure_count}"); - } - - tokio::time::sleep(Duration::from_secs(3)).await; - } - - Ok(()) -} -async fn do_migrate_store( - repo: &R, - from: S1, - to: S2, - skip_missing_files: bool, -) -> Result<(), Error> -where - S1: Store + 'static, - S2: Store, - R: IdentifierRepo + HashRepo + SettingsRepo + QueueRepo, -{ - let mut repo_size = repo.size().await?; - - let mut progress_opt = repo.get(STORE_MIGRATION_PROGRESS).await?; - - if progress_opt.is_some() { - tracing::warn!("Continuing previous migration of {repo_size} total hashes"); - } else { - tracing::warn!("{repo_size} hashes will be migrated"); - } - - if repo_size == 0 { - return Ok(()); - } - - let mut pct = repo_size / 100; - - // Hashes are read in a consistent order - let stream = repo.hashes().await; - let mut stream = Box::pin(stream); - - let now = Instant::now(); - let mut index = 0; - while let Some(hash) = stream.next().await { - index += 1; - - let hash = hash?; - - if let Some(progress) = &progress_opt { - // we've reached the most recently migrated hash. - if progress.as_ref() == hash.as_ref() { - progress_opt.take(); - - // update repo size to remaining size - repo_size = repo_size.saturating_sub(index); - // update pct to be proportional to remainging size - pct = repo_size / 100; - - // reset index to 0 for proper percent scaling - index = 0; - - tracing::warn!( - "Caught up to previous migration's end. {repo_size} hashes will be migrated" - ); - } - continue; - } - - let original_identifier = match repo.identifier(hash.as_ref().to_vec().into()).await { - Ok(Some(identifier)) => identifier, - Ok(None) => { - tracing::warn!( - "Original File identifier for hash {} is missing, queue cleanup task", - hex::encode(&hash) - ); - crate::queue::cleanup_hash(repo, hash).await?; - continue; - } - Err(e) => return Err(e.into()), - }; - - if let Some(identifier) = repo - .motion_identifier(hash.as_ref().to_vec().into()) - .await? 
- { - if repo.get(STORE_MIGRATION_MOTION).await?.is_none() { - match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await { - Ok(new_identifier) => { - migrate_details(repo, identifier, &new_identifier).await?; - repo.relate_motion_identifier( - hash.as_ref().to_vec().into(), - &new_identifier, - ) - .await?; - repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into()) - .await?; - } - Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { - tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); - } - Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error generating details for motion file for hash {}", - hex::encode(&hash) - ); - return Err(e); - } - Err(MigrateError::From(e)) => { - tracing::warn!("Error migrating motion file from old store"); - return Err(e.into()); - } - Err(MigrateError::To(e)) => { - tracing::warn!("Error migrating motion file to new store"); - return Err(e.into()); - } - } - } - } - - let mut variant_progress_opt = repo.get(STORE_MIGRATION_VARIANT).await?; - - for (variant, identifier) in repo.variants(hash.as_ref().to_vec().into()).await? { - if let Some(variant_progress) = &variant_progress_opt { - if variant.as_bytes() == variant_progress.as_ref() { - variant_progress_opt.take(); - } - continue; - } - - match migrate_file(repo, &from, &to, &identifier, skip_missing_files).await { - Ok(new_identifier) => { - migrate_details(repo, identifier, &new_identifier).await?; - repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone()) - .await?; - repo.relate_variant_identifier( - hash.as_ref().to_vec().into(), - variant, - &new_identifier, - ) - .await?; - - repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into()) - .await?; - } - Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { - tracing::warn!( - "Skipping variant {} for hash {}", - variant, - hex::encode(&hash) - ); - } - Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error generating details for variant file for hash {}", - hex::encode(&hash) - ); - return Err(e); - } - Err(MigrateError::From(e)) => { - tracing::warn!("Error migrating variant file from old store"); - return Err(e.into()); - } - Err(MigrateError::To(e)) => { - tracing::warn!("Error migrating variant file to new store"); - return Err(e.into()); - } - } - } - - match migrate_file(repo, &from, &to, &original_identifier, skip_missing_files).await { - Ok(new_identifier) => { - migrate_details(repo, original_identifier, &new_identifier).await?; - repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier) - .await?; - } - Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { - tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); - } - Err(MigrateError::Details(e)) => { - tracing::warn!( - "Error generating details for original file for hash {}", - hex::encode(&hash) - ); - return Err(e); - } - Err(MigrateError::From(e)) => { - tracing::warn!("Error migrating original file from old store"); - return Err(e.into()); - } - Err(MigrateError::To(e)) => { - tracing::warn!("Error migrating original file to new store"); - return Err(e.into()); - } - } - - repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into()) - .await?; - repo.remove(STORE_MIGRATION_VARIANT).await?; - repo.remove(STORE_MIGRATION_MOTION).await?; - - if pct > 0 && index % pct == 0 { - let percent = u32::try_from(index / pct).expect("values 0-100 are always in u32 range"); - if percent == 0 { - continue; - } - - let elapsed = 
now.elapsed(); - let estimated_duration_percent = elapsed / percent; - let estimated_duration_remaining = - (100u32.saturating_sub(percent)) * estimated_duration_percent; - - tracing::warn!("Migrated {percent}% of hashes ({index}/{repo_size} total hashes)"); - tracing::warn!("ETA: {estimated_duration_remaining:?} from now"); - } - } - - // clean up the migration key to avoid interfering with future migrations - repo.remove(STORE_MIGRATION_PROGRESS).await?; - - tracing::warn!("Migration completed successfully"); - - Ok(()) -} - -async fn migrate_file( - repo: &R, - from: &S1, - to: &S2, - identifier: &S1::Identifier, - skip_missing_files: bool, -) -> Result -where - R: IdentifierRepo, - S1: Store + 'static, - S2: Store, -{ - let mut failure_count = 0; - - loop { - match do_migrate_file(repo, from, to, identifier).await { - Ok(identifier) => return Ok(identifier), - Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { - return Err(MigrateError::From(e)); - } - Err(migrate_error) => { - failure_count += 1; - - if failure_count > 10 { - tracing::error!("Error migrating file, not retrying"); - return Err(migrate_error); - } else { - tracing::warn!("Failed moving file. Retrying +{failure_count}"); - } - - tokio::time::sleep(Duration::from_secs(3)).await; - } - } - } -} - -#[derive(Debug)] -enum MigrateError { - From(crate::store::StoreError), - Details(crate::error::Error), - To(crate::store::StoreError), -} - -async fn do_migrate_file( - repo: &R, - from: &S1, - to: &S2, - identifier: &S1::Identifier, -) -> Result -where - R: IdentifierRepo, - S1: Store + 'static, - S2: Store, -{ - let stream = from - .to_stream(identifier, None, None) - .await - .map_err(MigrateError::From)?; - - let details_opt = repo - .details(identifier) - .await - .map_err(Error::from) - .map_err(MigrateError::Details)? - .and_then(|details| { - if details.internal_format().is_some() { - Some(details) - } else { - None - } - }); - - let details = if let Some(details) = details_opt { - details - } else { - let new_details = Details::from_store(from, identifier) - .await - .map_err(MigrateError::Details)?; - repo.relate_details(identifier, &new_details) - .await - .map_err(Error::from) - .map_err(MigrateError::Details)?; - new_details - }; - - let new_identifier = to - .save_stream(stream, details.media_type()) - .await - .map_err(MigrateError::To)?; - - Ok(new_identifier) -} - -async fn migrate_details(repo: &R, from: I1, to: &I2) -> Result<(), Error> -where - R: IdentifierRepo, - I1: Identifier, - I2: Identifier, -{ - if let Some(details) = repo.details(&from).await? 
{ - repo.relate_details(to, &details).await?; - repo.cleanup(&from).await?; - } - - Ok(()) -} diff --git a/src/migrate_store.rs b/src/migrate_store.rs new file mode 100644 index 0000000..df9e8fe --- /dev/null +++ b/src/migrate_store.rs @@ -0,0 +1,440 @@ +use futures_util::StreamExt; +use std::{ + rc::Rc, + sync::atomic::{AtomicU64, Ordering}, + time::{Duration, Instant}, +}; + +use crate::{ + details::Details, + error::{Error, UploadError}, + repo::{HashRepo, IdentifierRepo, MigrationRepo, QueueRepo}, + store::{Identifier, Store}, +}; + +pub(super) async fn migrate_store( + repo: R, + from: S1, + to: S2, + skip_missing_files: bool, +) -> Result<(), Error> +where + S1: Store + Clone + 'static, + S2: Store + Clone + 'static, + R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, +{ + tracing::warn!("Running checks"); + + if let Err(e) = from.health_check().await { + tracing::warn!("Old store is not configured correctly"); + return Err(e.into()); + } + if let Err(e) = to.health_check().await { + tracing::warn!("New store is not configured correctly"); + return Err(e.into()); + } + + tracing::warn!("Checks complete, migrating store"); + + let mut failure_count = 0; + + while let Err(e) = + do_migrate_store(repo.clone(), from.clone(), to.clone(), skip_missing_files).await + { + tracing::error!("Migration failed with {}", format!("{e:?}")); + + failure_count += 1; + + if failure_count >= 50 { + tracing::error!("Exceeded 50 errors"); + return Err(e); + } else { + tracing::warn!("Retrying migration +{failure_count}"); + } + + tokio::time::sleep(Duration::from_secs(3)).await; + } + + Ok(()) +} + +struct MigrateState { + repo: R, + from: S1, + to: S2, + continuing_migration: bool, + skip_missing_files: bool, + initial_repo_size: u64, + repo_size: AtomicU64, + pct: AtomicU64, + index: AtomicU64, + started_at: Instant, +} + +async fn do_migrate_store( + repo: R, + from: S1, + to: S2, + skip_missing_files: bool, +) -> Result<(), Error> +where + S1: Store + 'static, + S2: Store + 'static, + R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo + Clone + 'static, +{ + let continuing_migration = repo.is_continuing_migration().await?; + let initial_repo_size = repo.size().await?; + + if continuing_migration { + tracing::warn!("Continuing previous migration of {initial_repo_size} total hashes"); + } else { + tracing::warn!("{initial_repo_size} hashes will be migrated"); + } + + if initial_repo_size == 0 { + return Ok(()); + } + + // Hashes are read in a consistent order + let stream = repo.hashes().await; + let mut stream = Box::pin(stream); + + let state = Rc::new(MigrateState { + repo: repo.clone(), + from, + to, + continuing_migration, + skip_missing_files, + initial_repo_size, + repo_size: AtomicU64::new(initial_repo_size), + pct: AtomicU64::new(initial_repo_size / 100), + index: AtomicU64::new(0), + started_at: Instant::now(), + }); + + let mut joinset = tokio::task::JoinSet::new(); + + while let Some(hash) = stream.next().await { + let hash = hash?.as_ref().to_vec(); + + if joinset.len() >= 32 { + if let Some(res) = joinset.join_next().await { + res.map_err(|_| UploadError::Canceled)??; + } + } + + let state = Rc::clone(&state); + joinset.spawn_local(async move { migrate_hash(&state, hash).await }); + } + + while let Some(res) = joinset.join_next().await { + res.map_err(|_| UploadError::Canceled)??; + } + + // clean up the migration table to avoid interfering with future migrations + repo.clear().await?; + + tracing::warn!("Migration completed successfully"); + + Ok(()) +} + 
+#[tracing::instrument(skip(state, hash), fields(hash = %hex::encode(&hash)))] +async fn migrate_hash( + state: &MigrateState, + hash: Vec, +) -> Result<(), Error> +where + S1: Store, + S2: Store, + R: IdentifierRepo + HashRepo + QueueRepo + MigrationRepo, +{ + let MigrateState { + repo, + from, + to, + continuing_migration, + skip_missing_files, + initial_repo_size, + repo_size, + pct, + index, + started_at, + } = state; + + let current_index = index.fetch_add(1, Ordering::Relaxed); + + let original_identifier = match repo.identifier(hash.clone().into()).await { + Ok(Some(identifier)) => identifier, + Ok(None) => { + tracing::warn!( + "Original File identifier for hash {} is missing, queue cleanup task", + hex::encode(&hash) + ); + crate::queue::cleanup_hash(repo, hash.clone().into()).await?; + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + + if repo.is_migrated(&original_identifier).await? { + // migrated original for hash - this means we can skip + return Ok(()); + } + + let current_repo_size = repo_size.load(Ordering::Acquire); + + if *continuing_migration && current_repo_size == *initial_repo_size { + // first time reaching unmigrated hash + + let new_repo_size = initial_repo_size.saturating_sub(current_index); + + if repo_size + .compare_exchange( + current_repo_size, + new_repo_size, + Ordering::AcqRel, + Ordering::Relaxed, + ) + .is_ok() + { + // we successfully updated the count, we're now in charge of setting up pct and + // index and printing migration message + + pct.store(new_repo_size / 100, Ordering::Release); + index.store(0, Ordering::Release); + + tracing::warn!( + "Caught up to previous migration's end. {new_repo_size} hashes will be migrated" + ); + } + } + + if let Some(identifier) = repo.motion_identifier(hash.clone().into()).await? { + if !repo.is_migrated(&identifier).await? { + match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, &identifier, &new_identifier).await?; + repo.relate_motion_identifier(hash.clone().into(), &new_identifier) + .await?; + + repo.mark_migrated(&identifier, &new_identifier).await?; + } + Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { + tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash)); + } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error generating details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e); + } + Err(MigrateError::From(e)) => { + tracing::warn!("Error migrating motion file from old store"); + return Err(e.into()); + } + Err(MigrateError::To(e)) => { + tracing::warn!("Error migrating motion file to new store"); + return Err(e.into()); + } + } + } + } + + for (variant, identifier) in repo.variants(hash.clone().into()).await? { + if !repo.is_migrated(&identifier).await? 
{ + match migrate_file(repo, from, to, &identifier, *skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, &identifier, &new_identifier).await?; + repo.remove_variant(hash.clone().into(), variant.clone()) + .await?; + repo.relate_variant_identifier(hash.clone().into(), variant, &new_identifier) + .await?; + + repo.mark_migrated(&identifier, &new_identifier).await?; + } + Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { + tracing::warn!( + "Skipping variant {} for hash {}", + variant, + hex::encode(&hash) + ); + } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error generating details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e); + } + Err(MigrateError::From(e)) => { + tracing::warn!("Error migrating variant file from old store"); + return Err(e.into()); + } + Err(MigrateError::To(e)) => { + tracing::warn!("Error migrating variant file to new store"); + return Err(e.into()); + } + } + } + } + + match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await { + Ok(new_identifier) => { + migrate_details(repo, &original_identifier, &new_identifier).await?; + repo.relate_identifier(hash.clone().into(), &new_identifier) + .await?; + repo.mark_migrated(&original_identifier, &new_identifier) + .await?; + } + Err(MigrateError::From(e)) if e.is_not_found() && *skip_missing_files => { + tracing::warn!("Skipping original file for hash {}", hex::encode(&hash)); + } + Err(MigrateError::Details(e)) => { + tracing::warn!( + "Error generating details for motion file for hash {}", + hex::encode(&hash) + ); + return Err(e); + } + Err(MigrateError::From(e)) => { + tracing::warn!("Error migrating original file from old store"); + return Err(e.into()); + } + Err(MigrateError::To(e)) => { + tracing::warn!("Error migrating original file to new store"); + return Err(e.into()); + } + } + + let current_pct = pct.load(Ordering::Relaxed); + if current_pct > 0 && current_index % current_pct == 0 { + let percent = u32::try_from(current_index / current_pct) + .expect("values 0-100 are always in u32 range"); + if percent == 0 { + return Ok(()); + } + + let elapsed = started_at.elapsed(); + let estimated_duration_percent = elapsed / percent; + let estimated_duration_remaining = + (100u32.saturating_sub(percent)) * estimated_duration_percent; + + let current_repo_size = repo_size.load(Ordering::Relaxed); + + tracing::warn!( + "Migrated {percent}% of hashes ({current_index}/{current_repo_size} total hashes)" + ); + tracing::warn!("ETA: {estimated_duration_remaining:?} from now"); + } + + Ok(()) +} + +async fn migrate_file( + repo: &R, + from: &S1, + to: &S2, + identifier: &S1::Identifier, + skip_missing_files: bool, +) -> Result +where + R: IdentifierRepo, + S1: Store, + S2: Store, +{ + let mut failure_count = 0; + + loop { + match do_migrate_file(repo, from, to, identifier).await { + Ok(identifier) => return Ok(identifier), + Err(MigrateError::From(e)) if e.is_not_found() && skip_missing_files => { + return Err(MigrateError::From(e)); + } + Err(migrate_error) => { + failure_count += 1; + + if failure_count > 10 { + tracing::error!("Error migrating file, not retrying"); + return Err(migrate_error); + } else { + tracing::warn!("Failed moving file. 
Retrying +{failure_count}"); + } + + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + } +} + +#[derive(Debug)] +enum MigrateError { + From(crate::store::StoreError), + Details(crate::error::Error), + To(crate::store::StoreError), +} + +async fn do_migrate_file( + repo: &R, + from: &S1, + to: &S2, + identifier: &S1::Identifier, +) -> Result +where + R: IdentifierRepo, + S1: Store, + S2: Store, +{ + let stream = from + .to_stream(identifier, None, None) + .await + .map_err(MigrateError::From)?; + + let details_opt = repo + .details(identifier) + .await + .map_err(Error::from) + .map_err(MigrateError::Details)? + .and_then(|details| { + if details.internal_format().is_some() { + Some(details) + } else { + None + } + }); + + let details = if let Some(details) = details_opt { + details + } else { + let new_details = Details::from_store(from, identifier) + .await + .map_err(MigrateError::Details)?; + repo.relate_details(identifier, &new_details) + .await + .map_err(Error::from) + .map_err(MigrateError::Details)?; + new_details + }; + + let new_identifier = to + .save_stream(stream, details.media_type()) + .await + .map_err(MigrateError::To)?; + + Ok(new_identifier) +} + +async fn migrate_details(repo: &R, from: &I1, to: &I2) -> Result<(), Error> +where + R: IdentifierRepo, + I1: Identifier, + I2: Identifier, +{ + if let Some(details) = repo.details(from).await? { + repo.relate_details(to, &details).await?; + repo.cleanup(from).await?; + } + + Ok(()) +} diff --git a/src/repo.rs b/src/repo.rs index 3af7521..429f925 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -72,6 +72,7 @@ pub(crate) trait FullRepo: + AliasRepo + QueueRepo + HashRepo + + MigrationRepo + Send + Sync + Clone @@ -261,6 +262,47 @@ where } } +#[async_trait::async_trait(?Send)] +pub(crate) trait MigrationRepo: BaseRepo { + async fn is_continuing_migration(&self) -> Result; + + async fn mark_migrated( + &self, + old_identifier: &I1, + new_identifier: &I2, + ) -> Result<(), StoreError>; + + async fn is_migrated(&self, identifier: &I) -> Result; + + async fn clear(&self) -> Result<(), RepoError>; +} + +#[async_trait::async_trait(?Send)] +impl MigrationRepo for actix_web::web::Data +where + T: MigrationRepo, +{ + async fn is_continuing_migration(&self) -> Result { + T::is_continuing_migration(self).await + } + + async fn mark_migrated( + &self, + old_identifier: &I1, + new_identifier: &I2, + ) -> Result<(), StoreError> { + T::mark_migrated(self, old_identifier, new_identifier).await + } + + async fn is_migrated(&self, identifier: &I) -> Result { + T::is_migrated(self, identifier).await + } + + async fn clear(&self) -> Result<(), RepoError> { + T::clear(self).await + } +} + #[async_trait::async_trait(?Send)] pub(crate) trait HashRepo: BaseRepo { type Stream: Stream>; diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 4e9f7a3..e752582 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -2,8 +2,8 @@ use crate::{ details::MaybeHumanDate, repo::{ Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, - FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, - UploadId, UploadRepo, UploadResult, + FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, + QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult, }, serde_str::Serde, store::StoreError, @@ -69,6 +69,7 @@ pub(crate) struct SledRepo { in_progress_queue: Tree, queue_notifier: Arc>>>, uploads: Tree, + migration_identifiers: Tree, cache_capacity: u64, export_path: PathBuf, 
     db: Db,
@@ -100,6 +101,7 @@ impl SledRepo {
             in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
             queue_notifier: Arc::new(RwLock::new(HashMap::new())),
             uploads: db.open_tree("pict-rs-uploads-tree")?,
+            migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
             cache_capacity,
             export_path,
             db,
@@ -461,6 +463,41 @@ impl IdentifierRepo for SledRepo {
     }
 }
 
+#[async_trait::async_trait(?Send)]
+impl MigrationRepo for SledRepo {
+    async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
+        Ok(!self.migration_identifiers.is_empty())
+    }
+
+    async fn mark_migrated<I1: Identifier, I2: Identifier>(
+        &self,
+        old_identifier: &I1,
+        new_identifier: &I2,
+    ) -> Result<(), StoreError> {
+        let key = new_identifier.to_bytes()?;
+        let value = old_identifier.to_bytes()?;
+
+        b!(
+            self.migration_identifiers,
+            migration_identifiers.insert(key, value)
+        );
+
+        Ok(())
+    }
+
+    async fn is_migrated<I: Identifier>(&self, identifier: &I) -> Result<bool, StoreError> {
+        let key = identifier.to_bytes()?;
+
+        Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
+    }
+
+    async fn clear(&self) -> Result<(), RepoError> {
+        b!(self.migration_identifiers, migration_identifiers.clear());
+
+        Ok(())
+    }
+}
+
 type StreamItem = Result<IVec, RepoError>;
 type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
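
Note: the reworked do_migrate_store in this patch drives per-hash migration through a tokio::task::JoinSet, capping in-flight work at 32 tasks and spawning them with spawn_local so the !Send repo handles stay on one thread. A minimal sketch of that bounded-concurrency pattern follows; the drive_all and migrate_one names, the String error type, and the hash values are placeholders for illustration, not pict-rs code.

use tokio::task::{JoinSet, LocalSet};

// Hypothetical per-hash workload standing in for migrate_hash.
async fn migrate_one(_hash: Vec<u8>) -> Result<(), String> {
    Ok(())
}

// Keep at most 32 migrations in flight: drain one finished task from the
// JoinSet before spawning the next, then drain the remainder at the end.
async fn drive_all(hashes: Vec<Vec<u8>>) -> Result<(), String> {
    let mut joinset = JoinSet::new();

    for hash in hashes {
        if joinset.len() >= 32 {
            if let Some(res) = joinset.join_next().await {
                res.map_err(|e| e.to_string())??;
            }
        }

        // spawn_local keeps each task on the current thread's LocalSet,
        // mirroring how the patch spawns migrate_hash futures.
        joinset.spawn_local(async move { migrate_one(hash).await });
    }

    while let Some(res) = joinset.join_next().await {
        res.map_err(|e| e.to_string())??;
    }

    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), String> {
    let hashes: Vec<Vec<u8>> = (0u8..=255).map(|i| vec![i]).collect();
    LocalSet::new().run_until(drive_all(hashes)).await
}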