Restructure attempt 1

This commit is contained in:
Aode (Lion) 2021-10-18 20:29:06 -05:00
parent 0f3be6566e
commit 195f614a4b
8 changed files with 255 additions and 132 deletions

10
Cargo.lock generated
View file

@ -1185,11 +1185,11 @@ dependencies = [
"opentelemetry",
"opentelemetry-otlp",
"pin-project-lite",
"rand",
"serde",
"serde_json",
"sha2",
"sled",
"storage-path-generator",
"structopt",
"thiserror",
"time 0.3.3",
@ -1747,6 +1747,14 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "storage-path-generator"
version = "0.1.0"
source = "git+https://git.asonix.dog/asonix/storage-path-generator?branch=main#68f70707b0d04e5429f0c16d71ca329a9d4a9557"
dependencies = [
"parking_lot",
]
[[package]]
name = "strsim"
version = "0.8.0"

View file

@ -29,11 +29,11 @@ once_cell = "1.4.0"
opentelemetry = { version = "0.16", features = ["rt-tokio"] }
opentelemetry-otlp = "0.9"
pin-project-lite = "0.2.7"
rand = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.9.0"
sled = { version = "0.34.6" }
storage-path-generator = { version = "0.1.0", git = "https://git.asonix.dog/asonix/storage-path-generator", branch = "main" }
structopt = "0.3.14"
thiserror = "1.0"
time = { version = "0.3.0", features = ["serde"] }

View file

@ -69,6 +69,12 @@ pub(crate) enum UploadError {
#[error("Error interacting with filesystem, {0}")]
Io(#[from] std::io::Error),
#[error(transparent)]
PathGenerator(#[from] storage_path_generator::PathError),
#[error(transparent)]
StripPrefix(#[from] std::path::StripPrefixError),
#[error("Failed to acquire the semaphore")]
Semaphore,

View file

@ -36,6 +36,7 @@ use tracing_error::ErrorLayer;
use tracing_futures::Instrument;
use tracing_log::LogTracer;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter, Registry};
use uuid::Uuid;
mod config;
mod either;
@ -67,17 +68,7 @@ const HOURS: u32 = 60 * MINUTES;
const DAYS: u32 = 24 * HOURS;
static TMP_DIR: Lazy<PathBuf> = Lazy::new(|| {
use rand::{
distributions::{Alphanumeric, Distribution},
thread_rng,
};
let mut rng = thread_rng();
let tmp_nonce = Alphanumeric
.sample_iter(&mut rng)
.take(7)
.map(char::from)
.collect::<String>();
let tmp_nonce = Uuid::new_v4();
let mut path = std::env::temp_dir();
path.push(format!("pict-rs-{}", tmp_nonce));
@ -262,15 +253,7 @@ async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), Error> {
}
pub(crate) fn tmp_file() -> PathBuf {
use rand::distributions::{Alphanumeric, Distribution};
let limit: usize = 10;
let rng = rand::thread_rng();
let s: String = Alphanumeric
.sample_iter(rng)
.take(limit)
.map(char::from)
.collect();
let s: String = Uuid::new_v4().to_string();
let name = format!("{}.tmp", s);

View file

@ -106,16 +106,6 @@ pub(crate) fn alias_key_bounds(hash: &[u8]) -> (Vec<u8>, Vec<u8>) {
(start, end)
}
/// Builds the half-open key range that covers every variant entry stored
/// for `hash` in the main tree.
///
/// The lower bound is `hash` followed by the byte `2`; the upper bound is
/// `hash` followed by the byte `3`.
pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec<u8>, Vec<u8>) {
    let start = [hash, &[2u8][..]].concat();
    let end = [hash, &[3u8][..]].concat();
    (start, end)
}
/// Builds the key under which an alias's id is stored: the alias followed
/// by the literal suffix `/id`.
pub(crate) fn alias_id_key(alias: &str) -> String {
    const SUFFIX: &str = "/id";

    let mut key = String::with_capacity(alias.len() + SUFFIX.len());
    key.push_str(alias);
    key.push_str(SUFFIX);
    key
}

View file

@ -1,15 +1,17 @@
use crate::{
config::Format,
error::{Error, UploadError},
migrate::{alias_id_key, alias_key, alias_key_bounds, variant_key_bounds, LatestDb},
migrate::{alias_id_key, alias_key, alias_key_bounds, LatestDb},
};
use actix_web::web;
use sha2::Digest;
use std::{path::PathBuf, sync::Arc};
use storage_path_generator::{Generator, Path};
use tracing::{debug, error, info, instrument, warn, Span};
use tracing_futures::Instrument;
mod hasher;
mod restructure;
mod session;
pub(super) use session::UploadManagerSession;
@ -22,17 +24,22 @@ pub(super) use session::UploadManagerSession;
// - Main Tree
// - hash -> filename
// - hash 0 u64(id) -> alias
// - hash 2 variant path -> variant path
// - DEPRECATED:
// - hash 2 variant path -> variant path
// - hash 2 variant path details -> details
// - Filename Tree
// - filename -> hash
// - Details Tree
// - filename / relative path -> details
// - Path Tree
// - filename -> relative path
// - filename / variant operation path -> relative path
// - filename / relative variant path -> relative variant path
// - Settings Tree
// - last-path -> last generated path
// - fs-restructure-01-started -> bool
// - fs-restructure-01-complete -> bool
/// Settings-tree key holding the big-endian bytes of the most recently
/// generated storage path. `next_directory` writes it and `init_generator`
/// reads it back.
const GENERATOR_KEY: &'static [u8] = b"last-path";
#[derive(Clone)]
pub struct UploadManager {
inner: Arc<UploadManagerInner>,
@ -45,8 +52,10 @@ struct UploadManagerInner {
alias_tree: sled::Tree,
filename_tree: sled::Tree,
main_tree: sled::Tree,
details_tree: sled::Tree,
path_tree: sled::Tree,
settings_tree: sled::Tree,
path_gen: Generator,
db: sled::Db,
}
@ -86,6 +95,8 @@ impl UploadManager {
let settings_tree = db.open_tree("settings")?;
let path_gen = init_generator(&settings_tree)?;
let manager = UploadManager {
inner: Arc::new(UploadManagerInner {
format,
@ -93,31 +104,35 @@ impl UploadManager {
image_dir: root_dir,
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
details_tree: db.open_tree("details")?,
main_tree: db.open_tree("main")?,
path_tree: db.open_tree("path")?,
settings_tree,
path_gen,
db,
}),
};
manager.restructure().await?;
Ok(manager)
}
/// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_variant(&self, path: PathBuf, filename: String) -> Result<(), Error> {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let path_bytes = self
.generalize_path(&path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let fname_tree = self.inner.filename_tree.clone();
debug!("Getting hash");
let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingFilename)?;
let key = self.variant_key(&path, &filename)?;
let path_tree = self.inner.path_tree.clone();
let key = variant_key(&hash, &path_string);
let main_tree = self.inner.main_tree.clone();
debug!("Storing variant");
web::block(move || main_tree.insert(key, path_string.as_bytes())).await??;
web::block(move || path_tree.insert(key, path_bytes)).await??;
debug!("Stored variant");
Ok(())
@ -130,18 +145,11 @@ impl UploadManager {
path: PathBuf,
filename: String,
) -> Result<Option<Details>, Error> {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let key = self.details_key(&path, &filename)?;
let details_tree = self.inner.details_tree.clone();
let fname_tree = self.inner.filename_tree.clone();
debug!("Getting hash");
let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingFilename)?;
let key = variant_details_key(&hash, &path_string);
let main_tree = self.inner.main_tree.clone();
debug!("Getting details");
let opt = match web::block(move || main_tree.get(key)).await?? {
let opt = match web::block(move || details_tree.get(key)).await?? {
Some(ivec) => match serde_json::from_slice(&ivec) {
Ok(details) => Some(details),
Err(_) => None,
@ -160,19 +168,12 @@ impl UploadManager {
filename: String,
details: &Details,
) -> Result<(), Error> {
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
let key = self.details_key(&path, &filename)?;
let details_tree = self.inner.details_tree.clone();
let details_value = serde_json::to_vec(details)?;
let fname_tree = self.inner.filename_tree.clone();
debug!("Getting hash");
let hash: sled::IVec = web::block(move || fname_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingFilename)?;
let key = variant_details_key(&hash, &path_string);
let main_tree = self.inner.main_tree.clone();
let details_value = serde_json::to_string(details)?;
debug!("Storing details");
web::block(move || main_tree.insert(key, details_value.as_bytes())).await??;
web::block(move || details_tree.insert(key, details_value)).await??;
debug!("Stored details");
Ok(())
@ -198,6 +199,21 @@ impl UploadManager {
self.aliases_by_hash(&hash).await
}
/// Returns the next generated storage directory, rooted at the image
/// directory.
///
/// The freshly generated path is written to the settings tree under
/// `GENERATOR_KEY` before the directory is returned.
///
/// # Errors
///
/// Fails if the settings tree insert fails.
fn next_directory(&self) -> Result<PathBuf, Error> {
    let generated = self.inner.path_gen.next();

    // Record the generator position before handing the directory out.
    let settings = &self.inner.settings_tree;
    settings.insert(GENERATOR_KEY, generated.to_be_bytes())?;

    // Append every generated component beneath the image directory.
    let mut directory = self.image_dir();
    directory.extend(generated.to_strings());

    Ok(directory)
}
async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result<Vec<String>, Error> {
let (start, end) = alias_key_bounds(hash);
let main_tree = self.inner.main_tree.clone();
@ -375,41 +391,44 @@ impl UploadManager {
errors.push(e.into());
}
let filename2 = filename.clone();
let fname_tree = self.inner.filename_tree.clone();
debug!("Deleting filename -> hash mapping");
let hash = web::block(move || fname_tree.remove(filename))
.await??
.ok_or(UploadError::MissingFile)?;
web::block(move || fname_tree.remove(filename2)).await??;
let (start, end) = variant_key_bounds(&hash);
let main_tree = self.inner.main_tree.clone();
let path_prefix = filename.clone();
let path_tree = self.inner.path_tree.clone();
debug!("Fetching file variants");
let keys = web::block(move || {
let mut keys = Vec::new();
for key in main_tree.range(start..end).keys() {
keys.push(key?.to_owned());
}
Ok(keys) as Result<Vec<sled::IVec>, Error>
let paths = web::block(move || {
path_tree
.scan_prefix(path_prefix)
.values()
.collect::<Result<Vec<sled::IVec>, sled::Error>>()
})
.await??;
debug!("{} files prepared for deletion", keys.len());
debug!("{} files prepared for deletion", paths.len());
for key in keys {
let main_tree = self.inner.main_tree.clone();
if let Some(path) = web::block(move || main_tree.remove(key)).await?? {
let s = String::from_utf8_lossy(&path);
debug!("Deleting {}", s);
// ignore json objects
if !s.starts_with('{') {
if let Err(e) = remove_path(path).await {
errors.push(e);
}
}
for path in paths {
let s = String::from_utf8_lossy(&path);
debug!("Deleting {}", s);
if let Err(e) = remove_path(path).await {
errors.push(e);
}
}
let path_prefix = filename.clone();
let path_tree = self.inner.path_tree.clone();
debug!("Deleting path info");
web::block(move || {
for res in path_tree.scan_prefix(path_prefix).keys() {
let key = res?;
path_tree.remove(key)?;
}
Ok(()) as Result<(), Error>
})
.await??;
for error in errors {
error!("Error deleting files, {}", error);
}
@ -473,6 +492,16 @@ impl FilenameIVec {
}
}
/// Creates the storage path generator, resuming from the path saved in the
/// settings tree when one exists.
///
/// # Errors
///
/// Fails if the settings tree lookup fails or the stored bytes cannot be
/// decoded into a path.
fn init_generator(settings: &sled::Tree) -> Result<Generator, Error> {
    let generator = match settings.get(GENERATOR_KEY)? {
        // A previous run saved its position: continue from there.
        Some(stored) => Generator::from_existing(Path::from_be_bytes(stored.to_vec())?),
        // No saved position: start a fresh generator.
        None => Generator::new(),
    };

    Ok(generator)
}
async fn remove_path(path: sled::IVec) -> Result<(), Error> {
let path_string = String::from_utf8(path.to_vec())?;
tokio::fs::remove_file(path_string).await?;
@ -490,21 +519,6 @@ fn delete_key(alias: &str) -> String {
format!("{}/delete", alias)
}
/// Builds the main-tree key for a variant path: `hash`, the marker byte
/// `2`, then the UTF-8 bytes of `path`.
fn variant_key(hash: &[u8], path: &str) -> Vec<u8> {
    [hash, &[2u8][..], path.as_bytes()].concat()
}
/// Builds the main-tree key for a variant's details: `hash`, the marker
/// byte `2`, the UTF-8 bytes of `path`, then the literal suffix `details`.
fn variant_details_key(hash: &[u8], path: &str) -> Vec<u8> {
    hash.iter()
        .copied()
        .chain(std::iter::once(2u8))
        .chain(path.bytes())
        .chain(b"details".iter().copied())
        .collect()
}
impl std::fmt::Debug for UploadManager {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadManager").finish()

View file

@ -0,0 +1,145 @@
use crate::{
error::{Error, UploadError},
safe_move_file,
upload_manager::UploadManager,
};
use std::path::{Path, PathBuf};
/// Settings-tree key written by `mark_restructure_complete` and looked up by
/// `restructure_complete`.
const RESTRUCTURE_COMPLETE: &'static [u8] = b"fs-restructure-01-complete";
/// Suffix that identifies details entries among the legacy variant keys in
/// the main tree.
const DETAILS: &'static [u8] = b"details";
impl UploadManager {
/// Migrates files and their bookkeeping to the generated-directory layout.
///
/// For every `filename -> hash` entry in the filename tree this:
/// 1. moves the file found directly under the image directory (if present)
///    into a freshly generated directory and records its relative location
///    in the path tree under the filename;
/// 2. walks the legacy variant range for that hash in the main tree, copying
///    details entries into the details tree and moving variant files that
///    still exist into generated directories (recording them in the path
///    tree);
/// 3. removes each legacy main-tree entry after handling it.
///
/// The completion marker is written only after every entry was processed.
///
/// # Errors
///
/// Stops at the first failing tree operation, non-UTF-8 filename or path,
/// path that cannot be made relative to the image directory, or failed file
/// move.
#[tracing::instrument(skip(self))]
pub(super) async fn restructure(&self) -> Result<(), Error> {
// Skip the whole migration when the helper reports it as already done.
if self.restructure_complete()? {
return Ok(());
}
// Each entry of the filename tree is a (filename, hash) pair.
for res in self.inner.filename_tree.iter() {
let (filename, hash) = res?;
let filename = String::from_utf8(filename.to_vec())?;
tracing::info!("Migrating {}", filename);
// Old location: the file sits directly inside the image directory.
let mut file_path = self.image_dir();
file_path.push(filename.clone());
// Only migrate when something exists at the old location.
if tokio::fs::metadata(&file_path).await.is_ok() {
let mut target_path = self.next_directory()?;
target_path.push(filename.clone());
// The path tree stores the location relative to the image directory.
let target_path_bytes = self
.generalize_path(&target_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
// The database entry is written before the file is moved.
self.inner
.path_tree
.insert(filename.as_bytes(), target_path_bytes)?;
safe_move_file(file_path, target_path).await?;
}
// Legacy variant entries for this hash live in [hash ++ 2, hash ++ 3).
let (start, end) = variant_key_bounds(&hash);
for res in self.inner.main_tree.range(start..end) {
let (hash_variant_key, variant_path_or_details) = res?;
if hash_variant_key.ends_with(DETAILS) {
// Details entry: the key is hash, one marker byte, the variant path,
// then the DETAILS suffix, so the path is the slice in between.
let details = variant_path_or_details;
let start_index = hash.len() + 1;
let end_index = hash_variant_key.len() - DETAILS.len();
let path_bytes = &hash_variant_key[start_index..end_index];
let variant_path = PathBuf::from(String::from_utf8(path_bytes.to_vec())?);
// NOTE(review): details_key strips the image directory prefix and errors
// when it is absent; this assumes legacy keys hold paths under the image
// directory. Confirm against existing databases.
let key = self.details_key(&variant_path, &filename)?;
self.inner.details_tree.insert(key, details)?;
} else {
// Variant entry: the stored value is the variant file's path.
let variant_path =
PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?);
// Variants whose file is gone are dropped from the main tree without a
// replacement entry.
if tokio::fs::metadata(&variant_path).await.is_ok() {
let mut target_path = self.next_directory()?;
target_path.push(filename.clone());
let relative_target_path_bytes = self
.generalize_path(&target_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
// The new key is derived from the new target path, not the old one.
let variant_key = self.variant_key(&target_path, &filename)?;
self.inner
.path_tree
.insert(variant_key, relative_target_path_bytes)?;
safe_move_file(variant_path, target_path).await?;
}
}
// The legacy entry is removed in both branches.
self.inner.main_tree.remove(hash_variant_key)?;
}
}
self.mark_restructure_complete()?;
Ok(())
}
/// Reports whether the restructure migration has already been completed.
///
/// Returns `true` exactly when the `RESTRUCTURE_COMPLETE` marker is present
/// in the settings tree. The previous implementation negated this check, so
/// `restructure` returned early on databases that had never been migrated.
///
/// # Errors
///
/// Fails if the settings tree lookup fails.
fn restructure_complete(&self) -> Result<bool, Error> {
    let marker = self.inner.settings_tree.get(RESTRUCTURE_COMPLETE)?;

    // Only presence matters; the stored value is not inspected.
    Ok(marker.is_some())
}
/// Records in the settings tree that the restructure migration finished.
///
/// # Errors
///
/// Fails if the settings tree insert fails.
fn mark_restructure_complete(&self) -> Result<(), Error> {
    let settings = &self.inner.settings_tree;
    settings.insert(RESTRUCTURE_COMPLETE, b"true")?;

    Ok(())
}
/// Returns `path` with the image directory prefix removed.
///
/// # Errors
///
/// Fails if `path` does not start with the image directory.
pub(super) fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> {
    let image_dir = &self.inner.image_dir;
    let relative = path.strip_prefix(image_dir)?;

    Ok(relative)
}
/// Builds the details-tree key for a variant: `<filename>/<variant path
/// relative to the image directory>`, as bytes.
///
/// # Errors
///
/// Fails if `variant_path` is not under the image directory or is not valid
/// UTF-8.
pub(super) fn details_key(
    &self,
    variant_path: &Path,
    filename: &str,
) -> Result<Vec<u8>, Error> {
    let relative = self
        .generalize_path(variant_path)?
        .to_str()
        .ok_or(UploadError::Path)?;

    Ok(format!("{}/{}", filename, relative).into_bytes())
}
/// Builds the path-tree key for a variant: `<filename>/<variant path
/// relative to the image directory>`, as bytes.
///
/// # Errors
///
/// Fails if `variant_path` is not under the image directory or is not valid
/// UTF-8.
pub(super) fn variant_key(
    &self,
    variant_path: &Path,
    filename: &str,
) -> Result<Vec<u8>, Error> {
    let relative = self
        .generalize_path(variant_path)?
        .to_str()
        .ok_or(UploadError::Path)?;

    Ok(format!("{}/{}", filename, relative).into_bytes())
}
}
/// Builds the half-open key range covering the legacy variant entries for
/// `hash` in the main tree: from `hash` followed by the byte `2` up to, but
/// excluding, `hash` followed by the byte `3`.
pub(crate) fn variant_key_bounds(hash: &[u8]) -> (Vec<u8>, Vec<u8>) {
    // Appends a single marker byte to a copy of the hash.
    let bound = |marker: u8| {
        let mut key = Vec::with_capacity(hash.len() + 1);
        key.extend_from_slice(hash);
        key.push(marker);
        key
    };

    (bound(2), bound(3))
}

View file

@ -14,6 +14,7 @@ use std::path::PathBuf;
use tokio::io::AsyncRead;
use tracing::{debug, instrument, warn, Span};
use tracing_futures::Instrument;
use uuid::Uuid;
type UploadStream<E> = LocalBoxStream<'static, Result<web::Bytes, E>>;
@ -98,13 +99,7 @@ impl UploadManagerSession {
let alias = self.alias.clone().ok_or(UploadError::MissingAlias)?;
debug!("Generating delete token");
use rand::distributions::{Alphanumeric, Distribution};
let rng = rand::thread_rng();
let s: String = Alphanumeric
.sample_iter(rng)
.take(10)
.map(char::from)
.collect();
let s: String = Uuid::new_v4().to_string();
let delete_token = s.clone();
debug!("Saving delete token");
@ -286,17 +281,10 @@ impl UploadManagerSession {
#[instrument(skip(self, content_type))]
async fn next_file(&self, content_type: mime::Mime) -> Result<String, Error> {
let image_dir = self.manager.image_dir();
use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10;
let mut rng = rand::thread_rng();
loop {
debug!("Filename generation loop");
let mut path = image_dir.clone();
let s: String = Alphanumeric
.sample_iter(&mut rng)
.take(limit)
.map(char::from)
.collect();
let s: String = Uuid::new_v4().to_string();
let filename = file_name(s, content_type.clone())?;
@ -311,8 +299,6 @@ impl UploadManagerSession {
}
debug!("Filename exists, trying again");
limit += 1;
}
}
@ -376,16 +362,9 @@ impl UploadManagerSession {
// Generate an alias to the file
#[instrument(skip(self, hash, content_type))]
async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<String, Error> {
use rand::distributions::{Alphanumeric, Distribution};
let mut limit: usize = 10;
let mut rng = rand::thread_rng();
loop {
debug!("Alias gen loop");
let s: String = Alphanumeric
.sample_iter(&mut rng)
.take(limit)
.map(char::from)
.collect();
let s: String = Uuid::new_v4().to_string();
let alias = file_name(s, content_type.clone())?;
self.alias = Some(alias.clone());
@ -395,8 +374,6 @@ impl UploadManagerSession {
return Ok(alias);
}
debug!("Alias exists, regenning");
limit += 1;
}
}