From 602d1ea93503514d1f68c16fdcd089f79cf97196 Mon Sep 17 00:00:00 2001
From: "Aode (lion)"
Date: Tue, 29 Mar 2022 12:51:16 -0500
Subject: [PATCH] Begin implementing queue, put cleanups in it

---
 src/config/commandline.rs |  12 +++-
 src/config/defaults.rs    |   2 +
 src/config/file.rs        |   2 +
 src/main.rs               |  31 ++++++-----
 src/queue.rs              | 113 ++++++++++++++++++++++++++++++++++++++
 src/repo.rs               |  24 +++++---
 src/repo/sled.rs          |  80 +++++++++++++++++++++++----
 src/upload_manager.rs     |  82 +++++----------------------
 8 files changed, 245 insertions(+), 101 deletions(-)
 create mode 100644 src/queue.rs

diff --git a/src/config/commandline.rs b/src/config/commandline.rs
index 08900d3..c335cbb 100644
--- a/src/config/commandline.rs
+++ b/src/config/commandline.rs
@@ -44,6 +44,7 @@ impl Args {
             Command::Run(Run {
                 address,
                 api_key,
+                worker_id,
                 media_skip_validate_imports,
                 media_max_width,
                 media_max_height,
@@ -54,7 +55,11 @@ impl Args {
                 media_format,
                 store,
             }) => {
-                let server = Server { address, api_key };
+                let server = Server {
+                    address,
+                    api_key,
+                    worker_id,
+                };
                 let media = Media {
                     skip_validate_imports: media_skip_validate_imports,
                     max_width: media_max_width,
                     max_height: media_max_height,
@@ -240,6 +245,8 @@
 struct Server {
     #[serde(skip_serializing_if = "Option::is_none")]
     address: Option<SocketAddr>,
     #[serde(skip_serializing_if = "Option::is_none")]
+    worker_id: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
     api_key: Option<String>,
 }
@@ -372,6 +379,9 @@ struct Run {
     #[clap(long)]
     api_key: Option<String>,
 
+    #[clap(long)]
+    worker_id: Option<String>,
+
     /// Whether to validate media on the "import" endpoint
     #[clap(long)]
     media_skip_validate_imports: Option<bool>,
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index a459e73..bcedb0d 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -19,6 +19,7 @@ pub(crate) struct Defaults {
 #[serde(rename_all = "snake_case")]
 struct ServerDefaults {
     address: SocketAddr,
+    worker_id: String,
 }
 
 #[derive(Clone, Debug, Default, serde::Serialize)]
@@ -100,6 +101,7 @@ impl Default for ServerDefaults {
     fn default() -> Self {
         ServerDefaults {
             address: "0.0.0.0:8080".parse().expect("Valid address string"),
+            worker_id: String::from("pict-rs-1"),
         }
     }
 }
diff --git a/src/config/file.rs b/src/config/file.rs
index 10cd8dc..d49edc2 100644
--- a/src/config/file.rs
+++ b/src/config/file.rs
@@ -33,6 +33,8 @@ pub(crate) enum Repo {
 pub(crate) struct Server {
     pub(crate) address: SocketAddr,
 
+    pub(crate) worker_id: String,
+
     #[serde(skip_serializing_if = "Option::is_none")]
     pub(crate) api_key: Option<String>,
 }
diff --git a/src/main.rs b/src/main.rs
index 6692442..15b3725 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -39,6 +39,7 @@ mod middleware;
 mod migrate;
 mod process;
 mod processor;
+mod queue;
 mod range;
 mod repo;
 mod serde_str;
@@ -246,9 +247,8 @@ async fn download(
 
 /// Delete aliases and files
 #[instrument(name = "Deleting file", skip(manager))]
-async fn delete<S: Store>(
+async fn delete(
     manager: web::Data<UploadManager>,
-    store: web::Data<S>,
     path_entries: web::Path<(String, String)>,
 ) -> Result<HttpResponse, Error> {
     let (token, alias) = path_entries.into_inner();
     let token = DeleteToken::from_existing(&token);
     let alias = Alias::from_existing(&alias);
 
-    manager.delete((**store).clone(), alias, token).await?;
+    manager.delete(alias, token).await?;
 
     Ok(HttpResponse::NoContent().finish())
 }
@@ -308,7 +308,6 @@ async fn process_details<S: Store>(
     query: web::Query<ProcessQuery>,
     ext: web::Path<String>,
     manager: web::Data<UploadManager>,
-    store: web::Data<S>,
     filters: web::Data<Vec<String>>,
 ) -> Result<HttpResponse, Error> {
     let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?;
@@ -581,17 +580,16 @@ struct AliasQuery {
 }
 
 #[instrument(name = "Purging file", skip(upload_manager))]
-async fn purge<S: Store>(
+async fn purge(
     query: web::Query<AliasQuery>,
     upload_manager: web::Data<UploadManager>,
-    store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = Alias::from_existing(&query.alias);
     let aliases = upload_manager.aliases_by_alias(&alias).await?;
 
     for alias in aliases.iter() {
         upload_manager
-            .delete_without_token((**store).clone(), alias.to_owned())
+            .delete_without_token(alias.to_owned())
             .await?;
     }
 
@@ -602,10 +600,9 @@
 }
 
 #[instrument(name = "Fetching aliases", skip(upload_manager))]
-async fn aliases<S: Store>(
+async fn aliases(
     query: web::Query<AliasQuery>,
     upload_manager: web::Data<UploadManager>,
-    store: web::Data<S>,
 ) -> Result<HttpResponse, Error> {
     let alias = Alias::from_existing(&query.alias);
     let aliases = upload_manager.aliases_by_alias(&alias).await?;
@@ -639,6 +636,14 @@ async fn launch<S: Store + Clone>(
     manager: UploadManager,
     store: S,
 ) -> color_eyre::Result<()> {
+    let repo = manager.repo().clone();
+
+    actix_rt::spawn(queue::process_jobs(
+        repo,
+        store.clone(),
+        CONFIG.server.worker_id.as_bytes().to_vec(),
+    ));
+
     // Create a new Multipart Form validator
     //
     // This form is expecting a single array field, 'images' with at most 10 files in it
@@ -730,8 +735,8 @@
             .service(web::resource("/download").route(web::get().to(download::<S>)))
             .service(
                 web::resource("/delete/{delete_token}/{filename}")
-                    .route(web::delete().to(delete::<S>))
-                    .route(web::get().to(delete::<S>)),
+                    .route(web::delete().to(delete))
+                    .route(web::get().to(delete)),
             )
             .service(web::resource("/original/{filename}").route(web::get().to(serve::<S>)))
             .service(web::resource("/process.{ext}").route(web::get().to(process::<S>)))
@@ -757,8 +762,8 @@
                     .wrap(import_form.clone())
                     .route(web::post().to(upload::<S>)),
             )
-            .service(web::resource("/purge").route(web::post().to(purge::<S>)))
-            .service(web::resource("/aliases").route(web::get().to(aliases::<S>))),
+            .service(web::resource("/purge").route(web::post().to(purge)))
+            .service(web::resource("/aliases").route(web::get().to(aliases))),
         )
     })
     .bind(CONFIG.server.address)?
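
Aside (not part of the patch): the Job enum introduced in src/queue.rs below derives serde's Serialize/Deserialize with the default externally tagged representation, so the value pushed into the sled queue tree is a small JSON object. A minimal sketch of that round trip, with the private enum mirrored here purely for illustration:

    // Mirrors the private Job enum from src/queue.rs (illustration only).
    #[derive(Debug, serde::Deserialize, serde::Serialize)]
    enum Job {
        Cleanup { hash: Vec<u8> },
    }

    fn main() -> Result<(), serde_json::Error> {
        // serde_json::to_vec produces compact, externally tagged JSON,
        // so a Cleanup job for hash [1, 2, 3] becomes the bytes below.
        let job = serde_json::to_vec(&Job::Cleanup { hash: vec![1, 2, 3] })?;
        assert_eq!(job, br#"{"Cleanup":{"hash":[1,2,3]}}"#);
        Ok(())
    }
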
diff --git a/src/queue.rs b/src/queue.rs
new file mode 100644
index 0000000..04af1b6
--- /dev/null
+++ b/src/queue.rs
@@ -0,0 +1,113 @@
+use crate::{
+    error::Error,
+    repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
+    store::Store,
+};
+use tracing::{debug, error, Span};
+
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+enum Job {
+    Cleanup { hash: Vec<u8> },
+}
+
+pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
+    let job = serde_json::to_vec(&Job::Cleanup {
+        hash: hash.as_ref().to_vec(),
+    })?;
+    repo.push(job.into()).await?;
+    Ok(())
+}
+
+pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: Vec<u8>) {
+    loop {
+        let res = match repo {
+            Repo::Sled(ref repo) => do_process_jobs(repo, &store, worker_id.clone()).await,
+        };
+
+        if let Err(e) = res {
+            tracing::warn!("Error processing jobs: {}", e);
+            tracing::warn!("{:?}", e);
+            continue;
+        }
+
+        break;
+    }
+}
+
+async fn do_process_jobs<R, S>(repo: &R, store: &S, worker_id: Vec<u8>) -> Result<(), Error>
+where
+    R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    loop {
+        let bytes = repo.pop(worker_id.clone()).await?;
+
+        match serde_json::from_slice(bytes.as_ref()) {
+            Ok(job) => match job {
+                Job::Cleanup { hash } => cleanup(repo, store, hash).await?,
+            },
+            Err(e) => {
+                tracing::warn!("Invalid job: {}", e);
+            }
+        }
+    }
+}
+
+#[tracing::instrument(skip(repo, store))]
+async fn cleanup<R, S>(repo: &R, store: &S, hash: Vec<u8>) -> Result<(), Error>
+where
+    R: HashRepo + IdentifierRepo + AliasRepo,
+    R::Bytes: Clone,
+    S: Store,
+{
+    let hash: R::Bytes = hash.into();
+
+    let aliases = repo.aliases(hash.clone()).await?;
+
+    if !aliases.is_empty() {
+        return Ok(());
+    }
+
+    let variant_idents = repo
+        .variants::<S::Identifier>(hash.clone())
+        .await?
+        .into_iter()
+        .map(|(_, v)| v)
+        .collect::<Vec<_>>();
+    let main_ident = repo.identifier(hash.clone()).await?;
+    let motion_ident = repo.motion_identifier(hash.clone()).await?;
+
+    HashRepo::cleanup(repo, hash).await?;
+
+    let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
+    cleanup_span.follows_from(Span::current());
+
+    let mut errors = Vec::new();
+
+    for identifier in variant_idents
+        .iter()
+        .chain(&[main_ident])
+        .chain(motion_ident.iter())
+    {
+        debug!("Deleting {:?}", identifier);
+        if let Err(e) = store.remove(identifier).await {
+            errors.push(e);
+        }
+
+        if let Err(e) = IdentifierRepo::cleanup(repo, identifier).await {
+            errors.push(e);
+        }
+    }
+
+    if !errors.is_empty() {
+        let span = tracing::error_span!("Error deleting files");
+        span.in_scope(|| {
+            for error in errors {
+                error!("{}", error);
+            }
+        });
+    }
+
+    Ok(())
+}
diff --git a/src/repo.rs b/src/repo.rs
index 8cc0115..27f3cf0 100644
--- a/src/repo.rs
+++ b/src/repo.rs
@@ -30,17 +30,28 @@ pub(crate) struct DeleteToken {
 
 pub(crate) struct AlreadyExists;
 
-#[async_trait::async_trait(?Send)]
-pub(crate) trait SettingsRepo {
+pub(crate) trait BaseRepo {
     type Bytes: AsRef<[u8]> + From<Vec<u8>>;
+}
 
+#[async_trait::async_trait(?Send)]
+pub(crate) trait QueueRepo: BaseRepo {
+    async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
+
+    async fn push(&self, job: Self::Bytes) -> Result<(), Error>;
+
+    async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>;
+}
+
+#[async_trait::async_trait(?Send)]
+pub(crate) trait SettingsRepo: BaseRepo {
     async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error>;
     async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error>;
     async fn remove(&self, key: &'static [u8]) -> Result<(), Error>;
 }
 
 #[async_trait::async_trait(?Send)]
-pub(crate) trait IdentifierRepo {
+pub(crate) trait IdentifierRepo: BaseRepo {
     async fn relate_details<I: Identifier>(
         &self,
         identifier: &I,
@@ -52,8 +63,7 @@ pub(crate) trait IdentifierRepo {
 }
 
 #[async_trait::async_trait(?Send)]
-pub(crate) trait HashRepo {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
+pub(crate) trait HashRepo: BaseRepo {
     type Stream: Stream<Item = Result<Self::Bytes, Error>>;
 
     async fn hashes(&self) -> Self::Stream;
@@ -101,9 +111,7 @@
 }
 
 #[async_trait::async_trait(?Send)]
-pub(crate) trait AliasRepo {
-    type Bytes: AsRef<[u8]> + From<Vec<u8>>;
-
+pub(crate) trait AliasRepo: BaseRepo {
     async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error>;
 
     async fn relate_delete_token(
diff --git a/src/repo/sled.rs b/src/repo/sled.rs
index c1f323f..a049861 100644
--- a/src/repo/sled.rs
+++ b/src/repo/sled.rs
@@ -1,9 +1,15 @@
-use super::{
-    Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo,
-    SettingsRepo,
+use crate::{
+    error::Error,
+    repo::{
+        Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier,
+        IdentifierRepo, QueueRepo, SettingsRepo,
+    },
 };
-use crate::error::Error;
 use sled::{Db, IVec, Tree};
+use std::sync::Arc;
+use tokio::sync::Notify;
+
+use super::BaseRepo;
 
 macro_rules! b {
     ($self:ident.$ident:ident, $expr:expr) => {{
@@ -42,7 +48,10 @@ pub(crate) struct SledRepo {
     aliases: Tree,
     alias_hashes: Tree,
     alias_delete_tokens: Tree,
-    _db: Db,
+    queue: Tree,
+    in_progress_queue: Tree,
+    queue_notifier: Arc<Notify>,
+    db: Db,
 }
 
 impl SledRepo {
@@ -58,15 +67,67 @@ impl SledRepo {
             aliases: db.open_tree("pict-rs-aliases-tree")?,
             alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
             alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
-            _db: db,
+            queue: db.open_tree("pict-rs-queue-tree")?,
+            in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
+            queue_notifier: Arc::new(Notify::new()),
+            db,
         })
     }
 }
 
+impl BaseRepo for SledRepo {
+    type Bytes = IVec;
+}
+
+#[async_trait::async_trait(?Send)]
+impl QueueRepo for SledRepo {
+    async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error> {
+        let opt = b!(self.in_progress_queue, in_progress_queue.get(worker_id));
+
+        Ok(opt)
+    }
+
+    async fn push(&self, job: Self::Bytes) -> Result<(), Error> {
+        let id = self.db.generate_id()?;
+        b!(self.queue, queue.insert(id.to_be_bytes(), job));
+        self.queue_notifier.notify_one();
+        Ok(())
+    }
+
+    async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error> {
+        let notify = Arc::clone(&self.queue_notifier);
+
+        loop {
+            let in_progress_queue = self.in_progress_queue.clone();
+
+            let worker_id = worker_id.clone();
+            let job = b!(self.queue, {
+                in_progress_queue.remove(&worker_id)?;
+
+                while let Some((key, job)) = queue.iter().find_map(Result::ok) {
+                    in_progress_queue.insert(&worker_id, &job)?;
+
+                    if queue.remove(key)?.is_some() {
+                        return Ok(Some(job));
+                    }
+
+                    in_progress_queue.remove(&worker_id)?;
+                }
+
+                Ok(None) as Result<_, SledError>
+            });
+
+            if let Some(job) = job {
+                return Ok(job);
+            }
+
+            notify.notified().await
+        }
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 impl SettingsRepo for SledRepo {
-    type Bytes = IVec;
-
     #[tracing::instrument(skip(value))]
     async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error> {
         b!(self.settings, settings.insert(key, value));
@@ -212,7 +273,6 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
 
 #[async_trait::async_trait(?Send)]
 impl HashRepo for SledRepo {
-    type Bytes = IVec;
     type Stream = HashStream;
 
     async fn hashes(&self) -> Self::Stream {
@@ -429,8 +489,6 @@ impl HashRepo for SledRepo {
 
 #[async_trait::async_trait(?Send)]
 impl AliasRepo for SledRepo {
-    type Bytes = sled::IVec;
-
     #[tracing::instrument]
     async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
         let bytes = alias.to_bytes();
diff --git a/src/upload_manager.rs b/src/upload_manager.rs
index 42edaad..0952b81 100644
--- a/src/upload_manager.rs
+++ b/src/upload_manager.rs
@@ -5,15 +5,15 @@ use crate::{
     ffmpeg::{InputFormat, ThumbnailFormat},
     magick::details_hint,
     repo::{
-        sled::SledRepo, Alias, AliasRepo, DeleteToken, HashRepo, IdentifierRepo, Repo, SettingsRepo,
+        sled::SledRepo, Alias, AliasRepo, BaseRepo, DeleteToken, HashRepo, IdentifierRepo, Repo,
+        SettingsRepo,
     },
     store::{Identifier, Store},
 };
 use futures_util::StreamExt;
 use sha2::Digest;
 use std::sync::Arc;
-use tracing::{debug, error, instrument, Span};
-use tracing_futures::Instrument;
+use tracing::instrument;
 
 mod hasher;
 mod session;
@@ -34,6 +34,10 @@ pub(crate) struct UploadManagerInner {
 }
 
 impl UploadManager {
+    pub(crate) fn repo(&self) -> &Repo {
+        &self.inner.repo
+    }
+
     /// Create a new UploadManager
     pub(crate) async fn new(repo: Repo, format: Option<ImageFormat>) -> Result<Self, Error> {
         let manager = UploadManager {
@@ -229,26 +233,17 @@
     }
 
     /// Delete an alias without a delete token
-    pub(crate) async fn delete_without_token<S: Store>(
-        &self,
-        store: S,
-        alias: Alias,
-    ) -> Result<(), Error> {
+    pub(crate) async fn delete_without_token(&self, alias: Alias) -> Result<(), Error> {
         let token = match self.inner.repo {
             Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?,
         };
 
-        self.delete(store, alias, token).await
+        self.delete(alias, token).await
     }
 
     /// Delete the alias, and the file & variants if no more aliases exist
     #[instrument(skip(self, alias, token))]
-    pub(crate) async fn delete<S: Store>(
-        &self,
-        store: S,
-        alias: Alias,
-        token: DeleteToken,
-    ) -> Result<(), Error> {
+    pub(crate) async fn delete(&self, alias: Alias, token: DeleteToken) -> Result<(), Error> {
         let hash = match self.inner.repo {
             Repo::Sled(ref sled_repo) => {
                 let saved_delete_token = sled_repo.delete_token(&alias).await?;
@@ -262,17 +257,13 @@
             }
         };
 
-        self.check_delete_files(store, hash).await
+        self.check_delete_files(hash).await
     }
 
-    async fn check_delete_files<S: Store>(
-        &self,
-        store: S,
-        hash: Vec<u8>,
-    ) -> Result<(), Error> {
+    async fn check_delete_files(&self, hash: Vec<u8>) -> Result<(), Error> {
         match self.inner.repo {
             Repo::Sled(ref sled_repo) => {
-                let hash: <SledRepo as HashRepo>::Bytes = hash.into();
+                let hash: <SledRepo as BaseRepo>::Bytes = hash.into();
 
                 let aliases = sled_repo.aliases(hash.clone()).await?;
 
@@ -280,52 +271,7 @@
                     return Ok(());
                 }
 
-                let variant_idents = sled_repo
-                    .variants::<S::Identifier>(hash.clone())
-                    .await?
-                    .into_iter()
-                    .map(|(_, v)| v)
-                    .collect::<Vec<_>>();
-                let main_ident = sled_repo.identifier(hash.clone()).await?;
-                let motion_ident = sled_repo.motion_identifier(hash.clone()).await?;
-
-                let repo = sled_repo.clone();
-
-                HashRepo::cleanup(sled_repo, hash).await?;
-
-                let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
-                cleanup_span.follows_from(Span::current());
-
-                actix_rt::spawn(
-                    async move {
-                        let mut errors = Vec::new();
-
-                        for identifier in variant_idents
-                            .iter()
-                            .chain(&[main_ident])
-                            .chain(motion_ident.iter())
-                        {
-                            debug!("Deleting {:?}", identifier);
-                            if let Err(e) = store.remove(identifier).await {
-                                errors.push(e);
-                            }
-
-                            if let Err(e) = IdentifierRepo::cleanup(&repo, identifier).await {
-                                errors.push(e);
-                            }
-                        }
-
-                        if !errors.is_empty() {
-                            let span = tracing::error_span!("Error deleting files");
-                            span.in_scope(|| {
-                                for error in errors {
-                                    error!("{}", error);
-                                }
-                            });
-                        }
-                    }
-                    .instrument(cleanup_span),
-                );
+                crate::queue::queue_cleanup(sled_repo, hash).await?;
             }
         }
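
Aside (not part of the patch): SledRepo::pop above claims a job by copying it into the per-worker in-progress tree before removing it from the queue tree, and parks on the shared Notify when the queue is empty (push calls notify_one after inserting under a monotonically increasing generate_id key). Nothing in this patch calls in_progress yet; a later change could use it for crash recovery along these lines — a hypothetical sketch against the QueueRepo trait added in src/repo.rs:

    use crate::{error::Error, repo::QueueRepo};

    // Hypothetical recovery step (not implemented in this patch): on startup,
    // a worker re-claims the job it had parked in the in-progress tree before
    // a crash, instead of leaving it stranded there.
    async fn recover_in_progress<R: QueueRepo>(repo: &R, worker_id: Vec<u8>) -> Result<(), Error> {
        if let Some(_job) = repo.in_progress(worker_id).await? {
            // Deserialize and run this job before popping anything new.
        }
        Ok(())
    }
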