From 960f6487b7aef2aa921525db8025893bcfef54e8 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 31 Mar 2024 20:26:15 -0500 Subject: [PATCH] Queue generation jobs --- src/generate.rs | 54 +++++++++++++++++++++++++++++++------------- src/lib.rs | 43 ++++++++++++++++++++++++----------- src/repo.rs | 17 ++++++++++++-- src/repo/postgres.rs | 13 +++++++++++ src/repo/sled.rs | 12 ++++++++++ 5 files changed, 108 insertions(+), 31 deletions(-) diff --git a/src/generate.rs b/src/generate.rs index e38bfc8..c270186 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -6,7 +6,7 @@ use crate::{ error::{Error, UploadError}, formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat}, future::{WithMetrics, WithPollTimer, WithTimeout}, - repo::{Hash, VariantAlreadyExists}, + repo::{Hash, NotificationEntry, VariantAlreadyExists}, state::State, store::Store, }; @@ -102,22 +102,17 @@ pub(crate) async fn generate( break res???; } - Err(mut entry) => { - let notified = entry.notified_timeout(Duration::from_secs(20)); - - if let Some(identifier) = state - .repo - .variant_identifier(hash.clone(), variant.clone()) - .await? + Err(entry) => { + if let Some(tuple) = wait_timeout( + hash.clone(), + variant.clone(), + entry, + state, + Duration::from_secs(20), + ) + .await? { - let details = crate::ensure_details_identifier(state, &identifier).await?; - - break (details, identifier); - } - - match notified.await { - Ok(()) => tracing::debug!("notified"), - Err(_) => tracing::debug!("timeout"), + break tuple; } attempts += 1; @@ -129,6 +124,33 @@ pub(crate) async fn generate( } } +pub(crate) async fn wait_timeout( + hash: Hash, + variant: String, + mut entry: NotificationEntry, + state: &State, + timeout: Duration, +) -> Result)>, Error> { + let notified = entry.notified_timeout(timeout); + + if let Some(identifier) = state + .repo + .variant_identifier(hash.clone(), variant.clone()) + .await? + { + let details = crate::ensure_details_identifier(state, &identifier).await?; + + return Ok(Some((details, identifier))); + } + + match notified.await { + Ok(()) => tracing::debug!("notified"), + Err(_) => tracing::debug!("timeout"), + } + + Ok(None) +} + async fn heartbeat( state: &State, hash: Hash, diff --git a/src/lib.rs b/src/lib.rs index f11f600..9e29df8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -885,17 +885,34 @@ async fn process( return Err(UploadError::ReadOnly.into()); } - let original_details = ensure_details(&state, &alias).await?; + queue_generate(&state.repo, format, alias, variant.clone(), variant_args).await?; - generate::generate( - &state, - format, - variant, - variant_args, - &original_details, - hash, - ) - .await? + let mut attempts = 0; + loop { + if attempts > 6 { + return Err(UploadError::ProcessTimeout.into()); + } + + let entry = state + .repo + .variant_waiter(hash.clone(), variant.clone()) + .await?; + + let opt = generate::wait_timeout( + hash.clone(), + variant.clone(), + entry, + &state, + Duration::from_secs(5), + ) + .await?; + + if let Some(tuple) = opt { + break tuple; + } + + attempts += 1; + } }; if let Some(public_url) = state.store.public_url(&identifier) { @@ -959,7 +976,7 @@ async fn process_head( /// Process files #[tracing::instrument(name = "Spawning image process", skip(state))] -async fn process_backgrounded( +async fn process_backgrounded( web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, state: web::Data>, @@ -976,7 +993,7 @@ async fn process_backgrounded( } }; - let (target_format, variant, process_args) = + let (target_format, variant, variant_args) = prepare_process(&state.config, operations, ext.as_str())?; let Some(hash) = state.repo.hash(&source).await? else { @@ -997,7 +1014,7 @@ async fn process_backgrounded( return Err(UploadError::ReadOnly.into()); } - queue_generate(&state.repo, target_format, source, variant, process_args).await?; + queue_generate(&state.repo, target_format, source, variant, variant_args).await?; Ok(HttpResponse::Accepted().finish()) } diff --git a/src/repo.rs b/src/repo.rs index 44905a3..9cbccc5 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -24,8 +24,7 @@ pub(crate) use alias::Alias; pub(crate) use delete_token::DeleteToken; pub(crate) use hash::Hash; pub(crate) use migrate::{migrate_04, migrate_repo}; - -use self::notification_map::NotificationEntry; +pub(crate) use notification_map::NotificationEntry; pub(crate) type ArcRepo = Arc; @@ -749,6 +748,12 @@ pub(crate) trait VariantRepo: BaseRepo { variant: String, ) -> Result, RepoError>; + async fn variant_waiter( + &self, + hash: Hash, + variant: String, + ) -> Result; + async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError>; async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>; @@ -784,6 +789,14 @@ where T::claim_variant_processing_rights(self, hash, variant).await } + async fn variant_waiter( + &self, + hash: Hash, + variant: String, + ) -> Result { + T::variant_waiter(self, hash, variant).await + } + async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> { T::variant_heartbeat(self, hash, variant).await } diff --git a/src/repo/postgres.rs b/src/repo/postgres.rs index 7db0a58..8a58b46 100644 --- a/src/repo/postgres.rs +++ b/src/repo/postgres.rs @@ -1117,6 +1117,19 @@ impl VariantRepo for PostgresRepo { } } + async fn variant_waiter( + &self, + hash: Hash, + variant: String, + ) -> Result { + let key = Arc::from(format!("{}{variant}", hash.to_base64())); + let entry = self.listen_on_key(key); + + self.register_interest().await?; + + Ok(entry) + } + #[tracing::instrument(level = "debug", skip(self))] async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let key = format!("{}{variant}", hash.to_base64()); diff --git a/src/repo/sled.rs b/src/repo/sled.rs index ac3730f..ff45e04 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -1491,6 +1491,18 @@ impl VariantRepo for SledRepo { Ok(Ok(())) } + async fn variant_waiter( + &self, + hash: Hash, + variant: String, + ) -> Result { + let entry = self + .notifications + .register_interest(Arc::from(format!("{}{variant}", hash.to_base64()))); + + Ok(entry) + } + #[tracing::instrument(level = "trace", skip(self))] async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> { let key = (hash, variant);