diff --git a/docker/object-storage/migrate.toml b/docker/object-storage/migrate.toml new file mode 100644 index 0000000..a40c4ef --- /dev/null +++ b/docker/object-storage/migrate.toml @@ -0,0 +1,9 @@ +[from] +type = "file_store" + +[to] +type = 's3_store' +bucket_name = 'pict-rs' +region = 'http://minio:9000' +access_key = '09ODZ3BGBISV4U92JLIM' +secret_key = 'j35YE9RrxhBP0dpiD5mmdXRXvPkEJR4k6zK12q3o' diff --git a/src/config.rs b/src/config.rs index f958f62..affeb2d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,6 +9,9 @@ pub(crate) struct Args { #[structopt(short, long, help = "Path to the pict-rs configuration file")] config_file: Option, + #[structopt(long, help = "Path to a file defining a store migration")] + migrate_file: Option, + #[structopt(flatten)] overrides: Overrides, } @@ -109,22 +112,45 @@ impl Overrides { } } +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) struct Migrate { + from: Store, + to: Store, +} + +impl Migrate { + pub(crate) fn from(&self) -> &Store { + &self.from + } + + pub(crate) fn to(&self) -> &Store { + &self.to + } +} + #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, structopt::StructOpt)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] pub(crate) enum Store { FileStore { // defaults to {config.path} - #[structopt(long)] + #[structopt( + long, + help = "Path in which pict-rs will create it's 'files' directory" + )] #[serde(skip_serializing_if = "Option::is_none")] path: Option, }, #[cfg(feature = "object-storage")] S3Store { - #[structopt(long)] + #[structopt(long, help = "Name of the bucket in which pict-rs will store images")] bucket_name: String, - #[structopt(long)] + #[structopt( + long, + help = "Region in which the bucket exists, can be an http endpoint" + )] region: crate::serde_str::Serde, #[serde(skip_serializing_if = "Option::is_none")] @@ -194,6 +220,14 @@ impl Config { let mut base_config = config::Config::new(); base_config.merge(config::Config::try_from(&Defaults::new())?)?; + if let Some(path) = args.migrate_file { + let mut migrate_config = config::Config::new(); + migrate_config.merge(config::File::from(path))?; + let migrate: Migrate = migrate_config.try_into()?; + + crate::MIGRATE.set(migrate).unwrap(); + } + if let Some(path) = args.config_file { base_config.merge(config::File::from(path))?; }; diff --git a/src/file.rs b/src/file.rs index fb997b8..1c9f093 100644 --- a/src/file.rs +++ b/src/file.rs @@ -9,7 +9,7 @@ mod tokio_file { use crate::{store::file_store::FileError, Either}; use actix_web::web::{Bytes, BytesMut}; use futures_util::stream::{Stream, StreamExt}; - use std::{io::SeekFrom, path::Path, pin::Pin}; + use std::{io::SeekFrom, path::Path}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -35,12 +35,11 @@ mod tokio_file { Ok(()) } - pub(crate) async fn write_from_stream(&mut self, mut stream: S) -> std::io::Result<()> + pub(crate) async fn write_from_stream(&mut self, stream: S) -> std::io::Result<()> where S: Stream>, { - // SAFETY: pinned stream shadows original stream so it cannot be moved - let mut stream = unsafe { Pin::new_unchecked(&mut stream) }; + futures_util::pin_mut!(stream); while let Some(res) = stream.next().await { let mut bytes = res?; @@ -202,12 +201,11 @@ mod io_uring { Ok(()) } - pub(crate) async fn write_from_stream(&mut self, mut stream: S) -> std::io::Result<()> + pub(crate) async fn write_from_stream(&mut self, stream: S) -> std::io::Result<()> where S: Stream>, { - // SAFETY: pinned stream shadows original stream so it cannot be moved - let mut stream = unsafe { Pin::new_unchecked(&mut stream) }; + futures_util::pin_mut!(stream); let mut cursor: u64 = 0; while let Some(res) = stream.next().await { diff --git a/src/main.rs b/src/main.rs index e0d516c..2a8d209 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use actix_web::{ }; use awc::Client; use futures_util::{stream::once, Stream}; -use once_cell::sync::Lazy; +use once_cell::sync::{Lazy, OnceCell}; use std::{ collections::HashSet, future::ready, @@ -46,7 +46,7 @@ use crate::{magick::details_hint, store::file_store::FileStore}; use self::{ concurrent_processor::CancelSafeProcessor, - config::{Config, Format}, + config::{Config, Format, Migrate}, either::Either, error::{Error, UploadError}, init_tracing::init_tracing, @@ -61,6 +61,7 @@ const MINUTES: u32 = 60; const HOURS: u32 = 60 * MINUTES; const DAYS: u32 = 24 * HOURS; +static MIGRATE: OnceCell = OnceCell::new(); static CONFIG: Lazy = Lazy::new(|| Config::build().unwrap()); static PROCESS_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))); @@ -69,7 +70,8 @@ static PROCESS_SEMAPHORE: Lazy = #[instrument(name = "Uploaded files", skip(value, manager))] async fn upload( value: Value>, - manager: web::Data>, + manager: web::Data, + store: web::Data, ) -> Result where Error: From, @@ -91,11 +93,9 @@ where let delete_token = image.result.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; - let identifier = manager.identifier_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename::(name.clone()).await?; - let details = manager - .variant_details(identifier.clone(), name.clone()) - .await?; + let details = manager.variant_details(&identifier, name.clone()).await?; let details = if let Some(details) = details { debug!("details exist"); @@ -104,10 +104,10 @@ where debug!("generating new details from {:?}", identifier); let hint = details_hint(&name); let new_details = - Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; + Details::from_store((**store).clone(), identifier.clone(), hint).await?; debug!("storing details for {:?} {}", identifier, name); manager - .store_variant_details(identifier, name, &new_details) + .store_variant_details(&identifier, name, &new_details) .await?; debug!("stored"); new_details @@ -192,7 +192,8 @@ where #[instrument(name = "Downloading file", skip(client, manager))] async fn download( client: web::Data, - manager: web::Data>, + manager: web::Data, + store: web::Data, query: web::Query, ) -> Result where @@ -204,35 +205,31 @@ where return Err(UploadError::Download(res.status()).into()); } - let mut stream = Limit::new( + let stream = Limit::new( map_error::map_crate_error(res), (CONFIG.max_file_size() * MEGABYTES) as u64, ); - // SAFETY: stream is shadowed, so original cannot not be moved - let stream = unsafe { Pin::new_unchecked(&mut stream) }; + futures_util::pin_mut!(stream); let permit = PROCESS_SEMAPHORE.acquire().await?; - let session = manager.session().upload(stream).await?; + let session = manager.session((**store).clone()).upload(stream).await?; let alias = session.alias().unwrap().to_owned(); drop(permit); let delete_token = session.delete_token().await?; let name = manager.from_alias(alias.to_owned()).await?; - let identifier = manager.identifier_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename::(name.clone()).await?; - let details = manager - .variant_details(identifier.clone(), name.clone()) - .await?; + let details = manager.variant_details(&identifier, name.clone()).await?; let details = if let Some(details) = details { details } else { let hint = details_hint(&name); - let new_details = - Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; + let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; manager - .store_variant_details(identifier, name, &new_details) + .store_variant_details(&identifier, name, &new_details) .await?; new_details }; @@ -251,7 +248,8 @@ where /// Delete aliases and files #[instrument(name = "Deleting file", skip(manager))] async fn delete( - manager: web::Data>, + manager: web::Data, + store: web::Data, path_entries: web::Path<(String, String)>, ) -> Result where @@ -259,22 +257,19 @@ where { let (alias, token) = path_entries.into_inner(); - manager.delete(token, alias).await?; + manager.delete((**store).clone(), token, alias).await?; Ok(HttpResponse::NoContent().finish()) } type ProcessQuery = Vec<(String, String)>; -async fn prepare_process( +async fn prepare_process( query: web::Query, ext: &str, - manager: &UploadManager, + manager: &UploadManager, filters: &Option>, -) -> Result<(Format, String, PathBuf, Vec), Error> -where - Error: From, -{ +) -> Result<(Format, String, PathBuf, Vec), Error> { let (alias, operations) = query .into_inner() @@ -318,7 +313,8 @@ where async fn process_details( query: web::Query, ext: web::Path, - manager: web::Data>, + manager: web::Data, + store: web::Data, filters: web::Data>>, ) -> Result where @@ -328,11 +324,11 @@ where prepare_process(query, ext.as_str(), &manager, &filters).await?; let identifier = manager - .variant_identifier(&thumbnail_path, &name) + .variant_identifier::(&thumbnail_path, &name) .await? .ok_or(UploadError::MissingAlias)?; - let details = manager.variant_details(identifier, name).await?; + let details = manager.variant_details(&identifier, name).await?; let details = details.ok_or(UploadError::NoFiles)?; @@ -345,7 +341,8 @@ async fn process( range: Option, query: web::Query, ext: web::Path, - manager: web::Data>, + manager: web::Data, + store: web::Data, filters: web::Data>>, ) -> Result where @@ -354,29 +351,30 @@ where let (format, name, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str(), &manager, &filters).await?; - let identifier_opt = manager.variant_identifier(&thumbnail_path, &name).await?; + let identifier_opt = manager + .variant_identifier::(&thumbnail_path, &name) + .await?; if let Some(identifier) = identifier_opt { - let details_opt = manager - .variant_details(identifier.clone(), name.clone()) - .await?; + let details_opt = manager.variant_details(&identifier, name.clone()).await?; let details = if let Some(details) = details_opt { details } else { let hint = details_hint(&name); - let details = - Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; + let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; manager - .store_variant_details(identifier.clone(), name, &details) + .store_variant_details(&identifier, name, &details) .await?; details }; - return ranged_file_resp(manager.store().clone(), identifier, range, details).await; + return ranged_file_resp(&**store, identifier, range, details).await; } - let identifier = manager.still_identifier_from_filename(name.clone()).await?; + let identifier = manager + .still_identifier_from_filename((**store).clone(), name.clone()) + .await?; let thumbnail_path2 = thumbnail_path.clone(); let process_fut = async { @@ -385,7 +383,7 @@ where let permit = PROCESS_SEMAPHORE.acquire().await?; let mut processed_reader = crate::magick::process_image_store_read( - manager.store().clone(), + (**store).clone(), identifier, thumbnail_args, format, @@ -410,7 +408,7 @@ where let bytes2 = bytes.clone(); actix_rt::spawn( async move { - let identifier = match manager.store().save_bytes(bytes2).await { + let identifier = match store.save_bytes(bytes2).await { Ok(identifier) => identifier, Err(e) => { tracing::warn!("Failed to generate directory path: {}", e); @@ -418,7 +416,7 @@ where } }; if let Err(e) = manager - .store_variant_details(identifier.clone(), name.clone(), &details2) + .store_variant_details(&identifier, name.clone(), &details2) .await { tracing::warn!("Error saving variant details: {}", e); @@ -479,26 +477,24 @@ where #[instrument(name = "Fetching details", skip(manager))] async fn details( alias: web::Path, - manager: web::Data>, + manager: web::Data, + store: web::Data, ) -> Result where Error: From, { let name = manager.from_alias(alias.into_inner()).await?; - let identifier = manager.identifier_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename::(name.clone()).await?; - let details = manager - .variant_details(identifier.clone(), name.clone()) - .await?; + let details = manager.variant_details(&identifier, name.clone()).await?; let details = if let Some(details) = details { details } else { let hint = details_hint(&name); - let new_details = - Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; + let new_details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; manager - .store_variant_details(identifier, name, &new_details) + .store_variant_details(&identifier, name, &new_details) .await?; new_details }; @@ -511,35 +507,33 @@ where async fn serve( range: Option, alias: web::Path, - manager: web::Data>, + manager: web::Data, + store: web::Data, ) -> Result where Error: From, { let name = manager.from_alias(alias.into_inner()).await?; - let identifier = manager.identifier_from_filename(name.clone()).await?; + let identifier = manager.identifier_from_filename::(name.clone()).await?; - let details = manager - .variant_details(identifier.clone(), name.clone()) - .await?; + let details = manager.variant_details(&identifier, name.clone()).await?; let details = if let Some(details) = details { details } else { let hint = details_hint(&name); - let details = - Details::from_store(manager.store().clone(), identifier.clone(), hint).await?; + let details = Details::from_store((**store).clone(), identifier.clone(), hint).await?; manager - .store_variant_details(identifier.clone(), name, &details) + .store_variant_details(&identifier, name, &details) .await?; details }; - ranged_file_resp(manager.store().clone(), identifier, range, details).await + ranged_file_resp(&**store, identifier, range, details).await } async fn ranged_file_resp( - store: S, + store: &S, identifier: S::Identifier, range: Option, details: Details, @@ -627,7 +621,8 @@ enum FileOrAlias { #[instrument(name = "Purging file", skip(upload_manager))] async fn purge( query: web::Query, - upload_manager: web::Data>, + upload_manager: web::Data, + store: web::Data, ) -> Result where Error: From, @@ -639,7 +634,7 @@ where for alias in aliases.iter() { upload_manager - .delete_without_token(alias.to_owned()) + .delete_without_token((**store).clone(), alias.to_owned()) .await?; } @@ -652,7 +647,8 @@ where #[instrument(name = "Fetching aliases", skip(upload_manager))] async fn aliases( query: web::Query, - upload_manager: web::Data>, + upload_manager: web::Data, + store: web::Data, ) -> Result where Error: From, @@ -676,7 +672,8 @@ struct ByAlias { #[instrument(name = "Fetching filename", skip(upload_manager))] async fn filename_by_alias( query: web::Query, - upload_manager: web::Data>, + upload_manager: web::Data, + store: web::Data, ) -> Result where Error: From, @@ -695,7 +692,7 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error { error } -async fn launch(manager: UploadManager) -> anyhow::Result<()> +async fn launch(manager: UploadManager, store: S) -> anyhow::Result<()> where S::Error: Unpin, Error: From, @@ -704,6 +701,7 @@ where // // This form is expecting a single array field, 'images' with at most 10 files in it let manager2 = manager.clone(); + let store2 = store.clone(); let form = Form::new() .max_files(10) .max_file_size(CONFIG.max_file_size() * MEGABYTES) @@ -711,6 +709,7 @@ where .field( "images", Field::array(Field::file(move |filename, _, stream| { + let store = store2.clone(); let manager = manager2.clone(); let span = tracing::info_span!("file-upload", ?filename); @@ -719,7 +718,7 @@ where let permit = PROCESS_SEMAPHORE.acquire().await?; let res = manager - .session() + .session(store) .upload(map_error::map_crate_error(stream)) .await; @@ -735,6 +734,7 @@ where // This form is expecting a single array field, 'images' with at most 10 files in it let validate_imports = CONFIG.validate_imports(); let manager2 = manager.clone(); + let store2 = store.clone(); let import_form = Form::new() .max_files(10) .max_file_size(CONFIG.max_file_size() * MEGABYTES) @@ -743,6 +743,7 @@ where "images", Field::array(Field::file(move |filename, _, stream| { let manager = manager2.clone(); + let store = store2.clone(); let span = tracing::info_span!("file-import", ?filename); @@ -750,7 +751,7 @@ where let permit = PROCESS_SEMAPHORE.acquire().await?; let res = manager - .session() + .session(store) .import( filename, validate_imports, @@ -773,6 +774,7 @@ where App::new() .wrap(TracingLogger::default()) .wrap(Deadline) + .app_data(web::Data::new(store.clone())) .app_data(web::Data::new(manager.clone())) .app_data(web::Data::new(client)) .app_data(web::Data::new(CONFIG.allowed_filters())) @@ -828,23 +830,108 @@ where Ok(()) } +async fn migrate_inner( + manager: &UploadManager, + db: &sled::Db, + from: S1, + to: &config::Store, +) -> anyhow::Result<()> +where + S1: Store, + Error: From, +{ + match to { + config::Store::FileStore { path } => { + let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir()); + + let to = FileStore::build(path, &db)?; + manager.restructure(&to).await?; + + manager.migrate_store::(from, to).await?; + } + #[cfg(feature = "object-storage")] + config::Store::S3Store { + bucket_name, + region, + access_key, + secret_key, + security_token, + session_token, + } => { + use store::object_store::ObjectStore; + + let to = ObjectStore::build( + bucket_name, + (**region).clone(), + access_key.clone(), + secret_key.clone(), + security_token.clone(), + session_token.clone(), + &db, + )?; + + manager.migrate_store::(from, to).await?; + } + } + + Ok(()) +} + #[actix_rt::main] async fn main() -> anyhow::Result<()> { init_tracing("pict-rs", CONFIG.opentelemetry_url())?; - let root_dir = CONFIG.data_dir(); - let db = LatestDb::exists(root_dir.clone()).migrate()?; + let db = LatestDb::exists(CONFIG.data_dir()).migrate()?; + + let manager = UploadManager::new(db.clone(), CONFIG.format()).await?; + + if let Some(m) = MIGRATE.get() { + let from = m.from(); + let to = m.to(); + + match from { + config::Store::FileStore { path } => { + let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir()); + + let from = FileStore::build(path, &db)?; + manager.restructure(&from).await?; + + migrate_inner(&manager, &db, from, to).await?; + } + #[cfg(feature = "object-storage")] + config::Store::S3Store { + bucket_name, + region, + access_key, + secret_key, + security_token, + session_token, + } => { + let from = crate::store::object_store::ObjectStore::build( + bucket_name, + (**region).clone(), + access_key.clone(), + secret_key.clone(), + security_token.clone(), + session_token.clone(), + &db, + )?; + + migrate_inner(&manager, &db, from, to).await?; + } + } + + return Ok(()); + } match CONFIG.store() { config::Store::FileStore { path } => { - let path = path.to_owned().unwrap_or_else(|| root_dir.clone()); + let path = path.to_owned().unwrap_or_else(|| CONFIG.data_dir()); let store = FileStore::build(path, &db)?; + manager.restructure(&store).await?; - let manager = UploadManager::new(store, db, CONFIG.format()).await?; - - manager.restructure().await?; - launch(manager).await + launch(manager, store).await } #[cfg(feature = "object-storage")] config::Store::S3Store { @@ -865,8 +952,7 @@ async fn main() -> anyhow::Result<()> { &db, )?; - let manager = UploadManager::new(store, db, CONFIG.format()).await?; - launch(manager).await + launch(manager, store).await } } } diff --git a/src/range.rs b/src/range.rs index 9f64d03..7fc20b7 100644 --- a/src/range.rs +++ b/src/range.rs @@ -57,7 +57,7 @@ impl Range { pub(crate) async fn chop_store( &self, - store: S, + store: &S, identifier: S::Identifier, ) -> Result>, Error> where diff --git a/src/store/file_store/restructure.rs b/src/store/file_store/restructure.rs index c7dcf35..65309e2 100644 --- a/src/store/file_store/restructure.rs +++ b/src/store/file_store/restructure.rs @@ -8,10 +8,10 @@ use std::path::{Path, PathBuf}; const RESTRUCTURE_COMPLETE: &[u8] = b"fs-restructure-01-complete"; const DETAILS: &[u8] = b"details"; -impl UploadManager { +impl UploadManager { #[tracing::instrument(skip(self))] - pub(crate) async fn restructure(&self) -> Result<(), Error> { - if self.restructure_complete()? { + pub(crate) async fn restructure(&self, store: &FileStore) -> Result<(), Error> { + if self.restructure_complete(store)? { return Ok(()); } @@ -20,13 +20,13 @@ impl UploadManager { let filename = String::from_utf8(filename.to_vec())?; tracing::info!("Migrating {}", filename); - let file_path = self.store().root_dir.join("files").join(&filename); + let file_path = store.root_dir.join("files").join(&filename); if tokio::fs::metadata(&file_path).await.is_ok() { - let target_path = self.store().next_directory()?.join(&filename); + let target_path = store.next_directory()?.join(&filename); let target_path_bytes = self - .generalize_path(&target_path)? + .generalize_path(store, &target_path)? .to_str() .ok_or(UploadError::Path)? .as_bytes() @@ -36,7 +36,7 @@ impl UploadManager { .identifier_tree .insert(filename.as_bytes(), target_path_bytes)?; - self.store().safe_move_file(file_path, target_path).await?; + store.safe_move_file(file_path, target_path).await?; } let (start, end) = variant_key_bounds(&hash); @@ -48,25 +48,26 @@ impl UploadManager { let variant_path = PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?); if tokio::fs::metadata(&variant_path).await.is_ok() { - let target_path = self.store().next_directory()?.join(&filename); + let target_path = store.next_directory()?.join(&filename); let relative_target_path_bytes = self - .generalize_path(&target_path)? + .generalize_path(store, &target_path)? .to_str() .ok_or(UploadError::Path)? .as_bytes() .to_vec(); - let variant_key = self.migrate_variant_key(&variant_path, &filename)?; + let variant_key = + self.migrate_variant_key(store, &variant_path, &filename)?; self.inner() .identifier_tree .insert(variant_key, relative_target_path_bytes)?; - self.store() + store .safe_move_file(variant_path.clone(), target_path) .await?; - self.store().try_remove_parents(&variant_path).await; + store.try_remove_parents(&variant_path).await; } } @@ -74,37 +75,32 @@ impl UploadManager { } } - self.mark_restructure_complete()?; + self.mark_restructure_complete(store)?; Ok(()) } - fn restructure_complete(&self) -> Result { - Ok(self - .store() - .settings_tree - .get(RESTRUCTURE_COMPLETE)? - .is_some()) + fn restructure_complete(&self, store: &FileStore) -> Result { + Ok(store.settings_tree.get(RESTRUCTURE_COMPLETE)?.is_some()) } - fn mark_restructure_complete(&self) -> Result<(), Error> { - self.store() - .settings_tree - .insert(RESTRUCTURE_COMPLETE, b"true")?; + fn mark_restructure_complete(&self, store: &FileStore) -> Result<(), Error> { + store.settings_tree.insert(RESTRUCTURE_COMPLETE, b"true")?; Ok(()) } - fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> { - Ok(path.strip_prefix(&self.store().root_dir)?) + fn generalize_path<'a>(&self, store: &FileStore, path: &'a Path) -> Result<&'a Path, Error> { + Ok(path.strip_prefix(&store.root_dir)?) } fn migrate_variant_key( &self, + store: &FileStore, variant_process_path: &Path, filename: &str, ) -> Result, Error> { let path = self - .generalize_path(variant_process_path)? + .generalize_path(&store, variant_process_path)? .strip_prefix("files")?; self.variant_key(path, filename) diff --git a/src/upload_manager.rs b/src/upload_manager.rs index 8f74863..7bea06c 100644 --- a/src/upload_manager.rs +++ b/src/upload_manager.rs @@ -33,15 +33,18 @@ pub(super) use session::UploadManagerSession; // - filename -> hash // - Details Tree // - filename / S::Identifier -> details -// - Path Tree +// - Identifier Tree // - filename -> S::Identifier // - filename / variant path -> S::Identifier // - filename / motion -> S::Identifier +// - Settings Tree +// - store-migration-progress -> Path Tree Key + +const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress"; #[derive(Clone)] -pub(crate) struct UploadManager { +pub(crate) struct UploadManager { inner: Arc, - store: S, } pub(crate) struct UploadManagerInner { @@ -51,6 +54,7 @@ pub(crate) struct UploadManagerInner { pub(crate) filename_tree: sled::Tree, pub(crate) main_tree: sled::Tree, details_tree: sled::Tree, + settings_tree: sled::Tree, pub(crate) identifier_tree: sled::Tree, db: sled::Db, } @@ -67,70 +71,115 @@ struct FilenameIVec { inner: sled::IVec, } -impl UploadManager -where - S: Store + 'static, - Error: From, -{ +impl UploadManager { /// Create a new UploadManager - pub(crate) async fn new(store: S, db: sled::Db, format: Option) -> Result { + pub(crate) async fn new(db: sled::Db, format: Option) -> Result { let manager = UploadManager { inner: Arc::new(UploadManagerInner { format, hasher: sha2::Sha256::new(), alias_tree: db.open_tree("alias")?, filename_tree: db.open_tree("filename")?, - details_tree: db.open_tree("details")?, main_tree: db.open_tree("main")?, + details_tree: db.open_tree("details")?, + settings_tree: db.open_tree("settings")?, identifier_tree: db.open_tree("path")?, db, }), - store, }; Ok(manager) } - pub(crate) fn store(&self) -> &S { - &self.store + pub(crate) async fn migrate_store(&self, from: S1, to: S2) -> Result<(), Error> + where + S1: Store, + S2: Store, + Error: From + From, + { + let iter = + if let Some(starting_line) = self.inner.settings_tree.get(STORE_MIGRATION_PROGRESS)? { + self.inner.identifier_tree.range(starting_line..) + } else { + self.inner.identifier_tree.iter() + }; + + for res in iter { + let (key, identifier) = res?; + + let identifier = S1::Identifier::from_bytes(identifier.to_vec())?; + + let filename = + if let Some((filename, _)) = String::from_utf8_lossy(&key).split_once('/') { + filename.to_string() + } else { + String::from_utf8_lossy(&key).to_string() + }; + + let stream = from.to_stream(&identifier, None, None).await?; + futures_util::pin_mut!(stream); + let mut reader = tokio_util::io::StreamReader::new(stream); + + let new_identifier = to.save_async_read(&mut reader).await?; + + let details_key = self.details_key(&identifier, &filename)?; + + if let Some(details) = self.inner.details_tree.get(details_key.clone())? { + let new_details_key = self.details_key(&new_identifier, &filename)?; + + self.inner.details_tree.insert(new_details_key, details)?; + } + + self.inner + .identifier_tree + .insert(key.clone(), new_identifier.to_bytes()?)?; + self.inner.details_tree.remove(details_key)?; + self.inner + .settings_tree + .insert(STORE_MIGRATION_PROGRESS, key)?; + } + + Ok(()) } pub(crate) fn inner(&self) -> &UploadManagerInner { &self.inner } - pub(crate) async fn still_identifier_from_filename( + pub(crate) async fn still_identifier_from_filename( &self, + store: S, filename: String, - ) -> Result { - let identifier = self.identifier_from_filename(filename.clone()).await?; - let details = if let Some(details) = self - .variant_details(identifier.clone(), filename.clone()) - .await? - { - details - } else { - let hint = details_hint(&filename); - Details::from_store(self.store.clone(), identifier.clone(), hint).await? - }; + ) -> Result + where + Error: From, + { + let identifier = self.identifier_from_filename::(filename.clone()).await?; + let details = + if let Some(details) = self.variant_details(&identifier, filename.clone()).await? { + details + } else { + let hint = details_hint(&filename); + Details::from_store(store.clone(), identifier.clone(), hint).await? + }; if !details.is_motion() { return Ok(identifier); } - if let Some(motion_identifier) = self.motion_identifier(&filename).await? { + if let Some(motion_identifier) = self.motion_identifier::(&filename).await? { return Ok(motion_identifier); } let permit = crate::PROCESS_SEMAPHORE.acquire().await; let mut reader = crate::ffmpeg::thumbnail( - self.store.clone(), + store.clone(), identifier, InputFormat::Mp4, ThumbnailFormat::Jpeg, ) .await?; - let motion_identifier = self.store.save_async_read(&mut reader).await?; + let motion_identifier = store.save_async_read(&mut reader).await?; drop(permit); self.store_motion_path(&filename, &motion_identifier) @@ -138,7 +187,13 @@ where Ok(motion_identifier) } - async fn motion_identifier(&self, filename: &str) -> Result, Error> { + async fn motion_identifier( + &self, + filename: &str, + ) -> Result, Error> + where + Error: From, + { let identifier_tree = self.inner.identifier_tree.clone(); let motion_key = format!("{}/motion", filename); @@ -151,11 +206,14 @@ where Ok(None) } - async fn store_motion_path( + async fn store_motion_path( &self, filename: &str, - identifier: &S::Identifier, - ) -> Result<(), Error> { + identifier: &I, + ) -> Result<(), Error> + where + Error: From, + { let identifier_bytes = identifier.to_bytes()?; let motion_key = format!("{}/motion", filename); let identifier_tree = self.inner.identifier_tree.clone(); @@ -166,10 +224,13 @@ where } #[instrument(skip(self))] - pub(crate) async fn identifier_from_filename( + pub(crate) async fn identifier_from_filename( &self, filename: String, - ) -> Result { + ) -> Result + where + Error: From, + { let identifier_tree = self.inner.identifier_tree.clone(); let path_ivec = web::block(move || identifier_tree.get(filename.as_bytes())) .await?? @@ -181,11 +242,14 @@ where } #[instrument(skip(self))] - async fn store_identifier( + async fn store_identifier( &self, filename: String, - identifier: &S::Identifier, - ) -> Result<(), Error> { + identifier: &I, + ) -> Result<(), Error> + where + Error: From, + { let identifier_bytes = identifier.to_bytes()?; let identifier_tree = self.inner.identifier_tree.clone(); web::block(move || identifier_tree.insert(filename.as_bytes(), identifier_bytes)).await??; @@ -193,11 +257,14 @@ where } #[instrument(skip(self))] - pub(crate) async fn variant_identifier( + pub(crate) async fn variant_identifier( &self, process_path: &std::path::Path, filename: &str, - ) -> Result, Error> { + ) -> Result, Error> + where + Error: From, + { let key = self.variant_key(process_path, filename)?; let identifier_tree = self.inner.identifier_tree.clone(); let path_opt = web::block(move || identifier_tree.get(key)).await??; @@ -212,12 +279,15 @@ where /// Store the path to a generated image variant so we can easily clean it up later #[instrument(skip(self))] - pub(crate) async fn store_variant( + pub(crate) async fn store_variant( &self, variant_process_path: Option<&std::path::Path>, - identifier: &S::Identifier, + identifier: &I, filename: &str, - ) -> Result<(), Error> { + ) -> Result<(), Error> + where + Error: From, + { let key = if let Some(path) = variant_process_path { self.variant_key(path, filename)? } else { @@ -238,11 +308,14 @@ where /// Get the image details for a given variant #[instrument(skip(self))] - pub(crate) async fn variant_details( + pub(crate) async fn variant_details( &self, - identifier: S::Identifier, + identifier: &I, filename: String, - ) -> Result, Error> { + ) -> Result, Error> + where + Error: From, + { let key = self.details_key(identifier, &filename)?; let details_tree = self.inner.details_tree.clone(); @@ -260,12 +333,15 @@ where } #[instrument(skip(self))] - pub(crate) async fn store_variant_details( + pub(crate) async fn store_variant_details( &self, - identifier: S::Identifier, + identifier: &I, filename: String, details: &Details, - ) -> Result<(), Error> { + ) -> Result<(), Error> + where + Error: From, + { let key = self.details_key(identifier, &filename)?; let details_tree = self.inner.details_tree.clone(); let details_value = serde_json::to_vec(details)?; @@ -322,19 +398,35 @@ where } /// Delete an alias without a delete token - pub(crate) async fn delete_without_token(&self, alias: String) -> Result<(), Error> { + pub(crate) async fn delete_without_token( + &self, + store: S, + alias: String, + ) -> Result<(), Error> + where + Error: From, + { let token_key = delete_key(&alias); let alias_tree = self.inner.alias_tree.clone(); let token = web::block(move || alias_tree.get(token_key.as_bytes())) .await?? .ok_or(UploadError::MissingAlias)?; - self.delete(alias, String::from_utf8(token.to_vec())?).await + self.delete(store, alias, String::from_utf8(token.to_vec())?) + .await } /// Delete the alias, and the file & variants if no more aliases exist #[instrument(skip(self, alias, token))] - pub(crate) async fn delete(&self, alias: String, token: String) -> Result<(), Error> { + pub(crate) async fn delete( + &self, + store: S, + alias: String, + token: String, + ) -> Result<(), Error> + where + Error: From, + { use sled::Transactional; let main_tree = self.inner.main_tree.clone(); let alias_tree = self.inner.alias_tree.clone(); @@ -381,10 +473,17 @@ where }) .await??; - self.check_delete_files(hash).await + self.check_delete_files(store, hash).await } - async fn check_delete_files(&self, hash: sled::IVec) -> Result<(), Error> { + async fn check_delete_files( + &self, + store: S, + hash: sled::IVec, + ) -> Result<(), Error> + where + Error: From, + { // -- CHECK IF ANY OTHER ALIASES EXIST -- let main_tree = self.inner.main_tree.clone(); let (start, end) = alias_key_bounds(&hash); @@ -420,7 +519,7 @@ where actix_rt::spawn( async move { if let Err(e) = this - .cleanup_files(FilenameIVec::new(filename.clone())) + .cleanup_files(store, FilenameIVec::new(filename.clone())) .await { error!("Error removing files from fs, {}", e); @@ -456,13 +555,19 @@ where Ok(filename) } - pub(crate) fn session(&self) -> UploadManagerSession { - UploadManagerSession::new(self.clone()) + pub(crate) fn session(&self, store: S) -> UploadManagerSession + where + Error: From, + { + UploadManagerSession::new(self.clone(), store) } // Find image variants and remove them from the DB and the disk #[instrument(skip(self))] - async fn cleanup_files(&self, filename: FilenameIVec) -> Result<(), Error> { + async fn cleanup_files(&self, store: S, filename: FilenameIVec) -> Result<(), Error> + where + Error: From, + { let filename = filename.inner; let filename2 = filename.clone(); @@ -473,7 +578,7 @@ where if let Some(identifier) = identifier { let identifier = S::Identifier::from_bytes(identifier.to_vec())?; debug!("Deleting {:?}", identifier); - if let Err(e) = self.store.remove(&identifier).await { + if let Err(e) = store.remove(&identifier).await { errors.push(e); } } @@ -500,7 +605,7 @@ where let identifier = S::Identifier::from_bytes(id.to_vec())?; debug!("Deleting {:?}", identifier); - if let Err(e) = self.store.remove(&identifier).await { + if let Err(e) = store.remove(&identifier).await { errors.push(e); } } @@ -537,7 +642,10 @@ where Ok(vec) } - fn details_key(&self, identifier: S::Identifier, filename: &str) -> Result, Error> { + fn details_key(&self, identifier: &I, filename: &str) -> Result, Error> + where + Error: From, + { let mut vec = filename.as_bytes().to_vec(); vec.extend(b"/"); vec.extend(&identifier.to_bytes()?); @@ -629,7 +737,7 @@ fn delete_key(alias: &str) -> String { format!("{}/delete", alias) } -impl std::fmt::Debug for UploadManager { +impl std::fmt::Debug for UploadManager { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("UploadManager").finish() } diff --git a/src/upload_manager/session.rs b/src/upload_manager/session.rs index 6ea4a8e..24415b1 100644 --- a/src/upload_manager/session.rs +++ b/src/upload_manager/session.rs @@ -15,21 +15,23 @@ use tracing::{debug, instrument, warn, Span}; use tracing_futures::Instrument; use uuid::Uuid; -pub(crate) struct UploadManagerSession +pub(crate) struct UploadManagerSession where Error: From, { - manager: UploadManager, + store: S, + manager: UploadManager, alias: Option, finished: bool, } -impl UploadManagerSession +impl UploadManagerSession where Error: From, { - pub(super) fn new(manager: UploadManager) -> Self { + pub(super) fn new(manager: UploadManager, store: S) -> Self { UploadManagerSession { + store, manager, alias: None, finished: false, @@ -56,7 +58,7 @@ impl Dup { } } -impl Drop for UploadManagerSession +impl Drop for UploadManagerSession where Error: From, { @@ -66,6 +68,7 @@ where } if let Some(alias) = self.alias.take() { + let store = self.store.clone(); let manager = self.manager.clone(); let cleanup_span = tracing::info_span!( parent: None, @@ -89,7 +92,7 @@ where let _ = manager.inner.main_tree.remove(&key); } - let _ = manager.check_delete_files(hash).await; + let _ = manager.check_delete_files(store, hash).await; } } .instrument(cleanup_span), @@ -164,11 +167,7 @@ where let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - let identifier = self - .manager - .store - .save_async_read(&mut hasher_reader) - .await?; + let identifier = self.store.save_async_read(&mut hasher_reader).await?; let hash = hasher_reader.finalize_reset().await?; debug!("Storing alias"); @@ -206,11 +205,7 @@ where let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone()); - let identifier = self - .manager - .store - .save_async_read(&mut hasher_reader) - .await?; + let identifier = self.store.save_async_read(&mut hasher_reader).await?; let hash = hasher_reader.finalize_reset().await?; debug!("Adding alias"); @@ -236,7 +231,7 @@ where if dup.exists() { debug!("Duplicate exists, removing file"); - self.manager.store.remove(identifier).await?; + self.store.remove(identifier).await?; return Ok(()); }