Clean migration, don't use db tree

This commit is contained in:
asonix 2020-09-14 16:42:31 -05:00
parent fd10934bf8
commit 6de19d17b6
4 changed files with 273 additions and 92 deletions

View file

@ -1,12 +1,53 @@
use crate::UploadError;
use sled as sled034;
use sled032;
use sled;
use std::path::PathBuf;
use tracing::{debug, info, warn};
const SLED_034: &str = "db-0.34";
const SLED_032: &str = "db-0.32";
const SLED_0320_RC1: &str = "db";
mod s032;
mod s034;
type SledIter = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), UploadError>>>;
trait SledDb {
type SledTree: SledTree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, UploadError>;
fn self_tree(&self) -> &Self::SledTree;
}
impl<T> SledDb for &T
where
T: SledDb,
{
type SledTree = T::SledTree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, UploadError> {
(*self).open_tree(name)
}
fn self_tree(&self) -> &Self::SledTree {
(*self).self_tree()
}
}
trait SledTree {
fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>, UploadError>
where
K: AsRef<[u8]>;
fn insert<K, V>(&self, key: K, value: V) -> Result<(), UploadError>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>;
fn iter(&self) -> SledIter;
fn range<K, R>(&self, range: R) -> SledIter
where
K: AsRef<[u8]>,
R: std::ops::RangeBounds<K>;
}
pub(crate) struct LatestDb {
root_dir: PathBuf,
@ -20,7 +61,7 @@ impl LatestDb {
LatestDb { root_dir, version }
}
pub(crate) fn migrate(self) -> Result<sled034::Db, UploadError> {
pub(crate) fn migrate(self) -> Result<sled::Db, UploadError> {
let LatestDb { root_dir, version } = self;
version.migrate(root_dir)
@ -36,84 +77,62 @@ enum DbVersion {
impl DbVersion {
fn exists(root: PathBuf) -> Self {
let mut sled_dir = root.clone();
sled_dir.push("sled");
sled_dir.push(SLED_034);
if std::fs::metadata(sled_dir).is_ok() {
if s034::exists(root.clone()) {
return DbVersion::Sled034;
}
let mut sled_dir = root.clone();
sled_dir.push("sled");
sled_dir.push(SLED_032);
if std::fs::metadata(sled_dir).is_ok() {
if s032::exists(root.clone()) {
return DbVersion::Sled032;
}
let mut sled_dir = root;
sled_dir.push(SLED_0320_RC1);
if std::fs::metadata(sled_dir).is_ok() {
if s032::exists_rc1(root.clone()) {
return DbVersion::Sled0320Rc1;
}
DbVersion::Fresh
}
fn migrate(self, root: PathBuf) -> Result<sled034::Db, UploadError> {
fn migrate(self, root: PathBuf) -> Result<sled::Db, UploadError> {
match self {
DbVersion::Sled0320Rc1 => {
migrate_0_32_0_rc1(root.clone())?;
migrate_0_32(root)
}
DbVersion::Sled0320Rc1 => migrate_0_32_0_rc1(root),
DbVersion::Sled032 => migrate_0_32(root),
DbVersion::Sled034 | DbVersion::Fresh => {
let mut sled_dir = root;
sled_dir.push("sled");
sled_dir.push(SLED_034);
Ok(sled034::open(sled_dir)?)
}
DbVersion::Sled034 | DbVersion::Fresh => s034::open(root),
}
}
}
fn migrate_0_32(mut root: PathBuf) -> Result<sled034::Db, UploadError> {
info!("Migrating database from 0.32 to 0.34");
root.push("sled");
fn migrate_0_32_0_rc1(root: PathBuf) -> Result<sled::Db, UploadError> {
info!("Migrating database from 0.32.0-rc1 to 0.32.0");
let mut sled_dir = root.clone();
sled_dir.push(SLED_032);
let old_db = s032::open_rc1(root.clone())?;
let new_db = s034::open(root)?;
let mut new_sled_dir = root.clone();
new_sled_dir.push(SLED_034);
let old_db = sled032::open(sled_dir)?;
let new_db = sled034::open(new_sled_dir)?;
new_db.import(old_db.export());
Ok(new_db)
migrate(old_db, new_db)
}
fn migrate_0_32_0_rc1(root: PathBuf) -> Result<sled032::Db, UploadError> {
info!("Migrating database from 0.32.0-rc1 to 0.32.0");
let mut sled_dir = root.clone();
sled_dir.push("db");
fn migrate_0_32(root: PathBuf) -> Result<sled::Db, UploadError> {
info!("Migrating database from 0.32 to 0.34");
let mut new_sled_dir = root;
new_sled_dir.push("sled");
new_sled_dir.push(SLED_032);
let old_db = s032::open(root.clone())?;
let new_db = s034::open(root)?;
let old_db = sled032::open(sled_dir)?;
let new_db = sled032::open(new_sled_dir)?;
migrate(old_db, new_db)
}
fn migrate<Old, New>(old_db: Old, new_db: New) -> Result<New, UploadError>
where
Old: SledDb,
New: SledDb,
{
let old_alias_tree = old_db.open_tree("alias")?;
let new_alias_tree = new_db.open_tree("alias")?;
let old_fname_tree = old_db.open_tree("filename")?;
let new_fname_tree = new_db.open_tree("filename")?;
for res in old_alias_tree.iter().keys() {
let k = res?;
for res in old_alias_tree.iter() {
let (k, _) = res?;
if let Some(v) = old_alias_tree.get(&k)? {
if !k.contains(&b"/"[0]) {
// k is an alias
@ -135,8 +154,8 @@ fn migrate_0_32_0_rc1(root: PathBuf) -> Result<sled032::Db, UploadError> {
}
}
for res in old_fname_tree.iter().keys() {
let k = res?;
for res in old_fname_tree.iter() {
let (k, _) = res?;
if let Some(v) = old_fname_tree.get(&k)? {
debug!(
"Moving file -> hash for file {}",
@ -148,37 +167,43 @@ fn migrate_0_32_0_rc1(root: PathBuf) -> Result<sled032::Db, UploadError> {
}
}
Ok(new_db) as Result<sled032::Db, UploadError>
Ok(new_db)
}
fn migrate_main_tree(
alias: &sled032::IVec,
hash: &sled032::IVec,
old_db: &sled032::Db,
new_db: &sled032::Db,
) -> Result<(), UploadError> {
fn migrate_main_tree<Old, New>(
alias: &[u8],
hash: &[u8],
old_db: Old,
new_db: New,
) -> Result<(), UploadError>
where
Old: SledDb,
New: SledDb,
{
let main_tree = new_db.open_tree("main")?;
debug!(
"Migrating files for {}",
String::from_utf8_lossy(alias.as_ref())
);
if let Some(v) = old_db.get(&hash)? {
new_db.insert(&hash, v)?;
if let Some(v) = old_db.self_tree().get(&hash)? {
main_tree.insert(&hash, v)?;
} else {
warn!("Missing filename");
}
let (start, end) = alias_key_bounds(&hash);
for res in old_db.range(start..end) {
for res in old_db.self_tree().range(start..end) {
let (k, v) = res?;
debug!("Moving alias {}", String::from_utf8_lossy(v.as_ref()));
new_db.insert(k, v)?;
main_tree.insert(k, v)?;
}
let (start, end) = variant_key_bounds(&hash);
for res in old_db.range(start..end) {
for res in old_db.self_tree().range(start..end) {
let (k, v) = res?;
debug!("Moving variant {}", String::from_utf8_lossy(v.as_ref()));
new_db.insert(k, v)?;
main_tree.insert(k, v)?;
}
Ok(())

81
src/migrate/s032.rs Normal file
View file

@ -0,0 +1,81 @@
use crate::{
migrate::{SledDb, SledIter, SledTree},
UploadError,
};
use std::path::PathBuf;
const SLED_032: &str = "db-0.32";
const SLED_0320_RC1: &str = "db";
pub(crate) fn exists_rc1(mut base: PathBuf) -> bool {
base.push(SLED_0320_RC1);
std::fs::metadata(base).is_ok()
}
pub(crate) fn open_rc1(mut base: PathBuf) -> Result<sled032::Db, UploadError> {
base.push(SLED_0320_RC1);
Ok(sled032::open(base)?)
}
pub(crate) fn exists(mut base: PathBuf) -> bool {
base.push("sled");
base.push(SLED_032);
std::fs::metadata(base).is_ok()
}
pub(crate) fn open(mut base: PathBuf) -> Result<sled032::Db, UploadError> {
base.push("sled");
base.push(SLED_032);
Ok(sled032::open(base)?)
}
impl SledDb for sled032::Db {
type SledTree = sled032::Tree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, UploadError> {
Ok(sled032::Db::open_tree(self, name)?)
}
fn self_tree(&self) -> &Self::SledTree {
&*self
}
}
impl SledTree for sled032::Tree {
fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>, UploadError>
where
K: AsRef<[u8]>,
{
Ok(sled032::Tree::get(self, key)?.map(|v| Vec::from(v.as_ref())))
}
fn insert<K, V>(&self, key: K, value: V) -> Result<(), UploadError>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
Ok(sled032::Tree::insert(self, key, value.as_ref().to_vec()).map(|_| ())?)
}
fn iter(&self) -> SledIter {
Box::new(sled032::Tree::iter(self).map(|res| {
res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.map_err(UploadError::from)
}))
}
fn range<K, R>(&self, range: R) -> SledIter
where
K: AsRef<[u8]>,
R: std::ops::RangeBounds<K>,
{
Box::new(sled032::Tree::range(self, range).map(|res| {
res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.map_err(UploadError::from)
}))
}
}

69
src/migrate/s034.rs Normal file
View file

@ -0,0 +1,69 @@
use crate::{
migrate::{SledDb, SledIter, SledTree},
UploadError,
};
use sled as sled034;
use std::path::PathBuf;
const SLED_034: &str = "db-0.34";
pub(crate) fn exists(mut base: PathBuf) -> bool {
base.push("sled");
base.push(SLED_034);
std::fs::metadata(base).is_ok()
}
pub(crate) fn open(mut base: PathBuf) -> Result<sled034::Db, UploadError> {
base.push("sled");
base.push(SLED_034);
Ok(sled034::open(base)?)
}
impl SledDb for sled034::Db {
type SledTree = sled034::Tree;
fn open_tree(&self, name: &str) -> Result<Self::SledTree, UploadError> {
Ok(sled034::Db::open_tree(self, name)?)
}
fn self_tree(&self) -> &Self::SledTree {
&*self
}
}
impl SledTree for sled034::Tree {
fn get<K>(&self, key: K) -> Result<Option<Vec<u8>>, UploadError>
where
K: AsRef<[u8]>,
{
Ok(sled034::Tree::get(self, key)?.map(|v| Vec::from(v.as_ref())))
}
fn insert<K, V>(&self, key: K, value: V) -> Result<(), UploadError>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
Ok(sled034::Tree::insert(self, key, value.as_ref().to_vec()).map(|_| ())?)
}
fn iter(&self) -> SledIter {
Box::new(sled034::Tree::iter(self).map(|res| {
res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.map_err(UploadError::from)
}))
}
fn range<K, R>(&self, range: R) -> SledIter
where
K: AsRef<[u8]>,
R: std::ops::RangeBounds<K>,
{
Box::new(sled034::Tree::range(self, range).map(|res| {
res.map(|(k, v)| (k.as_ref().to_vec(), v.as_ref().to_vec()))
.map_err(UploadError::from)
}))
}
}

View file

@ -22,6 +22,7 @@ struct UploadManagerInner {
image_dir: PathBuf,
alias_tree: sled::Tree,
filename_tree: sled::Tree,
main_tree: sled::Tree,
db: sled::Db,
}
@ -106,6 +107,7 @@ impl UploadManager {
image_dir: root_dir,
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
main_tree: db.open_tree("main")?,
db,
}),
})
@ -127,9 +129,9 @@ impl UploadManager {
.ok_or(UploadError::MissingFilename)?;
let key = variant_key(&hash, &path_string);
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
debug!("Storing variant");
web::block(move || db.insert(key, path_string.as_bytes())).await?;
web::block(move || main_tree.insert(key, path_string.as_bytes())).await?;
debug!("Stored variant");
Ok(())
@ -160,10 +162,14 @@ impl UploadManager {
async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result<Vec<String>, UploadError> {
let (start, end) = alias_key_bounds(hash);
let db = self.inner.db.clone();
let aliases =
web::block(move || db.range(start..end).values().collect::<Result<Vec<_>, _>>())
.await?;
let main_tree = self.inner.main_tree.clone();
let aliases = web::block(move || {
main_tree
.range(start..end)
.values()
.collect::<Result<Vec<_>, _>>()
})
.await?;
debug!("Got {} aliases for hash", aliases.len());
let aliases = aliases
@ -193,15 +199,15 @@ impl UploadManager {
#[instrument(skip(self, alias, token))]
pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), UploadError> {
use sled::Transactional;
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
let alias_tree = self.inner.alias_tree.clone();
let span = Span::current();
let alias2 = alias.clone();
let hash = web::block(move || {
[&*db, &alias_tree].transaction(|v| {
[&main_tree, &alias_tree].transaction(|v| {
let entered = span.enter();
let db = &v[0];
let main_tree = &v[0];
let alias_tree = &v[1];
// -- GET TOKEN --
@ -231,7 +237,7 @@ impl UploadManager {
// -- REMOVE HASH TREE ELEMENT --
debug!("Deleting hash -> alias mapping");
db.remove(alias_key(&hash, &id))?;
main_tree.remove(alias_key(&hash, &id))?;
drop(entered);
Ok(hash)
})
@ -239,11 +245,11 @@ impl UploadManager {
.await?;
// -- CHECK IF ANY OTHER ALIASES EXIST --
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
let (start, end) = alias_key_bounds(&hash);
debug!("Checking for additional aliases referencing hash");
let any_aliases = web::block(move || {
Ok(db.range(start..end).next().is_some()) as Result<bool, UploadError>
Ok(main_tree.range(start..end).next().is_some()) as Result<bool, UploadError>
})
.await?;
@ -254,10 +260,10 @@ impl UploadManager {
}
// -- DELETE HASH ENTRY --
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
let hash2 = hash.clone();
debug!("Deleting hash -> filename mapping");
let filename = web::block(move || db.remove(&hash2))
let filename = web::block(move || main_tree.remove(&hash2))
.await?
.ok_or(UploadError::MissingFile)?;
@ -403,9 +409,9 @@ impl UploadManager {
.await?
.ok_or(UploadError::MissingAlias)?;
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
debug!("Getting filename from hash");
let filename = web::block(move || db.get(hash))
let filename = web::block(move || main_tree.get(hash))
.await?
.ok_or(UploadError::MissingFile)?;
@ -435,11 +441,11 @@ impl UploadManager {
.ok_or(UploadError::MissingFile)?;
let (start, end) = variant_key_bounds(&hash);
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
debug!("Fetching file variants");
let keys = web::block(move || {
let mut keys = Vec::new();
for key in db.range(start..end).keys() {
for key in main_tree.range(start..end).keys() {
keys.push(key?.to_owned());
}
@ -450,8 +456,8 @@ impl UploadManager {
debug!("{} files prepared for deletion", keys.len());
for key in keys {
let db = self.inner.db.clone();
if let Some(path) = web::block(move || db.remove(key)).await? {
let main_tree = self.inner.main_tree.clone();
if let Some(path) = web::block(move || main_tree.remove(key)).await? {
debug!("Deleting {:?}", String::from_utf8(path.to_vec()));
if let Err(e) = remove_path(path).await {
errors.push(e);
@ -518,14 +524,14 @@ impl UploadManager {
hash: Hash,
content_type: mime::Mime,
) -> Result<(Dup, String), UploadError> {
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
let filename = self.next_file(content_type).await?;
let filename2 = filename.clone();
let hash2 = hash.inner.clone();
debug!("Inserting filename for hash");
let res = web::block(move || {
db.compare_and_swap(
main_tree.compare_and_swap(
hash2,
None as Option<sled::IVec>,
Some(filename2.as_bytes()),
@ -618,11 +624,11 @@ impl UploadManager {
let id = web::block(move || db.generate_id()).await?.to_string();
let key = alias_key(&hash.inner, &id);
let db = self.inner.db.clone();
let main_tree = self.inner.main_tree.clone();
let alias2 = alias.clone();
debug!("Saving hash/id -> alias mapping");
let res = web::block(move || {
db.compare_and_swap(key, None as Option<sled::IVec>, Some(alias2.as_bytes()))
main_tree.compare_and_swap(key, None as Option<sled::IVec>, Some(alias2.as_bytes()))
})
.await?;