Simplify object and file path generation

This commit is contained in:
asonix 2024-02-26 15:43:30 -06:00
parent 7c6112e631
commit c17a8722c6
7 changed files with 43 additions and 122 deletions

7
Cargo.lock generated
View file

@ -1875,7 +1875,6 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sha2", "sha2",
"sled", "sled",
"storage-path-generator",
"streem", "streem",
"subtle", "subtle",
"thiserror", "thiserror",
@ -2738,12 +2737,6 @@ dependencies = [
"der", "der",
] ]
[[package]]
name = "storage-path-generator"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f11d35dae9818c4313649da4a97c8329e29357a7fe584526c1d78f5b63ef836"
[[package]] [[package]]
name = "streem" name = "streem"
version = "0.2.0" version = "0.2.0"

View file

@ -60,7 +60,6 @@ serde_json = "1.0"
serde_urlencoded = "0.7.1" serde_urlencoded = "0.7.1"
sha2 = "0.10.0" sha2 = "0.10.0"
sled = { version = "0.34.7" } sled = { version = "0.34.7" }
storage-path-generator = "0.1.0"
streem = "0.2.0" streem = "0.2.0"
subtle = { version = "2.5.0", default-features = false } subtle = { version = "2.5.0", default-features = false }
thiserror = "1.0" thiserror = "1.0"

View file

@ -33,9 +33,6 @@ impl ErrorCode {
pub(crate) const FILE_IO_ERROR: ErrorCode = ErrorCode { pub(crate) const FILE_IO_ERROR: ErrorCode = ErrorCode {
code: "file-io-error", code: "file-io-error",
}; };
pub(crate) const PARSE_PATH_ERROR: ErrorCode = ErrorCode {
code: "parse-path-error",
};
pub(crate) const FILE_EXISTS: ErrorCode = ErrorCode { pub(crate) const FILE_EXISTS: ErrorCode = ErrorCode {
code: "file-exists", code: "file-exists",
}; };

24
src/file_path.rs Normal file
View file

@ -0,0 +1,24 @@
use std::path::PathBuf;
use uuid::Uuid;
pub(crate) fn generate_disk(mut path: PathBuf) -> PathBuf {
path.extend(generate());
path
}
pub(crate) fn generate_object() -> String {
generate().join("/")
}
fn generate() -> Vec<String> {
Uuid::now_v7()
.into_bytes()
.into_iter()
.map(to_hex)
.collect()
}
fn to_hex(byte: u8) -> String {
format!("{byte:x}")
}

View file

@ -11,6 +11,7 @@ mod error_code;
mod exiftool; mod exiftool;
mod ffmpeg; mod ffmpeg;
mod file; mod file;
mod file_path;
mod formats; mod formats;
mod future; mod future;
mod generate; mod generate;
@ -1766,7 +1767,7 @@ where
{ {
match to { match to {
config::primitives::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let store = FileStore::build(path.clone(), repo.clone()).await?; let store = FileStore::build(path.clone()).await?;
let to = State { let to = State {
config, config,
@ -1806,7 +1807,6 @@ where
signature_duration.unwrap_or(15), signature_duration.unwrap_or(15),
client_timeout.unwrap_or(30), client_timeout.unwrap_or(30),
public_endpoint, public_endpoint,
repo.clone(),
) )
.await? .await?
.build(client.clone()); .build(client.clone());
@ -1991,7 +1991,7 @@ impl PictRsConfiguration {
match from { match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?; let from = FileStore::build(path.clone()).await?;
migrate_inner( migrate_inner(
config, config,
tmp_dir, tmp_dir,
@ -2034,7 +2034,6 @@ impl PictRsConfiguration {
signature_duration.unwrap_or(15), signature_duration.unwrap_or(15),
client_timeout.unwrap_or(30), client_timeout.unwrap_or(30),
public_endpoint, public_endpoint,
repo.clone(),
) )
.await? .await?
.build(client.clone()); .build(client.clone());
@ -2075,7 +2074,7 @@ impl PictRsConfiguration {
config::Store::Filesystem(config::Filesystem { path }) => { config::Store::Filesystem(config::Filesystem { path }) => {
let arc_repo = repo.to_arc(); let arc_repo = repo.to_arc();
let store = FileStore::build(path, arc_repo.clone()).await?; let store = FileStore::build(path).await?;
let state = State { let state = State {
tmp_dir: tmp_dir.clone(), tmp_dir: tmp_dir.clone(),
@ -2135,7 +2134,6 @@ impl PictRsConfiguration {
signature_duration, signature_duration,
client_timeout, client_timeout,
public_endpoint, public_endpoint,
arc_repo.clone(),
) )
.await? .await?
.build(client.clone()); .build(client.clone());

View file

@ -1,32 +1,21 @@
use crate::{ use crate::{error_code::ErrorCode, file::File, store::Store, stream::LocalBoxStream};
error_code::ErrorCode, file::File, repo::ArcRepo, store::Store, stream::LocalBoxStream,
};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_core::Stream; use futures_core::Stream;
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
use storage_path_generator::Generator;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader; use tokio_util::io::StreamReader;
use tracing::Instrument; use tracing::Instrument;
use super::StoreError; use super::StoreError;
// - Settings Tree
// - last-path -> last generated path
const GENERATOR_KEY: &str = "last-path";
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum FileError { pub(crate) enum FileError {
#[error("Failed to read or write file")] #[error("Failed to read or write file")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error("Couldn't strip root dir")] #[error("Couldn't strip root dir")]
PrefixError, PrefixError,
@ -41,7 +30,6 @@ impl FileError {
pub(super) const fn error_code(&self) -> ErrorCode { pub(super) const fn error_code(&self) -> ErrorCode {
match self { match self {
Self::Io(_) => ErrorCode::FILE_IO_ERROR, Self::Io(_) => ErrorCode::FILE_IO_ERROR,
Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR,
Self::FileExists => ErrorCode::FILE_EXISTS, Self::FileExists => ErrorCode::FILE_EXISTS,
Self::StringError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR, Self::StringError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR,
} }
@ -50,9 +38,7 @@ impl FileError {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct FileStore { pub(crate) struct FileStore {
path_gen: Generator,
root_dir: PathBuf, root_dir: PathBuf,
repo: ArcRepo,
} }
impl Store for FileStore { impl Store for FileStore {
@ -76,7 +62,7 @@ impl Store for FileStore {
{ {
let mut reader = std::pin::pin!(reader); let mut reader = std::pin::pin!(reader);
let path = self.next_file().await?; let path = self.next_file();
if let Err(e) = self.safe_save_reader(&path, &mut reader).await { if let Err(e) = self.safe_save_reader(&path, &mut reader).await {
self.safe_remove_file(&path).await?; self.safe_remove_file(&path).await?;
@ -165,17 +151,10 @@ impl Store for FileStore {
} }
impl FileStore { impl FileStore {
#[tracing::instrument(skip(repo))] pub(crate) async fn build(root_dir: PathBuf) -> color_eyre::Result<Self> {
pub(crate) async fn build(root_dir: PathBuf, repo: ArcRepo) -> color_eyre::Result<Self> {
let path_gen = init_generator(&repo).await?;
tokio::fs::create_dir_all(&root_dir).await?; tokio::fs::create_dir_all(&root_dir).await?;
Ok(FileStore { Ok(FileStore { root_dir })
root_dir,
path_gen,
repo,
})
} }
fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, FileError> { fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, FileError> {
@ -190,26 +169,11 @@ impl FileStore {
self.root_dir.join(file_id.as_ref()) self.root_dir.join(file_id.as_ref())
} }
async fn next_directory(&self) -> Result<PathBuf, StoreError> { fn next_file(&self) -> PathBuf {
let path = self.path_gen.next(); let target_path = crate::file_path::generate_disk(self.root_dir.clone());
self.repo
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
let mut target_path = self.root_dir.clone();
for dir in path.to_strings() {
target_path.push(dir)
}
Ok(target_path)
}
async fn next_file(&self) -> Result<PathBuf, StoreError> {
let target_path = self.next_directory().await?;
let filename = uuid::Uuid::new_v4().to_string(); let filename = uuid::Uuid::new_v4().to_string();
Ok(target_path.join(filename)) target_path.join(filename)
} }
#[tracing::instrument(level = "debug", skip(self, path), fields(path = ?path.as_ref()))] #[tracing::instrument(level = "debug", skip(self, path), fields(path = ?path.as_ref()))]
@ -266,20 +230,9 @@ pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), Fi
Ok(()) Ok(())
} }
async fn init_generator(repo: &ArcRepo) -> Result<Generator, StoreError> {
if let Some(ivec) = repo.get(GENERATOR_KEY).await? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec()).map_err(FileError::from)?,
))
} else {
Ok(Generator::new())
}
}
impl std::fmt::Debug for FileStore { impl std::fmt::Debug for FileStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FileStore") f.debug_struct("FileStore")
.field("path_gen", &"generator")
.field("root_dir", &self.root_dir) .field("root_dir", &self.root_dir)
.finish() .finish()
} }

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, repo::ArcRepo, bytes_stream::BytesStream, error_code::ErrorCode, future::WithMetrics, store::Store,
store::Store, stream::LocalBoxStream, sync::DropHandle, stream::LocalBoxStream, sync::DropHandle,
}; };
use actix_web::{ use actix_web::{
error::BlockingError, error::BlockingError,
@ -20,7 +20,6 @@ use rusty_s3::{
Bucket, BucketError, Credentials, UrlStyle, Bucket, BucketError, Credentials, UrlStyle,
}; };
use std::{string::FromUtf8Error, sync::Arc, time::Duration}; use std::{string::FromUtf8Error, sync::Arc, time::Duration};
use storage_path_generator::{Generator, Path};
use streem::IntoStreamer; use streem::IntoStreamer;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
@ -31,16 +30,8 @@ use super::StoreError;
const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880); const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);
// - Settings Tree
// - last-path -> last generated path
const GENERATOR_KEY: &str = "last-path";
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum ObjectError { pub(crate) enum ObjectError {
#[error("Failed to generate path")]
PathGenerator(#[from] storage_path_generator::PathError),
#[error("Failed to generate request")] #[error("Failed to generate request")]
S3(#[from] BucketError), S3(#[from] BucketError),
@ -98,7 +89,6 @@ impl std::error::Error for XmlError {
impl ObjectError { impl ObjectError {
pub(super) const fn error_code(&self) -> ErrorCode { pub(super) const fn error_code(&self) -> ErrorCode {
match self { match self {
Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR,
Self::S3(_) Self::S3(_)
| Self::RequestMiddleware(_) | Self::RequestMiddleware(_)
| Self::Request(_) | Self::Request(_)
@ -127,8 +117,6 @@ impl From<BlockingError> for ObjectError {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ObjectStore { pub(crate) struct ObjectStore {
path_gen: Generator,
repo: ArcRepo,
bucket: Bucket, bucket: Bucket,
credentials: Credentials, credentials: Credentials,
client: ClientWithMiddleware, client: ClientWithMiddleware,
@ -139,8 +127,6 @@ pub(crate) struct ObjectStore {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ObjectStoreConfig { pub(crate) struct ObjectStoreConfig {
path_gen: Generator,
repo: ArcRepo,
bucket: Bucket, bucket: Bucket,
credentials: Credentials, credentials: Credentials,
signature_expiration: u64, signature_expiration: u64,
@ -151,8 +137,6 @@ pub(crate) struct ObjectStoreConfig {
impl ObjectStoreConfig { impl ObjectStoreConfig {
pub(crate) fn build(self, client: ClientWithMiddleware) -> ObjectStore { pub(crate) fn build(self, client: ClientWithMiddleware) -> ObjectStore {
ObjectStore { ObjectStore {
path_gen: self.path_gen,
repo: self.repo,
bucket: self.bucket, bucket: self.bucket,
credentials: self.credentials, credentials: self.credentials,
client, client,
@ -431,7 +415,7 @@ enum UploadState {
impl ObjectStore { impl ObjectStore {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(access_key, secret_key, session_token, repo))] #[tracing::instrument(skip(access_key, secret_key, session_token))]
pub(crate) async fn build( pub(crate) async fn build(
endpoint: Url, endpoint: Url,
bucket_name: String, bucket_name: String,
@ -443,13 +427,8 @@ impl ObjectStore {
signature_expiration: u64, signature_expiration: u64,
client_timeout: u64, client_timeout: u64,
public_endpoint: Option<Url>, public_endpoint: Option<Url>,
repo: ArcRepo,
) -> Result<ObjectStoreConfig, StoreError> { ) -> Result<ObjectStoreConfig, StoreError> {
let path_gen = init_generator(&repo).await?;
Ok(ObjectStoreConfig { Ok(ObjectStoreConfig {
path_gen,
repo,
bucket: Bucket::new(endpoint, url_style, bucket_name, region) bucket: Bucket::new(endpoint, url_style, bucket_name, region)
.map_err(ObjectError::from)?, .map_err(ObjectError::from)?,
credentials: if let Some(token) = session_token { credentials: if let Some(token) = session_token {
@ -596,7 +575,7 @@ impl ObjectStore {
length: usize, length: usize,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<(RequestBuilder, Arc<str>), StoreError> { ) -> Result<(RequestBuilder, Arc<str>), StoreError> {
let path = self.next_file().await?; let path = self.next_file();
let mut action = self.bucket.put_object(Some(&self.credentials), &path); let mut action = self.bucket.put_object(Some(&self.credentials), &path);
@ -614,7 +593,7 @@ impl ObjectStore {
&self, &self,
content_type: mime::Mime, content_type: mime::Mime,
) -> Result<(RequestBuilder, Arc<str>), StoreError> { ) -> Result<(RequestBuilder, Arc<str>), StoreError> {
let path = self.next_file().await?; let path = self.next_file();
let mut action = self let mut action = self
.bucket .bucket
@ -784,39 +763,17 @@ impl ObjectStore {
self.build_request(action) self.build_request(action)
} }
async fn next_directory(&self) -> Result<Path, StoreError> { fn next_file(&self) -> String {
let path = self.path_gen.next(); let path = crate::file_path::generate_object();
self.repo
.set(GENERATOR_KEY, path.to_be_bytes().into())
.await?;
Ok(path)
}
async fn next_file(&self) -> Result<String, StoreError> {
let path = self.next_directory().await?.to_strings().join("/");
let filename = uuid::Uuid::new_v4().to_string(); let filename = uuid::Uuid::new_v4().to_string();
Ok(format!("{path}/{filename}")) format!("{path}/{filename}")
}
}
async fn init_generator(repo: &ArcRepo) -> Result<Generator, StoreError> {
if let Some(ivec) = repo.get(GENERATOR_KEY).await? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec())
.map_err(ObjectError::from)?,
))
} else {
Ok(Generator::new())
} }
} }
impl std::fmt::Debug for ObjectStore { impl std::fmt::Debug for ObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObjectStore") f.debug_struct("ObjectStore")
.field("path_gen", &"generator")
.field("bucket", &self.bucket.name()) .field("bucket", &self.bucket.name())
.field("region", &self.bucket.region()) .field("region", &self.bucket.region())
.finish() .finish()