Simplify some repo functions, remove 0.3 migration

This commit is contained in:
asonix 2023-07-25 20:08:18 -05:00
parent ccff0506e5
commit bd3975f455
11 changed files with 220 additions and 843 deletions

View file

@ -3,7 +3,7 @@ use crate::{
either::Either,
error::{Error, UploadError},
formats::{InternalFormat, Validations},
repo::{Alias, AliasRepo, AlreadyExists, DeleteToken, FullRepo, HashRepo},
repo::{Alias, AliasRepo, DeleteToken, FullRepo, HashRepo},
store::Store,
};
use actix_web::web::Bytes;
@ -21,6 +21,7 @@ where
S: Store,
{
repo: R,
delete_token: DeleteToken,
hash: Option<Vec<u8>>,
alias: Option<Alias>,
identifier: Option<S::Identifier>,
@ -105,6 +106,7 @@ where
let mut session = Session {
repo: repo.clone(),
delete_token: DeleteToken::generate(),
hash: None,
alias: None,
identifier: Some(identifier.clone()),
@ -117,8 +119,8 @@ where
if let Some(alias) = declared_alias {
session.add_existing_alias(&hash, alias).await?
} else {
session.create_alias(&hash, input_type).await?;
}
session.create_alias(&hash, input_type).await?
};
Ok(session)
}
@ -135,7 +137,10 @@ where
S: Store,
R: FullRepo,
{
if HashRepo::create(repo, hash.to_vec().into()).await?.is_err() {
if HashRepo::create(repo, hash.to_vec().into(), identifier)
.await?
.is_err()
{
// duplicate upload
store.remove(identifier).await?;
session.identifier.take();
@ -145,9 +150,6 @@ where
// Set hash after upload uniqueness check so we don't clean existing files on failure
session.hash = Some(Vec::from(hash));
repo.relate_identifier(hash.to_vec().into(), identifier)
.await?;
Ok(())
}
@ -156,60 +158,48 @@ where
R: FullRepo + 'static,
S: Store,
{
pub(crate) fn disarm(&mut self) {
pub(crate) fn disarm(mut self) -> DeleteToken {
let _ = self.hash.take();
let _ = self.alias.take();
let _ = self.identifier.take();
self.delete_token.clone()
}
pub(crate) fn alias(&self) -> Option<&Alias> {
self.alias.as_ref()
}
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) async fn delete_token(&self) -> Result<DeleteToken, Error> {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
tracing::trace!("Generating delete token");
let delete_token = DeleteToken::generate();
tracing::trace!("Saving delete token");
let res = self.repo.relate_delete_token(&alias, &delete_token).await?;
if let Err(AlreadyExists(delete_token)) = res {
tracing::trace!("Returning existing delete token, {:?}", delete_token);
return Ok(delete_token);
}
tracing::trace!("Returning new delete token, {:?}", delete_token);
Ok(delete_token)
pub(crate) fn delete_token(&self) -> &DeleteToken {
&self.delete_token
}
#[tracing::instrument(skip(self, hash))]
async fn add_existing_alias(&mut self, hash: &[u8], alias: Alias) -> Result<(), Error> {
AliasRepo::create(&self.repo, &alias)
let hash: R::Bytes = hash.to_vec().into();
AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone())
.await?
.map_err(|_| UploadError::DuplicateAlias)?;
self.alias = Some(alias.clone());
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, hash))]
async fn create_alias(&mut self, hash: &[u8], input_type: InternalFormat) -> Result<(), Error> {
let hash: R::Bytes = hash.to_vec().into();
loop {
let alias = Alias::generate(input_type.file_extension().to_string());
if AliasRepo::create(&self.repo, &alias).await?.is_ok() {
if AliasRepo::create(&self.repo, &alias, &self.delete_token, hash.clone())
.await?
.is_ok()
{
self.alias = Some(alias.clone());
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
return Ok(());
}
@ -249,20 +239,14 @@ where
if let Some(alias) = self.alias.take() {
let repo = self.repo.clone();
let token = self.delete_token.clone();
let cleanup_span = tracing::info_span!(parent: &cleanup_parent_span, "Session cleanup alias", alias = ?alias);
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(
async move {
if let Ok(Some(token)) = repo.delete_token(&alias).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
} else {
let token = DeleteToken::generate();
if let Ok(Ok(())) = repo.relate_delete_token(&alias, &token).await {
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
}
}
let _ = crate::queue::cleanup_alias(&repo, alias, token).await;
}
.instrument(cleanup_span),
)

View file

@ -303,7 +303,7 @@ async fn handle_upload<R: FullRepo, S: Store + 'static>(
for image in &images {
if let Some(alias) = image.result.alias() {
tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
let delete_token = image.result.delete_token().await?;
let delete_token = image.result.delete_token();
let details = ensure_details(&repo, &store, &config, alias).await?;
@ -315,7 +315,7 @@ async fn handle_upload<R: FullRepo, S: Store + 'static>(
}
}
for mut image in images {
for image in images {
image.result.disarm();
}
@ -489,14 +489,13 @@ async fn ingest_inline<R: FullRepo, S: Store + 'static>(
store: &S,
config: &Configuration,
) -> Result<(Alias, DeleteToken, Details), Error> {
let mut session = ingest::ingest(repo, store, stream, None, &config.media).await?;
let session = ingest::ingest(repo, store, stream, None, &config.media).await?;
let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?;
let details = ensure_details(repo, store, config, &alias).await?;
session.disarm();
let delete_token = session.disarm();
Ok((alias, delete_token, details))
}
@ -1886,7 +1885,6 @@ impl PictRsConfiguration {
let PictRsConfiguration { config, operation } = self;
let repo = Repo::open(config.repo.clone())?;
repo.migrate_from_db(config.old_db.path.clone()).await?;
let client = build_client(&config)?;
match operation {
@ -1949,8 +1947,6 @@ impl PictRsConfiguration {
match config.store.clone() {
config::Store::Filesystem(config::Filesystem { path }) => {
repo.migrate_identifiers().await?;
let store = FileStore::build(path, repo.clone()).await?;
match repo {
Repo::Sled(sled_repo) => {

View file

@ -283,7 +283,7 @@ where
match migrate_file(repo, from, to, &original_identifier, *skip_missing_files).await {
Ok(new_identifier) => {
migrate_details(repo, &original_identifier, &new_identifier).await?;
repo.relate_identifier(hash.clone().into(), &new_identifier)
repo.update_identifier(hash.clone().into(), &new_identifier)
.await?;
repo.mark_migrated(&original_identifier, &new_identifier)
.await?;

View file

@ -96,7 +96,7 @@ where
{
let hash: R::Bytes = hash.into();
let aliases = repo.aliases(hash.clone()).await?;
let aliases = repo.for_hash(hash.clone()).await?;
if !aliases.is_empty() {
for alias in aliases {
@ -151,9 +151,7 @@ where
return Ok(());
};
repo.remove_alias(hash.clone(), &alias).await?;
if repo.aliases(hash.clone()).await?.is_empty() {
if repo.for_hash(hash.clone()).await?.is_empty() {
super::cleanup_hash(repo, hash).await?;
}

View file

@ -5,7 +5,7 @@ use crate::{
formats::InputProcessableFormat,
ingest::Session,
queue::{Base64Bytes, LocalBoxFuture, Process},
repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
repo::{Alias, FullRepo, UploadId, UploadResult},
serde_str::Serde,
store::{Identifier, Store},
};
@ -99,9 +99,7 @@ where
let session =
crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?;
let token = session.delete_token().await?;
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
Ok(session) as Result<Session<R, S>, Error>
})
.await;
@ -111,10 +109,10 @@ where
};
let result = match fut.await {
Ok((mut session, token)) => {
Ok(session) => {
let alias = session.alias().take().expect("Alias should exist").clone();
let token = session.disarm();
let result = UploadResult::Success { alias, token };
session.disarm();
result
}
Err(e) => {

View file

@ -1,16 +1,14 @@
use crate::{
config,
details::Details,
store::{file_store::FileId, Identifier, StoreError},
store::{Identifier, StoreError},
};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_util::Stream;
use std::{fmt::Debug, path::PathBuf};
use tracing::Instrument;
use std::fmt::Debug;
use url::Url;
use uuid::Uuid;
mod old;
pub(crate) mod sled;
#[derive(Clone, Debug)]
@ -30,7 +28,7 @@ pub(crate) struct Alias {
extension: Option<String>,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct DeleteToken {
id: MaybeUuid,
}
@ -38,8 +36,6 @@ pub(crate) struct DeleteToken {
pub(crate) struct HashAlreadyExists;
pub(crate) struct AliasAlreadyExists;
pub(crate) struct AlreadyExists(pub(super) DeleteToken);
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct UploadId {
id: Uuid,
@ -60,9 +56,6 @@ pub(crate) enum RepoError {
#[error("Panic in blocking operation")]
Canceled,
#[error("Required field is missing {0}")]
Missing(&'static str),
}
#[async_trait::async_trait(?Send)]
@ -102,7 +95,7 @@ pub(crate) trait FullRepo:
return Ok(vec![]);
};
self.aliases(hash).await
self.for_hash(hash).await
}
#[tracing::instrument(skip(self))]
@ -427,17 +420,18 @@ pub(crate) trait HashRepo: BaseRepo {
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), HashAlreadyExists>, RepoError>;
async fn create<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<Result<(), HashAlreadyExists>, StoreError>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError>;
async fn relate_identifier<I: Identifier>(
async fn update_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), StoreError>;
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
@ -488,28 +482,20 @@ where
T::hashes(self).await
}
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), HashAlreadyExists>, RepoError> {
T::create(self, hash).await
async fn create<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<Result<(), HashAlreadyExists>, StoreError> {
T::create(self, hash, identifier).await
}
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
T::relate_alias(self, hash, alias).await
}
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
T::remove_alias(self, hash, alias).await
}
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
T::aliases(self, hash).await
}
async fn relate_identifier<I: Identifier>(
async fn update_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), StoreError> {
T::relate_identifier(self, hash, identifier).await
T::update_identifier(self, hash, identifier).await
}
async fn identifier<I: Identifier + 'static>(
@ -569,18 +555,19 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo: BaseRepo {
async fn create(&self, alias: &Alias) -> Result<Result<(), AliasAlreadyExists>, RepoError>;
async fn relate_delete_token(
async fn create(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, RepoError>;
hash: Self::Bytes,
) -> Result<Result<(), AliasAlreadyExists>, RepoError>;
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError>;
async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError>;
async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError>;
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>;
}
@ -589,30 +576,27 @@ impl<T> AliasRepo for actix_web::web::Data<T>
where
T: AliasRepo,
{
async fn create(&self, alias: &Alias) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
T::create(self, alias).await
}
async fn relate_delete_token(
async fn create(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, RepoError> {
T::relate_delete_token(self, alias, delete_token).await
hash: Self::Bytes,
) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
T::create(self, alias, delete_token, hash).await
}
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
T::delete_token(self, alias).await
}
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> {
T::relate_hash(self, alias, hash).await
}
async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError> {
T::hash(self, alias).await
}
async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
T::for_hash(self, hash).await
}
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> {
T::cleanup(self, alias).await
}
@ -633,223 +617,6 @@ impl Repo {
}
}
}
/// One-time migration of an old 0.3-layout database (at `path`) into this
/// 0.4-layout repo.
///
/// Skips entirely when the `has_migrated` marker is already set. Individual
/// hash failures are logged and skipped rather than aborting the migration;
/// the marker is written afterwards either way, so the work never re-runs.
pub(crate) async fn migrate_from_db(&self, path: PathBuf) -> color_eyre::Result<()> {
if self.has_migrated().await? {
return Ok(());
}
// `Old::open` returns None when no 0.3-era database exists at `path`.
if let Some(old) = self::old::Old::open(path)? {
let span = tracing::warn_span!("Migrating Database from 0.3 layout to 0.4 layout");
match self {
Self::Sled(repo) => {
async {
for hash in old.hashes() {
// Best-effort per hash: log and continue on failure.
if let Err(e) = migrate_hash(repo, &old, hash).await {
tracing::error!("Failed to migrate hash: {}", format!("{e:?}"));
}
}
// Carry over the two settings the new repo still consults.
if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) {
tracing::warn!("Setting STORE_MIGRATION_PROGRESS");
let _ = repo.set(STORE_MIGRATION_PROGRESS, value).await;
}
if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) {
tracing::warn!("Setting GENERATOR_KEY");
let _ = repo.set(GENERATOR_KEY, value).await;
}
}
.instrument(span)
.await;
}
}
}
// Mark done even when no old database was found, so we never probe again.
self.mark_migrated().await?;
Ok(())
}
/// One-time rewrite of stored identifiers from the 0.3 format to the 0.4
/// format, guarded by the `has_migrated_identifiers` marker.
///
/// Per-hash failures are logged and skipped; a failure reading the hash
/// stream itself aborts the migration with an error.
pub(crate) async fn migrate_identifiers(&self) -> color_eyre::Result<()> {
if self.has_migrated_identifiers().await? {
return Ok(());
}
let span = tracing::warn_span!("Migrating File Identifiers from 0.3 format to 0.4 format");
match self {
Self::Sled(repo) => {
async {
use futures_util::StreamExt;
let mut hashes = repo.hashes().await;
while let Some(res) = hashes.next().await {
let hash = res?;
// Best-effort per hash: log and continue on failure.
if let Err(e) = migrate_identifiers_for_hash(repo, hash).await {
tracing::error!(
"Failed to migrate identifiers for hash: {}",
format!("{e:?}")
);
}
}
Ok(()) as color_eyre::Result<()>
}
.instrument(span)
.await?;
}
}
self.mark_migrated_identifiers().await?;
Ok(())
}
/// Returns true once the 0.3 -> 0.4 repo migration marker has been written.
async fn has_migrated(&self) -> color_eyre::Result<bool> {
match self {
// NOTE(review): `REPO_MIGRATION_O1` spells "O1" with the letter O while
// its sibling uses the digit zero ("02") — confirm this is intentional.
Self::Sled(repo) => Ok(repo.get(REPO_MIGRATION_O1).await?.is_some()),
}
}
/// Returns true once the identifier-format migration marker has been written.
async fn has_migrated_identifiers(&self) -> color_eyre::Result<bool> {
match self {
Self::Sled(repo) => Ok(repo.get(REPO_MIGRATION_02).await?.is_some()),
}
}
/// Persists the marker consulted by `has_migrated`, making the 0.3 -> 0.4
/// database migration a one-time operation.
async fn mark_migrated(&self) -> color_eyre::Result<()> {
match self {
Self::Sled(repo) => {
repo.set(REPO_MIGRATION_O1, b"1".to_vec().into()).await?;
}
}
Ok(())
}
/// Persists the marker consulted by `has_migrated_identifiers`, making the
/// identifier-format migration a one-time operation.
async fn mark_migrated_identifiers(&self) -> color_eyre::Result<()> {
match self {
Self::Sled(repo) => {
repo.set(REPO_MIGRATION_02, b"1".to_vec().into()).await?;
}
}
Ok(())
}
}
const REPO_MIGRATION_O1: &str = "repo-migration-01";
const REPO_MIGRATION_02: &str = "repo-migration-02";
const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
const GENERATOR_KEY: &str = "last-path";
/// Rewrites the motion, variant, and main identifiers recorded for `hash`
/// into their normalized 0.4 form, carrying each identifier's details along.
///
/// A hash with no main identifier is queued for cleanup and reported via
/// `RepoError::Missing`.
#[tracing::instrument(skip(repo, hash), fields(hash = hex::encode(&hash)))]
async fn migrate_identifiers_for_hash<T>(repo: &T, hash: ::sled::IVec) -> color_eyre::Result<()>
where
T: FullRepo,
{
let hash: T::Bytes = hash.to_vec().into();
// Motion identifier, when one is recorded for this hash.
if let Some(motion_identifier) = repo.motion_identifier::<FileId>(hash.clone()).await? {
// `normalize_for_migration` yields None when no rewrite is needed.
if let Some(new_motion_identifier) = motion_identifier.normalize_for_migration() {
migrate_identifier_details(repo, &motion_identifier, &new_motion_identifier).await?;
repo.relate_motion_identifier(hash.clone(), &new_motion_identifier)
.await?;
}
}
// Every variant identifier recorded for this hash.
for (variant_path, variant_identifier) in repo.variants::<FileId>(hash.clone()).await? {
if let Some(new_variant_identifier) = variant_identifier.normalize_for_migration() {
migrate_identifier_details(repo, &variant_identifier, &new_variant_identifier).await?;
repo.relate_variant_identifier(hash.clone(), variant_path, &new_variant_identifier)
.await?;
}
}
// The main identifier is mandatory; without it the hash is unusable.
let Some(main_identifier) = repo.identifier::<FileId>(hash.clone()).await? else {
tracing::warn!("Missing identifier for hash {}, queueing cleanup", hex::encode(&hash));
crate::queue::cleanup_hash(repo, hash.clone()).await?;
return Err(RepoError::Missing("hash -> identifier").into());
};
if let Some(new_main_identifier) = main_identifier.normalize_for_migration() {
migrate_identifier_details(repo, &main_identifier, &new_main_identifier).await?;
repo.relate_identifier(hash, &new_main_identifier).await?;
}
Ok(())
}
/// Copies the stored details from the `old` identifier to `new` and removes
/// the old identifier's record. No-op when the two identifiers are equal or
/// when no details exist for `old`.
#[tracing::instrument(level = "debug", skip(repo))]
async fn migrate_identifier_details<T>(
repo: &T,
old: &FileId,
new: &FileId,
) -> color_eyre::Result<()>
where
T: FullRepo,
{
if old == new {
tracing::warn!("Old FileId and new FileId are identical");
return Ok(());
}
if let Some(details) = repo.details(old).await? {
repo.relate_details(new, &details).await?;
// Drop the stale record only after the new relation is in place.
IdentifierRepo::cleanup(repo, old).await?;
}
Ok(())
}
/// Copies one hash and everything attached to it (main identifier, aliases
/// with their delete tokens, motion identifier, variants, and details) from
/// the 0.3-era `old` database into `repo`.
///
/// A hash that already exists in `repo` is treated as a duplicate and
/// skipped. Most relations are written best-effort (`let _ =`), so partial
/// data in the old database does not abort the migration.
#[tracing::instrument(skip(repo, old, hash), fields(hash = hex::encode(&hash)))]
async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()>
where
T: IdentifierRepo + HashRepo + AliasRepo + SettingsRepo + Debug,
{
let new_hash: T::Bytes = hash.to_vec().into();
let main_ident = old.main_identifier(&hash)?.to_vec();
if HashRepo::create(repo, new_hash.clone()).await?.is_err() {
tracing::warn!("Duplicate hash detected");
return Ok(());
}
repo.relate_identifier(new_hash.clone(), &main_ident)
.await?;
// Aliases and their delete tokens; skip aliases that already exist.
for alias in old.aliases(&hash) {
if let Ok(Ok(())) = AliasRepo::create(repo, &alias).await {
let _ = repo.relate_alias(new_hash.clone(), &alias).await;
let _ = repo.relate_hash(&alias, new_hash.clone()).await;
if let Ok(Some(delete_token)) = old.delete_token(&alias) {
let _ = repo.relate_delete_token(&alias, &delete_token).await;
}
}
}
if let Ok(Some(identifier)) = old.motion_identifier(&hash) {
let _ = repo
.relate_motion_identifier(new_hash.clone(), &identifier.to_vec())
.await;
}
for (variant_path, identifier) in old.variants(&hash)? {
let variant = variant_path.to_string_lossy().to_string();
let _ = repo
.relate_variant_identifier(new_hash.clone(), variant, &identifier.to_vec())
.await;
}
for (identifier, details) in old.details(&hash)? {
let _ = repo.relate_details(&identifier.to_vec(), &details).await;
}
Ok(())
}
impl MaybeUuid {

View file

@ -1,189 +0,0 @@
// TREE STRUCTURE
// - Alias Tree
// - alias -> hash
// - alias / id -> u64(id)
// - alias / delete -> delete token
// - Main Tree
// - hash -> filename
// - hash 0 u64(id) -> alias
// - Filename Tree
// - filename -> hash
// - Details Tree
// - filename / S::Identifier -> details
// - Identifier Tree
// - filename -> S::Identifier
// - filename / variant path -> S::Identifier
// - filename / motion -> S::Identifier
// - Settings Tree
// - store-migration-progress -> Path Tree Key
use super::{Alias, DeleteToken, Details};
use std::path::PathBuf;
mod migrate;
/// Error raised when an expected record is absent from the 0.3-era database.
#[derive(Debug)]
struct OldDbError(&'static str);

impl std::fmt::Display for OldDbError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The wrapped message is the entire user-facing text.
        formatter.write_str(self.0)
    }
}

impl std::error::Error for OldDbError {}
/// Read-only handle onto the 0.3-era sled database, exposing just the trees
/// the migration needs. `_db` keeps the sled handle alive for the trees.
pub(super) struct Old {
alias_tree: ::sled::Tree,
filename_tree: ::sled::Tree,
main_tree: ::sled::Tree,
details_tree: ::sled::Tree,
settings_tree: ::sled::Tree,
identifier_tree: ::sled::Tree,
_db: ::sled::Db,
}
impl Old {
/// Opens the 0.3-era database at `path`, returning `None` when no such
/// database exists (nothing to migrate).
#[tracing::instrument]
pub(super) fn open(path: PathBuf) -> color_eyre::Result<Option<Self>> {
if let Some(db) = migrate::LatestDb::exists(path).migrate()? {
Ok(Some(Self {
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
main_tree: db.open_tree("main")?,
details_tree: db.open_tree("details")?,
settings_tree: db.open_tree("settings")?,
identifier_tree: db.open_tree("path")?,
_db: db,
}))
} else {
Ok(None)
}
}
/// Raw lookup in the settings tree.
pub(super) fn setting(&self, key: &[u8]) -> color_eyre::Result<Option<sled::IVec>> {
Ok(self.settings_tree.get(key)?)
}
/// Iterates every hash, read from the filename tree's values
/// (`filename -> hash`). Unreadable entries are silently skipped.
pub(super) fn hashes(&self) -> impl std::iter::Iterator<Item = sled::IVec> {
let length = self.filename_tree.len();
tracing::info!("FILENAME_TREE_LEN: {}", length);
self.filename_tree
.iter()
.values()
.filter_map(|res| res.ok())
}
/// Collects `(identifier, details)` pairs for `hash` by scanning the
/// identifier tree under the hash's filename, then looking each
/// identifier up in the details tree (keyed `filename/identifier`).
/// Entries that fail to read or deserialize are skipped.
pub(super) fn details(
&self,
hash: &sled::IVec,
) -> color_eyre::Result<Vec<(sled::IVec, Details)>> {
let filename = self
.main_tree
.get(hash)?
.ok_or(OldDbError("Missing filename"))?;
let filename = String::from_utf8_lossy(&filename);
Ok(self
.identifier_tree
.scan_prefix(filename.as_bytes())
.values()
.filter_map(Result::ok)
.filter_map(|identifier| {
let mut key = filename.as_bytes().to_vec();
key.push(b'/');
key.extend_from_slice(&identifier);
let details = self.details_tree.get(key).ok()??;
let details = serde_json::from_slice(&details).ok()?;
Some((identifier, details))
})
.collect())
}
/// Main identifier for `hash`, resolved `hash -> filename -> identifier`.
/// Errors when either link is missing.
pub(super) fn main_identifier(&self, hash: &sled::IVec) -> color_eyre::Result<sled::IVec> {
let filename = self
.main_tree
.get(hash)?
.ok_or(OldDbError("Missing filename"))?;
Ok(self
.identifier_tree
.get(filename)?
.ok_or(OldDbError("Missing identifier"))?)
}
/// Variant identifiers for `hash`, keyed `filename/variant-path` in the
/// identifier tree. The special `motion` entry is excluded, and each
/// variant path's final component is replaced by its file extension.
pub(super) fn variants(
&self,
hash: &sled::IVec,
) -> color_eyre::Result<Vec<(PathBuf, sled::IVec)>> {
let filename = self
.main_tree
.get(hash)?
.ok_or(OldDbError("Missing filename"))?;
let filename_string = String::from_utf8_lossy(&filename);
let variant_prefix = format!("{filename_string}/");
Ok(self
.identifier_tree
.scan_prefix(&variant_prefix)
.filter_map(|res| res.ok())
.filter_map(|(key, value)| {
let variant_path_bytes = &key[variant_prefix.as_bytes().len()..];
if variant_path_bytes == b"motion" {
return None;
}
let path = String::from_utf8(variant_path_bytes.to_vec()).ok()?;
let mut path = PathBuf::from(path);
let extension = path.extension()?.to_str()?.to_string();
path.pop();
path.push(extension);
Some((path, value))
})
.collect())
}
/// Motion identifier for `hash`, looked up under `filename/motion`.
/// NOTE(review): this reads from `filename_tree`, but the tree-structure
/// comment at the top of this file places `filename / motion` keys in the
/// Identifier Tree — confirm which tree is actually intended here.
pub(super) fn motion_identifier(
&self,
hash: &sled::IVec,
) -> color_eyre::Result<Option<sled::IVec>> {
let filename = self
.main_tree
.get(hash)?
.ok_or(OldDbError("Missing filename"))?;
let filename_string = String::from_utf8_lossy(&filename);
let motion_key = format!("{filename_string}/motion");
Ok(self.filename_tree.get(motion_key)?)
}
/// Aliases for `hash`, stored in the main tree under `hash 0 u64(id)`
/// keys (hash bytes, a zero byte, then the alias id).
pub(super) fn aliases(&self, hash: &sled::IVec) -> Vec<Alias> {
let mut key = hash.to_vec();
key.push(0);
self.main_tree
.scan_prefix(key)
.values()
.filter_map(|res| res.ok())
.filter_map(|alias| Alias::from_slice(&alias))
.collect()
}
/// Delete token for `alias`, stored under `alias/delete` in the alias
/// tree; `None` when absent or unparseable.
pub(super) fn delete_token(&self, alias: &Alias) -> color_eyre::Result<Option<DeleteToken>> {
let key = format!("{alias}/delete");
if let Some(ivec) = self.alias_tree.get(key)? {
return Ok(DeleteToken::from_slice(&ivec));
}
Ok(None)
}
}

View file

@ -1,98 +0,0 @@
use crate::Error;
use std::path::PathBuf;
mod s034;
/// Boxed iterator over raw `(key, value)` byte pairs read from a sled tree.
type SledIter = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>>>;
/// Minimal abstraction over a sled database version: open named trees and
/// access the database's own default tree.
trait SledDb {
type SledTree: SledTree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, Error>;
fn self_tree(&self) -> &Self::SledTree;
}
/// Blanket impl so borrowed databases can be used wherever `SledDb` is
/// expected; both methods delegate straight to the underlying value.
impl<T> SledDb for &T
where
T: SledDb,
{
type SledTree = T::SledTree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, Error> {
(*self).open_tree(name)
}
fn self_tree(&self) -> &Self::SledTree {
(*self).self_tree()
}
}
/// Minimal key-value tree interface implemented per supported sled version,
/// used by the migration code to read and copy records.
trait SledTree {
/// Fetches the value stored at `key`, if any.
fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>;
/// Inserts `value` at `key`, overwriting any existing value.
fn insert<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>;
/// Iterates every `(key, value)` pair in the tree.
fn iter(&self) -> SledIter;
/// Iterates the pairs whose keys fall within `range`.
fn range<K, R>(&self, range: R) -> SledIter
where
K: AsRef<[u8]>,
R: std::ops::RangeBounds<K>;
/// Flushes pending writes to disk.
fn flush(&self) -> Result<(), Error>;
}
/// A database root paired with the detected on-disk format version, ready
/// to be migrated to the latest layout.
pub(crate) struct LatestDb {
root_dir: PathBuf,
version: DbVersion,
}
impl LatestDb {
pub(crate) fn exists(root_dir: PathBuf) -> Self {
let version = DbVersion::exists(root_dir.clone());
LatestDb { root_dir, version }
}
pub(crate) fn migrate(self) -> Result<Option<sled::Db>, Error> {
let LatestDb { root_dir, version } = self;
loop {
let root_dir2 = root_dir.clone();
let res = std::panic::catch_unwind(move || version.migrate(root_dir2));
if let Ok(res) = res {
return res;
}
}
}
}
/// On-disk database formats the migration code knows how to handle.
#[derive(Clone, Copy)]
enum DbVersion {
Sled034,
Fresh,
}
impl DbVersion {
/// Probes `root` for known old database layouts; `Fresh` means none found.
fn exists(root: PathBuf) -> Self {
if s034::exists(root) {
return DbVersion::Sled034;
}
DbVersion::Fresh
}
/// Opens the database for this version; `Fresh` yields `Ok(None)` since
/// there is nothing on disk to migrate.
fn migrate(self, root: PathBuf) -> Result<Option<sled::Db>, Error> {
match self {
DbVersion::Sled034 => Some(s034::open(root)).transpose(),
DbVersion::Fresh => Ok(None),
}
}
}

View file

@ -1,78 +0,0 @@
use crate::{
error::Error,
repo::old::migrate::{SledDb, SledIter, SledTree},
};
use sled as sled034;
use std::path::PathBuf;
/// Directory name used by the sled 0.34 on-disk format.
const SLED_034: &str = "db-0.34";

/// Reports whether a sled 0.34 database directory is present under `base`,
/// i.e. whether `<base>/sled/db-0.34` exists on disk.
pub(crate) fn exists(base: PathBuf) -> bool {
    let candidate = base.join("sled").join(SLED_034);
    std::fs::metadata(candidate).is_ok()
}
/// Opens the sled 0.34 database at `<base>/sled/db-0.34` with a 64 MiB cache.
pub(crate) fn open(mut base: PathBuf) -> Result<sled034::Db, Error> {
base.push("sled");
base.push(SLED_034);
let db = sled034::Config::default()
.cache_capacity(1024 * 1024 * 64)
.path(base)
.open()?;
Ok(db)
}
/// Adapts a sled 0.34 `Db` to the migration's `SledDb` abstraction; the
/// database handle doubles as its own default tree.
impl SledDb for sled034::Db {
type SledTree = sled034::Tree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, Error> {
Ok(sled034::Db::open_tree(self, name)?)
}
fn self_tree(&self) -> &Self::SledTree {
self
}
}
/// `SledTree` adapter for sled 0.34 trees: each method delegates to the
/// sled API and converts keys/values into plain `Vec<u8>`.
impl SledTree for sled034::Tree {
fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>, Error>
where
K: AsRef<[u8]>,
{
Ok(sled034::Tree::get(self, key)?.map(|v| Vec::from(v.as_ref())))
}
fn insert<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
// The previous value returned by sled is intentionally discarded.
Ok(sled034::Tree::insert(self, key, value.as_ref().to_vec()).map(|_| ())?)
}
fn iter(&self) -> SledIter {
Box::new(sled034::Tree::iter(self).map(|res| {
res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.map_err(Error::from)
}))
}
fn range<K, R>(&self, range: R) -> SledIter
where
K: AsRef<[u8]>,
R: std::ops::RangeBounds<K>,
{
Box::new(sled034::Tree::range(self, range).map(|res| {
res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.map_err(Error::from)
}))
}
fn flush(&self) -> Result<(), Error> {
sled034::Tree::flush(self).map(|_| ()).map_err(Error::from)
}
}

View file

@ -1,16 +1,16 @@
use crate::{
details::MaybeHumanDate,
repo::{
Alias, AliasAlreadyExists, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details,
FullRepo, HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo,
QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details, FullRepo,
HashAlreadyExists, HashRepo, Identifier, IdentifierRepo, MigrationRepo, QueueRepo,
SettingsRepo, UploadId, UploadRepo, UploadResult,
},
serde_str::Serde,
store::StoreError,
stream::from_iterator,
};
use futures_util::{Future, Stream};
use sled::{CompareAndSwapError, Db, IVec, Tree};
use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree};
use std::{
collections::HashMap,
path::PathBuf,
@ -936,12 +936,6 @@ impl MigrationRepo for SledRepo {
type StreamItem = Result<IVec, RepoError>;
type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
/// Builds the composite `hash_aliases` tree key: the raw hash bytes
/// immediately followed by the serialized alias bytes.
fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
    let alias_bytes = alias.to_bytes();
    let mut key = Vec::with_capacity(hash.len() + alias_bytes.len());
    key.extend_from_slice(hash);
    key.extend(alias_bytes);
    key
}
#[async_trait::async_trait(?Send)]
impl HashRepo for SledRepo {
type Stream = LocalBoxStream<'static, StreamItem>;
@ -965,57 +959,50 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), HashAlreadyExists>, RepoError> {
let res = b!(self.hashes, {
let hash2 = hash.clone();
hashes.compare_and_swap(hash, None as Option<Self::Bytes>, Some(hash2))
});
async fn create<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<Result<(), HashAlreadyExists>, StoreError> {
let identifier: sled::IVec = identifier.to_bytes()?.into();
Ok(res.map_err(|_| HashAlreadyExists))
let hashes = self.hashes.clone();
let hash_identifiers = self.hash_identifiers.clone();
let res = actix_web::web::block(move || {
(&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| {
if hashes.get(&hash)?.is_some() {
return Ok(Err(HashAlreadyExists));
}
hashes.insert(&hash, &hash)?;
hash_identifiers.insert(&hash, &identifier)?;
Ok(Ok(()))
})
})
.await
.map_err(|_| RepoError::Canceled)?;
match res {
Ok(res) => Ok(res),
Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
Err(StoreError::from(RepoError::from(SledError::from(e))))
}
}
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
let key = hash_alias_key(&hash, alias);
let value = alias.to_bytes();
b!(self.hash_aliases, hash_aliases.insert(key, value));
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
let key = hash_alias_key(&hash, alias);
b!(self.hash_aliases, hash_aliases.remove(key));
Ok(())
}
#[tracing::instrument(skip_all)]
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
let v = b!(self.hash_aliases, {
Ok(hash_aliases
.scan_prefix(hash)
.values()
.filter_map(Result::ok)
.filter_map(|ivec| Alias::from_slice(&ivec))
.collect::<Vec<_>>()) as Result<_, sled::Error>
});
Ok(v)
}
#[tracing::instrument(level = "trace", skip(self, hash, identifier), fields(hash = hex::encode(&hash), identifier = identifier.string_repr()))]
async fn relate_identifier<I: Identifier>(
async fn update_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), StoreError> {
let bytes = identifier.to_bytes()?;
let identifier = identifier.to_bytes()?;
b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes));
b!(
self.hash_identifiers,
hash_identifiers.insert(hash, identifier)
);
Ok(())
}
@ -1141,102 +1128,102 @@ impl HashRepo for SledRepo {
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> {
let hash2 = hash.clone();
b!(self.hashes, hashes.remove(hash2));
let hashes = self.hashes.clone();
let hash_identifiers = self.hash_identifiers.clone();
let hash_motion_identifiers = self.hash_motion_identifiers.clone();
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
let hash2 = hash.clone();
b!(self.hash_identifiers, hash_identifiers.remove(hash2));
let hash2 = hash.clone();
b!(
self.hash_motion_identifiers,
hash_motion_identifiers.remove(hash2)
);
let aliases = self.aliases(hash.clone()).await?;
let hash2 = hash.clone();
b!(self.hash_aliases, {
for alias in aliases {
let key = hash_alias_key(&hash2, &alias);
let _ = hash_aliases.remove(key);
}
Ok(()) as Result<(), SledError>
});
let variant_keys = b!(self.hash_variant_identifiers, {
let v = hash_variant_identifiers
.scan_prefix(hash)
.scan_prefix(hash2)
.keys()
.filter_map(Result::ok)
.collect::<Vec<_>>();
Ok(v) as Result<Vec<_>, SledError>
});
b!(self.hash_variant_identifiers, {
for key in variant_keys {
let _ = hash_variant_identifiers.remove(key);
}
Ok(()) as Result<(), SledError>
});
Ok(())
let res = actix_web::web::block(move || {
(
&hashes,
&hash_identifiers,
&hash_motion_identifiers,
&hash_variant_identifiers,
)
.transaction(
|(
hashes,
hash_identifiers,
hash_motion_identifiers,
hash_variant_identifiers,
)| {
hashes.remove(&hash)?;
hash_identifiers.remove(&hash)?;
hash_motion_identifiers.remove(&hash)?;
for key in &variant_keys {
hash_variant_identifiers.remove(key)?;
}
Ok(())
},
)
})
.await
.map_err(|_| RepoError::Canceled)?;
match res {
Ok(()) => Ok(()),
Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
Err(SledError::from(e).into())
}
}
}
}
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, alias: &Alias) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
let res = b!(
self.aliases,
aliases.compare_and_swap(bytes, None as Option<Self::Bytes>, Some(bytes2))
);
Ok(res.map_err(|_| AliasAlreadyExists))
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_delete_token(
async fn create(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, RepoError> {
let key = alias.to_bytes();
let token = delete_token.to_bytes();
hash: Self::Bytes,
) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
let alias: sled::IVec = alias.to_bytes().into();
let delete_token: sled::IVec = delete_token.to_bytes().into();
let res = b!(self.alias_delete_tokens, {
let mut prev: Option<Self::Bytes> = None;
let aliases = self.aliases.clone();
let alias_hashes = self.alias_hashes.clone();
let hash_aliases = self.hash_aliases.clone();
let alias_delete_tokens = self.alias_delete_tokens.clone();
loop {
let key = key.clone();
let token = token.clone();
let res = alias_delete_tokens.compare_and_swap(key, prev, Some(token))?;
match res {
Ok(()) => return Ok(Ok(())) as Result<_, SledError>,
Err(CompareAndSwapError {
current: Some(token),
..
}) => {
if let Some(token) = DeleteToken::from_slice(&token) {
return Ok(Err(AlreadyExists(token)));
} else {
prev = Some(token);
};
let res = actix_web::web::block(move || {
(&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction(
|(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| {
if aliases.get(&alias)?.is_some() {
return Ok(Err(AliasAlreadyExists));
}
Err(CompareAndSwapError { current: None, .. }) => {
prev = None;
}
}
aliases.insert(&alias, &alias)?;
alias_hashes.insert(&alias, &hash)?;
hash_aliases.insert(&hash, &alias)?;
alias_delete_tokens.insert(&alias, &delete_token)?;
Ok(Ok(()))
},
)
})
.await
.map_err(|_| RepoError::Canceled)?;
match res {
Ok(res) => Ok(res),
Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
Err(SledError::from(e).into())
}
});
Ok(res)
}
}
#[tracing::instrument(level = "trace", skip(self))]
@ -1254,15 +1241,6 @@ impl AliasRepo for SledRepo {
Ok(Some(token))
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> {
let key = alias.to_bytes();
b!(self.alias_hashes, alias_hashes.insert(key, hash));
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn hash(&self, alias: &Alias) -> Result<Option<Self::Bytes>, RepoError> {
let key = alias.to_bytes();
@ -1272,19 +1250,50 @@ impl AliasRepo for SledRepo {
Ok(opt)
}
#[tracing::instrument(skip_all)]
async fn for_hash(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
let v = b!(self.hash_aliases, {
Ok(hash_aliases
.scan_prefix(hash)
.values()
.filter_map(Result::ok)
.filter_map(|ivec| Alias::from_slice(&ivec))
.collect::<Vec<_>>()) as Result<_, sled::Error>
});
Ok(v)
}
#[tracing::instrument(skip(self))]
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> {
let key = alias.to_bytes();
let alias: sled::IVec = alias.to_bytes().into();
let key2 = key.clone();
b!(self.aliases, aliases.remove(key2));
let aliases = self.aliases.clone();
let alias_hashes = self.alias_hashes.clone();
let hash_aliases = self.hash_aliases.clone();
let alias_delete_tokens = self.alias_delete_tokens.clone();
let key2 = key.clone();
b!(self.alias_delete_tokens, alias_delete_tokens.remove(key2));
let res = actix_web::web::block(move || {
(&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction(
|(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| {
aliases.remove(&alias)?;
if let Some(hash) = alias_hashes.remove(&alias)? {
hash_aliases.remove(hash)?;
}
alias_delete_tokens.remove(&alias)?;
Ok(())
},
)
})
.await
.map_err(|_| RepoError::Canceled)?;
b!(self.alias_hashes, alias_hashes.remove(key));
Ok(())
match res {
Ok(()) => Ok(()),
Err(TransactionError::Abort(e)) | Err(TransactionError::Storage(e)) => {
Err(SledError::from(e).into())
}
}
}
}

View file

@ -35,16 +35,6 @@ impl Identifier for FileId {
}
}
impl FileId {
pub(crate) fn normalize_for_migration(&self) -> Option<Self> {
if self.0.starts_with("files") {
Some(Self(self.0.components().skip(1).collect::<PathBuf>()))
} else {
None
}
}
}
impl FileStore {
pub(super) fn file_id_from_path(&self, path: PathBuf) -> Result<FileId, FileError> {
let stripped = path