Implement storage migration

This commit is contained in:
Aode (lion) 2021-10-31 21:11:35 -05:00
parent 411e5ed9d0
commit ccb9f49b8b
8 changed files with 420 additions and 194 deletions

View file

@ -0,0 +1,9 @@
[from]
type = "file_store"
[to]
type = 's3_store'
bucket_name = 'pict-rs'
region = 'http://minio:9000'
access_key = '09ODZ3BGBISV4U92JLIM'
secret_key = 'j35YE9RrxhBP0dpiD5mmdXRXvPkEJR4k6zK12q3o'

View file

@ -9,6 +9,9 @@ pub(crate) struct Args {
#[structopt(short, long, help = "Path to the pict-rs configuration file")]
config_file: Option<PathBuf>,
#[structopt(long, help = "Path to a file defining a store migration")]
migrate_file: Option<PathBuf>,
#[structopt(flatten)]
overrides: Overrides,
}
@ -109,22 +112,45 @@ impl Overrides {
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct Migrate {
from: Store,
to: Store,
}
impl Migrate {
pub(crate) fn from(&self) -> &Store {
&self.from
}
pub(crate) fn to(&self) -> &Store {
&self.to
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, structopt::StructOpt)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub(crate) enum Store {
FileStore {
// defaults to {config.path}
#[structopt(long)]
#[structopt(
long,
help = "Path in which pict-rs will create it's 'files' directory"
)]
#[serde(skip_serializing_if = "Option::is_none")]
path: Option<PathBuf>,
},
#[cfg(feature = "object-storage")]
S3Store {
#[structopt(long)]
#[structopt(long, help = "Name of the bucket in which pict-rs will store images")]
bucket_name: String,
#[structopt(long)]
#[structopt(
long,
help = "Region in which the bucket exists, can be an http endpoint"
)]
region: crate::serde_str::Serde<s3::Region>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -194,6 +220,14 @@ impl Config {
let mut base_config = config::Config::new();
base_config.merge(config::Config::try_from(&Defaults::new())?)?;
if let Some(path) = args.migrate_file {
let mut migrate_config = config::Config::new();
migrate_config.merge(config::File::from(path))?;
let migrate: Migrate = migrate_config.try_into()?;
crate::MIGRATE.set(migrate).unwrap();
}
if let Some(path) = args.config_file {
base_config.merge(config::File::from(path))?;
};

View file

@ -9,7 +9,7 @@ mod tokio_file {
use crate::{store::file_store::FileError, Either};
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{Stream, StreamExt};
use std::{io::SeekFrom, path::Path, pin::Pin};
use std::{io::SeekFrom, path::Path};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead};
@ -35,12 +35,11 @@ mod tokio_file {
Ok(())
}
pub(crate) async fn write_from_stream<S>(&mut self, mut stream: S) -> std::io::Result<()>
pub(crate) async fn write_from_stream<S>(&mut self, stream: S) -> std::io::Result<()>
where
S: Stream<Item = std::io::Result<Bytes>>,
{
// SAFETY: pinned stream shadows original stream so it cannot be moved
let mut stream = unsafe { Pin::new_unchecked(&mut stream) };
futures_util::pin_mut!(stream);
while let Some(res) = stream.next().await {
let mut bytes = res?;
@ -202,12 +201,11 @@ mod io_uring {
Ok(())
}
pub(crate) async fn write_from_stream<S>(&mut self, mut stream: S) -> std::io::Result<()>
pub(crate) async fn write_from_stream<S>(&mut self, stream: S) -> std::io::Result<()>
where
S: Stream<Item = std::io::Result<Bytes>>,
{
// SAFETY: pinned stream shadows original stream so it cannot be moved
let mut stream = unsafe { Pin::new_unchecked(&mut stream) };
futures_util::pin_mut!(stream);
let mut cursor: u64 = 0;
while let Some(res) = stream.next().await {

View file

@ -6,7 +6,7 @@ use actix_web::{
};
use awc::Client;
use futures_util::{stream::once, Stream};
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use std::{
collections::HashSet,
future::ready,
@ -46,7 +46,7 @@ use crate::{magick::details_hint, store::file_store::FileStore};
use self::{
concurrent_processor::CancelSafeProcessor,
config::{Config, Format},
config::{Config, Format, Migrate},
either::Either,
error::{Error, UploadError},
init_tracing::init_tracing,
@ -61,6 +61,7 @@ const MINUTES: u32 = 60;
const HOURS: u32 = 60 * MINUTES;
const DAYS: u32 = 24 * HOURS;
static MIGRATE: OnceCell<Migrate> = OnceCell::new();
static CONFIG: Lazy<Config> = Lazy::new(|| Config::build().unwrap());
static PROCESS_SEMAPHORE: Lazy<Semaphore> =
Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
@ -69,7 +70,8 @@ static PROCESS_SEMAPHORE: Lazy<Semaphore> =
#[instrument(name = "Uploaded files", skip(value, manager))]
async fn upload<S: Store>(
value: Value<UploadManagerSession<S>>,
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
@ -91,11 +93,9 @@ where
let delete_token = image.result.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let details = if let Some(details) = details {
debug!("details exist");
@ -104,10 +104,10 @@ where
debug!("generating new details from {:?}", identifier);
let hint = details_hint(&name);
let new_details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
Details::from_store((**store).clone(), identifier.clone(), hint).await?;
debug!("storing details for {:?} {}", identifier, name);
manager
.store_variant_details(identifier, name, &new_details)
.store_variant_details(&identifier, name, &new_details)
.await?;
debug!("stored");
new_details
@ -192,7 +192,8 @@ where
#[instrument(name = "Downloading file", skip(client, manager))]
async fn download<S: Store>(
client: web::Data<Client>,
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error>
where
@ -204,35 +205,31 @@ where
return Err(UploadError::Download(res.status()).into());
}
let mut stream = Limit::new(
let stream = Limit::new(
map_error::map_crate_error(res),
(CONFIG.max_file_size() * MEGABYTES) as u64,
);
// SAFETY: stream is shadowed, so original cannot not be moved
let stream = unsafe { Pin::new_unchecked(&mut stream) };
futures_util::pin_mut!(stream);
let permit = PROCESS_SEMAPHORE.acquire().await?;
let session = manager.session().upload(stream).await?;
let session = manager.session((**store).clone()).upload(stream).await?;
let alias = session.alias().unwrap().to_owned();
drop(permit);
let delete_token = session.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&name);
let new_details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(identifier, name, &new_details)
.store_variant_details(&identifier, name, &new_details)
.await?;
new_details
};
@ -251,7 +248,8 @@ where
/// Delete aliases and files
#[instrument(name = "Deleting file", skip(manager))]
async fn delete<S: Store>(
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error>
where
@ -259,22 +257,19 @@ where
{
let (alias, token) = path_entries.into_inner();
manager.delete(token, alias).await?;
manager.delete((**store).clone(), token, alias).await?;
Ok(HttpResponse::NoContent().finish())
}
type ProcessQuery = Vec<(String, String)>;
async fn prepare_process<S: Store>(
async fn prepare_process(
query: web::Query<ProcessQuery>,
ext: &str,
manager: &UploadManager<S>,
manager: &UploadManager,
filters: &Option<HashSet<String>>,
) -> Result<(Format, String, PathBuf, Vec<String>), Error>
where
Error: From<S::Error>,
{
) -> Result<(Format, String, PathBuf, Vec<String>), Error> {
let (alias, operations) =
query
.into_inner()
@ -318,7 +313,8 @@ where
async fn process_details<S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
filters: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, Error>
where
@ -328,11 +324,11 @@ where
prepare_process(query, ext.as_str(), &manager, &filters).await?;
let identifier = manager
.variant_identifier(&thumbnail_path, &name)
.variant_identifier::<S>(&thumbnail_path, &name)
.await?
.ok_or(UploadError::MissingAlias)?;
let details = manager.variant_details(identifier, name).await?;
let details = manager.variant_details(&identifier, name).await?;
let details = details.ok_or(UploadError::NoFiles)?;
@ -345,7 +341,8 @@ async fn process<S: Store + 'static>(
range: Option<range::RangeHeader>,
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
filters: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, Error>
where
@ -354,29 +351,30 @@ where
let (format, name, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &manager, &filters).await?;
let identifier_opt = manager.variant_identifier(&thumbnail_path, &name).await?;
let identifier_opt = manager
.variant_identifier::<S>(&thumbnail_path, &name)
.await?;
if let Some(identifier) = identifier_opt {
let details_opt = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details_opt = manager.variant_details(&identifier, name.clone()).await?;
let details = if let Some(details) = details_opt {
details
} else {
let hint = details_hint(&name);
let details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(identifier.clone(), name, &details)
.store_variant_details(&identifier, name, &details)
.await?;
details
};
return ranged_file_resp(manager.store().clone(), identifier, range, details).await;
return ranged_file_resp(&**store, identifier, range, details).await;
}
let identifier = manager.still_identifier_from_filename(name.clone()).await?;
let identifier = manager
.still_identifier_from_filename((**store).clone(), name.clone())
.await?;
let thumbnail_path2 = thumbnail_path.clone();
let process_fut = async {
@ -385,7 +383,7 @@ where
let permit = PROCESS_SEMAPHORE.acquire().await?;
let mut processed_reader = crate::magick::process_image_store_read(
manager.store().clone(),
(**store).clone(),
identifier,
thumbnail_args,
format,
@ -410,7 +408,7 @@ where
let bytes2 = bytes.clone();
actix_rt::spawn(
async move {
let identifier = match manager.store().save_bytes(bytes2).await {
let identifier = match store.save_bytes(bytes2).await {
Ok(identifier) => identifier,
Err(e) => {
tracing::warn!("Failed to generate directory path: {}", e);
@ -418,7 +416,7 @@ where
}
};
if let Err(e) = manager
.store_variant_details(identifier.clone(), name.clone(), &details2)
.store_variant_details(&identifier, name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
@ -479,26 +477,24 @@ where
#[instrument(name = "Fetching details", skip(manager))]
async fn details<S: Store>(
alias: web::Path<String>,
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let name = manager.from_alias(alias.into_inner()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&name);
let new_details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(identifier, name, &new_details)
.store_variant_details(&identifier, name, &new_details)
.await?;
new_details
};
@ -511,35 +507,33 @@ where
async fn serve<S: Store>(
range: Option<range::RangeHeader>,
alias: web::Path<String>,
manager: web::Data<UploadManager<S>>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let name = manager.from_alias(alias.into_inner()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename::<S>(name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = manager.variant_details(&identifier, name.clone()).await?;
let details = if let Some(details) = details {
details
} else {
let hint = details_hint(&name);
let details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(identifier.clone(), name, &details)
.store_variant_details(&identifier, name, &details)
.await?;
details
};
ranged_file_resp(manager.store().clone(), identifier, range, details).await
ranged_file_resp(&**store, identifier, range, details).await
}
async fn ranged_file_resp<S: Store>(
store: S,
store: &S,
identifier: S::Identifier,
range: Option<range::RangeHeader>,
details: Details,
@ -627,7 +621,8 @@ enum FileOrAlias {
#[instrument(name = "Purging file", skip(upload_manager))]
async fn purge<S: Store>(
query: web::Query<FileOrAlias>,
upload_manager: web::Data<UploadManager<S>>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
@ -639,7 +634,7 @@ where
for alias in aliases.iter() {
upload_manager
.delete_without_token(alias.to_owned())
.delete_without_token((**store).clone(), alias.to_owned())
.await?;
}
@ -652,7 +647,8 @@ where
#[instrument(name = "Fetching aliases", skip(upload_manager))]
async fn aliases<S: Store>(
query: web::Query<FileOrAlias>,
upload_manager: web::Data<UploadManager<S>>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
@ -676,7 +672,8 @@ struct ByAlias {
#[instrument(name = "Fetching filename", skip(upload_manager))]
async fn filename_by_alias<S: Store>(
query: web::Query<ByAlias>,
upload_manager: web::Data<UploadManager<S>>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
@ -695,7 +692,7 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
error
}
async fn launch<S: Store>(manager: UploadManager<S>) -> anyhow::Result<()>
async fn launch<S: Store + Clone + 'static>(manager: UploadManager, store: S) -> anyhow::Result<()>
where
S::Error: Unpin,
Error: From<S::Error>,
@ -704,6 +701,7 @@ where
//
// This form is expecting a single array field, 'images' with at most 10 files in it
let manager2 = manager.clone();
let store2 = store.clone();
let form = Form::new()
.max_files(10)
.max_file_size(CONFIG.max_file_size() * MEGABYTES)
@ -711,6 +709,7 @@ where
.field(
"images",
Field::array(Field::file(move |filename, _, stream| {
let store = store2.clone();
let manager = manager2.clone();
let span = tracing::info_span!("file-upload", ?filename);
@ -719,7 +718,7 @@ where
let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager
.session()
.session(store)
.upload(map_error::map_crate_error(stream))
.await;
@ -735,6 +734,7 @@ where
// This form is expecting a single array field, 'images' with at most 10 files in it
let validate_imports = CONFIG.validate_imports();
let manager2 = manager.clone();
let store2 = store.clone();
let import_form = Form::new()
.max_files(10)
.max_file_size(CONFIG.max_file_size() * MEGABYTES)
@ -743,6 +743,7 @@ where
"images",
Field::array(Field::file(move |filename, _, stream| {
let manager = manager2.clone();
let store = store2.clone();
let span = tracing::info_span!("file-import", ?filename);
@ -750,7 +751,7 @@ where
let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager
.session()
.session(store)
.import(
filename,
validate_imports,
@ -773,6 +774,7 @@ where
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.app_data(web::Data::new(store.clone()))
.app_data(web::Data::new(manager.clone()))
.app_data(web::Data::new(client))
.app_data(web::Data::new(CONFIG.allowed_filters()))
@ -828,23 +830,108 @@ where
Ok(())
}
async fn migrate_inner<S1>(
manager: &UploadManager,
db: &sled::Db,
from: S1,
to: &config::Store,
) -> anyhow::Result<()>
where
S1: Store,
Error: From<S1::Error>,
{
match to {
config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let to = FileStore::build(path, &db)?;
manager.restructure(&to).await?;
manager.migrate_store::<S1, FileStore>(from, to).await?;
}
#[cfg(feature = "object-storage")]
config::Store::S3Store {
bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
} => {
use store::object_store::ObjectStore;
let to = ObjectStore::build(
bucket_name,
(**region).clone(),
access_key.clone(),
secret_key.clone(),
security_token.clone(),
session_token.clone(),
&db,
)?;
manager.migrate_store::<S1, ObjectStore>(from, to).await?;
}
}
Ok(())
}
#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
init_tracing("pict-rs", CONFIG.opentelemetry_url())?;
let root_dir = CONFIG.data_dir();
let db = LatestDb::exists(root_dir.clone()).migrate()?;
let db = LatestDb::exists(CONFIG.data_dir()).migrate()?;
let manager = UploadManager::new(db.clone(), CONFIG.format()).await?;
if let Some(m) = MIGRATE.get() {
let from = m.from();
let to = m.to();
match from {
config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let from = FileStore::build(path, &db)?;
manager.restructure(&from).await?;
migrate_inner(&manager, &db, from, to).await?;
}
#[cfg(feature = "object-storage")]
config::Store::S3Store {
bucket_name,
region,
access_key,
secret_key,
security_token,
session_token,
} => {
let from = crate::store::object_store::ObjectStore::build(
bucket_name,
(**region).clone(),
access_key.clone(),
secret_key.clone(),
security_token.clone(),
session_token.clone(),
&db,
)?;
migrate_inner(&manager, &db, from, to).await?;
}
}
return Ok(());
}
match CONFIG.store() {
config::Store::FileStore { path } => {
let path = path.to_owned().unwrap_or_else(|| root_dir.clone());
let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir());
let store = FileStore::build(path, &db)?;
manager.restructure(&store).await?;
let manager = UploadManager::new(store, db, CONFIG.format()).await?;
manager.restructure().await?;
launch(manager).await
launch(manager, store).await
}
#[cfg(feature = "object-storage")]
config::Store::S3Store {
@ -865,8 +952,7 @@ async fn main() -> anyhow::Result<()> {
&db,
)?;
let manager = UploadManager::new(store, db, CONFIG.format()).await?;
launch(manager).await
launch(manager, store).await
}
}
}

View file

@ -57,7 +57,7 @@ impl Range {
pub(crate) async fn chop_store<S: Store>(
&self,
store: S,
store: &S,
identifier: S::Identifier,
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, Error>
where

View file

@ -8,10 +8,10 @@ use std::path::{Path, PathBuf};
const RESTRUCTURE_COMPLETE: &[u8] = b"fs-restructure-01-complete";
const DETAILS: &[u8] = b"details";
impl UploadManager<FileStore> {
impl UploadManager {
#[tracing::instrument(skip(self))]
pub(crate) async fn restructure(&self) -> Result<(), Error> {
if self.restructure_complete()? {
pub(crate) async fn restructure(&self, store: &FileStore) -> Result<(), Error> {
if self.restructure_complete(store)? {
return Ok(());
}
@ -20,13 +20,13 @@ impl UploadManager<FileStore> {
let filename = String::from_utf8(filename.to_vec())?;
tracing::info!("Migrating {}", filename);
let file_path = self.store().root_dir.join("files").join(&filename);
let file_path = store.root_dir.join("files").join(&filename);
if tokio::fs::metadata(&file_path).await.is_ok() {
let target_path = self.store().next_directory()?.join(&filename);
let target_path = store.next_directory()?.join(&filename);
let target_path_bytes = self
.generalize_path(&target_path)?
.generalize_path(store, &target_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
@ -36,7 +36,7 @@ impl UploadManager<FileStore> {
.identifier_tree
.insert(filename.as_bytes(), target_path_bytes)?;
self.store().safe_move_file(file_path, target_path).await?;
store.safe_move_file(file_path, target_path).await?;
}
let (start, end) = variant_key_bounds(&hash);
@ -48,25 +48,26 @@ impl UploadManager<FileStore> {
let variant_path =
PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?);
if tokio::fs::metadata(&variant_path).await.is_ok() {
let target_path = self.store().next_directory()?.join(&filename);
let target_path = store.next_directory()?.join(&filename);
let relative_target_path_bytes = self
.generalize_path(&target_path)?
.generalize_path(store, &target_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let variant_key = self.migrate_variant_key(&variant_path, &filename)?;
let variant_key =
self.migrate_variant_key(store, &variant_path, &filename)?;
self.inner()
.identifier_tree
.insert(variant_key, relative_target_path_bytes)?;
self.store()
store
.safe_move_file(variant_path.clone(), target_path)
.await?;
self.store().try_remove_parents(&variant_path).await;
store.try_remove_parents(&variant_path).await;
}
}
@ -74,37 +75,32 @@ impl UploadManager<FileStore> {
}
}
self.mark_restructure_complete()?;
self.mark_restructure_complete(store)?;
Ok(())
}
fn restructure_complete(&self) -> Result<bool, Error> {
Ok(self
.store()
.settings_tree
.get(RESTRUCTURE_COMPLETE)?
.is_some())
fn restructure_complete(&self, store: &FileStore) -> Result<bool, Error> {
Ok(store.settings_tree.get(RESTRUCTURE_COMPLETE)?.is_some())
}
fn mark_restructure_complete(&self) -> Result<(), Error> {
self.store()
.settings_tree
.insert(RESTRUCTURE_COMPLETE, b"true")?;
fn mark_restructure_complete(&self, store: &FileStore) -> Result<(), Error> {
store.settings_tree.insert(RESTRUCTURE_COMPLETE, b"true")?;
Ok(())
}
fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> {
Ok(path.strip_prefix(&self.store().root_dir)?)
fn generalize_path<'a>(&self, store: &FileStore, path: &'a Path) -> Result<&'a Path, Error> {
Ok(path.strip_prefix(&store.root_dir)?)
}
fn migrate_variant_key(
&self,
store: &FileStore,
variant_process_path: &Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path = self
.generalize_path(variant_process_path)?
.generalize_path(&store, variant_process_path)?
.strip_prefix("files")?;
self.variant_key(path, filename)

View file

@ -33,15 +33,18 @@ pub(super) use session::UploadManagerSession;
// - filename -> hash
// - Details Tree
// - filename / S::Identifier -> details
// - Path Tree
// - Identifier Tree
// - filename -> S::Identifier
// - filename / variant path -> S::Identifier
// - filename / motion -> S::Identifier
// - Settings Tree
// - store-migration-progress -> Path Tree Key
const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress";
#[derive(Clone)]
pub(crate) struct UploadManager<S> {
pub(crate) struct UploadManager {
inner: Arc<UploadManagerInner>,
store: S,
}
pub(crate) struct UploadManagerInner {
@ -51,6 +54,7 @@ pub(crate) struct UploadManagerInner {
pub(crate) filename_tree: sled::Tree,
pub(crate) main_tree: sled::Tree,
details_tree: sled::Tree,
settings_tree: sled::Tree,
pub(crate) identifier_tree: sled::Tree,
db: sled::Db,
}
@ -67,70 +71,115 @@ struct FilenameIVec {
inner: sled::IVec,
}
impl<S> UploadManager<S>
where
S: Store + 'static,
Error: From<S::Error>,
{
impl UploadManager {
/// Create a new UploadManager
pub(crate) async fn new(store: S, db: sled::Db, format: Option<Format>) -> Result<Self, Error> {
pub(crate) async fn new(db: sled::Db, format: Option<Format>) -> Result<Self, Error> {
let manager = UploadManager {
inner: Arc::new(UploadManagerInner {
format,
hasher: sha2::Sha256::new(),
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
details_tree: db.open_tree("details")?,
main_tree: db.open_tree("main")?,
details_tree: db.open_tree("details")?,
settings_tree: db.open_tree("settings")?,
identifier_tree: db.open_tree("path")?,
db,
}),
store,
};
Ok(manager)
}
pub(crate) fn store(&self) -> &S {
&self.store
pub(crate) async fn migrate_store<S1, S2>(&self, from: S1, to: S2) -> Result<(), Error>
where
S1: Store,
S2: Store,
Error: From<S1::Error> + From<S2::Error>,
{
let iter =
if let Some(starting_line) = self.inner.settings_tree.get(STORE_MIGRATION_PROGRESS)? {
self.inner.identifier_tree.range(starting_line..)
} else {
self.inner.identifier_tree.iter()
};
for res in iter {
let (key, identifier) = res?;
let identifier = S1::Identifier::from_bytes(identifier.to_vec())?;
let filename =
if let Some((filename, _)) = String::from_utf8_lossy(&key).split_once('/') {
filename.to_string()
} else {
String::from_utf8_lossy(&key).to_string()
};
let stream = from.to_stream(&identifier, None, None).await?;
futures_util::pin_mut!(stream);
let mut reader = tokio_util::io::StreamReader::new(stream);
let new_identifier = to.save_async_read(&mut reader).await?;
let details_key = self.details_key(&identifier, &filename)?;
if let Some(details) = self.inner.details_tree.get(details_key.clone())? {
let new_details_key = self.details_key(&new_identifier, &filename)?;
self.inner.details_tree.insert(new_details_key, details)?;
}
self.inner
.identifier_tree
.insert(key.clone(), new_identifier.to_bytes()?)?;
self.inner.details_tree.remove(details_key)?;
self.inner
.settings_tree
.insert(STORE_MIGRATION_PROGRESS, key)?;
}
Ok(())
}
pub(crate) fn inner(&self) -> &UploadManagerInner {
&self.inner
}
pub(crate) async fn still_identifier_from_filename(
pub(crate) async fn still_identifier_from_filename<S: Store + Clone>(
&self,
store: S,
filename: String,
) -> Result<S::Identifier, Error> {
let identifier = self.identifier_from_filename(filename.clone()).await?;
let details = if let Some(details) = self
.variant_details(identifier.clone(), filename.clone())
.await?
{
details
} else {
let hint = details_hint(&filename);
Details::from_store(self.store.clone(), identifier.clone(), hint).await?
};
) -> Result<S::Identifier, Error>
where
Error: From<S::Error>,
{
let identifier = self.identifier_from_filename::<S>(filename.clone()).await?;
let details =
if let Some(details) = self.variant_details(&identifier, filename.clone()).await? {
details
} else {
let hint = details_hint(&filename);
Details::from_store(store.clone(), identifier.clone(), hint).await?
};
if !details.is_motion() {
return Ok(identifier);
}
if let Some(motion_identifier) = self.motion_identifier(&filename).await? {
if let Some(motion_identifier) = self.motion_identifier::<S>(&filename).await? {
return Ok(motion_identifier);
}
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let mut reader = crate::ffmpeg::thumbnail(
self.store.clone(),
store.clone(),
identifier,
InputFormat::Mp4,
ThumbnailFormat::Jpeg,
)
.await?;
let motion_identifier = self.store.save_async_read(&mut reader).await?;
let motion_identifier = store.save_async_read(&mut reader).await?;
drop(permit);
self.store_motion_path(&filename, &motion_identifier)
@ -138,7 +187,13 @@ where
Ok(motion_identifier)
}
async fn motion_identifier(&self, filename: &str) -> Result<Option<S::Identifier>, Error> {
async fn motion_identifier<S: Store>(
&self,
filename: &str,
) -> Result<Option<S::Identifier>, Error>
where
Error: From<S::Error>,
{
let identifier_tree = self.inner.identifier_tree.clone();
let motion_key = format!("{}/motion", filename);
@ -151,11 +206,14 @@ where
Ok(None)
}
async fn store_motion_path(
async fn store_motion_path<I: Identifier>(
&self,
filename: &str,
identifier: &S::Identifier,
) -> Result<(), Error> {
identifier: &I,
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let identifier_bytes = identifier.to_bytes()?;
let motion_key = format!("{}/motion", filename);
let identifier_tree = self.inner.identifier_tree.clone();
@ -166,10 +224,13 @@ where
}
#[instrument(skip(self))]
pub(crate) async fn identifier_from_filename(
pub(crate) async fn identifier_from_filename<S: Store>(
&self,
filename: String,
) -> Result<S::Identifier, Error> {
) -> Result<S::Identifier, Error>
where
Error: From<S::Error>,
{
let identifier_tree = self.inner.identifier_tree.clone();
let path_ivec = web::block(move || identifier_tree.get(filename.as_bytes()))
.await??
@ -181,11 +242,14 @@ where
}
#[instrument(skip(self))]
async fn store_identifier(
async fn store_identifier<I: Identifier>(
&self,
filename: String,
identifier: &S::Identifier,
) -> Result<(), Error> {
identifier: &I,
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let identifier_bytes = identifier.to_bytes()?;
let identifier_tree = self.inner.identifier_tree.clone();
web::block(move || identifier_tree.insert(filename.as_bytes(), identifier_bytes)).await??;
@ -193,11 +257,14 @@ where
}
#[instrument(skip(self))]
pub(crate) async fn variant_identifier(
pub(crate) async fn variant_identifier<S: Store>(
&self,
process_path: &std::path::Path,
filename: &str,
) -> Result<Option<S::Identifier>, Error> {
) -> Result<Option<S::Identifier>, Error>
where
Error: From<S::Error>,
{
let key = self.variant_key(process_path, filename)?;
let identifier_tree = self.inner.identifier_tree.clone();
let path_opt = web::block(move || identifier_tree.get(key)).await??;
@ -212,12 +279,15 @@ where
/// Store the path to a generated image variant so we can easily clean it up later
#[instrument(skip(self))]
pub(crate) async fn store_variant(
pub(crate) async fn store_variant<I: Identifier>(
&self,
variant_process_path: Option<&std::path::Path>,
identifier: &S::Identifier,
identifier: &I,
filename: &str,
) -> Result<(), Error> {
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let key = if let Some(path) = variant_process_path {
self.variant_key(path, filename)?
} else {
@ -238,11 +308,14 @@ where
/// Get the image details for a given variant
#[instrument(skip(self))]
pub(crate) async fn variant_details(
pub(crate) async fn variant_details<I: Identifier>(
&self,
identifier: S::Identifier,
identifier: &I,
filename: String,
) -> Result<Option<Details>, Error> {
) -> Result<Option<Details>, Error>
where
Error: From<I::Error>,
{
let key = self.details_key(identifier, &filename)?;
let details_tree = self.inner.details_tree.clone();
@ -260,12 +333,15 @@ where
}
#[instrument(skip(self))]
pub(crate) async fn store_variant_details(
pub(crate) async fn store_variant_details<I: Identifier>(
&self,
identifier: S::Identifier,
identifier: &I,
filename: String,
details: &Details,
) -> Result<(), Error> {
) -> Result<(), Error>
where
Error: From<I::Error>,
{
let key = self.details_key(identifier, &filename)?;
let details_tree = self.inner.details_tree.clone();
let details_value = serde_json::to_vec(details)?;
@ -322,19 +398,35 @@ where
}
/// Delete an alias without a delete token
pub(crate) async fn delete_without_token(&self, alias: String) -> Result<(), Error> {
pub(crate) async fn delete_without_token<S: Store + 'static>(
&self,
store: S,
alias: String,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
let token_key = delete_key(&alias);
let alias_tree = self.inner.alias_tree.clone();
let token = web::block(move || alias_tree.get(token_key.as_bytes()))
.await??
.ok_or(UploadError::MissingAlias)?;
self.delete(alias, String::from_utf8(token.to_vec())?).await
self.delete(store, alias, String::from_utf8(token.to_vec())?)
.await
}
/// Delete the alias, and the file & variants if no more aliases exist
#[instrument(skip(self, alias, token))]
pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), Error> {
pub(crate) async fn delete<S: Store + 'static>(
&self,
store: S,
alias: String,
token: String,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
use sled::Transactional;
let main_tree = self.inner.main_tree.clone();
let alias_tree = self.inner.alias_tree.clone();
@ -381,10 +473,17 @@ where
})
.await??;
self.check_delete_files(hash).await
self.check_delete_files(store, hash).await
}
async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), Error> {
async fn check_delete_files<S: Store + 'static>(
&self,
store: S,
hash: sled::IVec,
) -> Result<(), Error>
where
Error: From<S::Error>,
{
// -- CHECK IF ANY OTHER ALIASES EXIST --
let main_tree = self.inner.main_tree.clone();
let (start, end) = alias_key_bounds(&hash);
@ -420,7 +519,7 @@ where
actix_rt::spawn(
async move {
if let Err(e) = this
.cleanup_files(FilenameIVec::new(filename.clone()))
.cleanup_files(store, FilenameIVec::new(filename.clone()))
.await
{
error!("Error removing files from fs, {}", e);
@ -456,13 +555,19 @@ where
Ok(filename)
}
pub(crate) fn session(&self) -> UploadManagerSession<S> {
UploadManagerSession::new(self.clone())
pub(crate) fn session<S: Store + Clone + 'static>(&self, store: S) -> UploadManagerSession<S>
where
Error: From<S::Error>,
{
UploadManagerSession::new(self.clone(), store)
}
// Find image variants and remove them from the DB and the disk
#[instrument(skip(self))]
async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), Error> {
async fn cleanup_files<S: Store>(&self, store: S, filename: FilenameIVec) -> Result<(), Error>
where
Error: From<S::Error>,
{
let filename = filename.inner;
let filename2 = filename.clone();
@ -473,7 +578,7 @@ where
if let Some(identifier) = identifier {
let identifier = S::Identifier::from_bytes(identifier.to_vec())?;
debug!("Deleting {:?}", identifier);
if let Err(e) = self.store.remove(&identifier).await {
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
}
@ -500,7 +605,7 @@ where
let identifier = S::Identifier::from_bytes(id.to_vec())?;
debug!("Deleting {:?}", identifier);
if let Err(e) = self.store.remove(&identifier).await {
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
}
@ -537,7 +642,10 @@ where
Ok(vec)
}
fn details_key(&self, identifier: S::Identifier, filename: &str) -> Result<Vec<u8>, Error> {
fn details_key<I: Identifier>(&self, identifier: &I, filename: &str) -> Result<Vec<u8>, Error>
where
Error: From<I::Error>,
{
let mut vec = filename.as_bytes().to_vec();
vec.extend(b"/");
vec.extend(&identifier.to_bytes()?);
@ -629,7 +737,7 @@ fn delete_key(alias: &str) -> String {
format!("{}/delete", alias)
}
impl<S> std::fmt::Debug for UploadManager<S> {
impl std::fmt::Debug for UploadManager {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadManager").finish()
}

View file

@ -15,21 +15,23 @@ use tracing::{debug, instrument, warn, Span};
use tracing_futures::Instrument;
use uuid::Uuid;
pub(crate) struct UploadManagerSession<S: Store>
pub(crate) struct UploadManagerSession<S: Store + Clone + 'static>
where
Error: From<S::Error>,
{
manager: UploadManager<S>,
store: S,
manager: UploadManager,
alias: Option<String>,
finished: bool,
}
impl<S: Store> UploadManagerSession<S>
impl<S: Store + Clone + 'static> UploadManagerSession<S>
where
Error: From<S::Error>,
{
pub(super) fn new(manager: UploadManager<S>) -> Self {
pub(super) fn new(manager: UploadManager, store: S) -> Self {
UploadManagerSession {
store,
manager,
alias: None,
finished: false,
@ -56,7 +58,7 @@ impl Dup {
}
}
impl<S: Store> Drop for UploadManagerSession<S>
impl<S: Store + Clone + 'static> Drop for UploadManagerSession<S>
where
Error: From<S::Error>,
{
@ -66,6 +68,7 @@ where
}
if let Some(alias) = self.alias.take() {
let store = self.store.clone();
let manager = self.manager.clone();
let cleanup_span = tracing::info_span!(
parent: None,
@ -89,7 +92,7 @@ where
let _ = manager.inner.main_tree.remove(&key);
}
let _ = manager.check_delete_files(hash).await;
let _ = manager.check_delete_files(store, hash).await;
}
}
.instrument(cleanup_span),
@ -164,11 +167,7 @@ where
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let identifier = self
.manager
.store
.save_async_read(&mut hasher_reader)
.await?;
let identifier = self.store.save_async_read(&mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Storing alias");
@ -206,11 +205,7 @@ where
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let identifier = self
.manager
.store
.save_async_read(&mut hasher_reader)
.await?;
let identifier = self.store.save_async_read(&mut hasher_reader).await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
@ -236,7 +231,7 @@ where
if dup.exists() {
debug!("Duplicate exists, removing file");
self.manager.store.remove(identifier).await?;
self.store.remove(identifier).await?;
return Ok(());
}