Queue generation jobs

asonix 2024-03-31 20:26:15 -05:00
parent cd6fb84cc4
commit 960f6487b7
5 changed files with 108 additions and 31 deletions


@@ -6,7 +6,7 @@ use crate::{
     error::{Error, UploadError},
     formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
     future::{WithMetrics, WithPollTimer, WithTimeout},
-    repo::{Hash, VariantAlreadyExists},
+    repo::{Hash, NotificationEntry, VariantAlreadyExists},
     state::State,
     store::Store,
 };
@@ -102,22 +102,17 @@ pub(crate) async fn generate<S: Store + 'static>(
                 break res???;
             }
-            Err(mut entry) => {
-                let notified = entry.notified_timeout(Duration::from_secs(20));
-
-                if let Some(identifier) = state
-                    .repo
-                    .variant_identifier(hash.clone(), variant.clone())
-                    .await?
+            Err(entry) => {
+                if let Some(tuple) = wait_timeout(
+                    hash.clone(),
+                    variant.clone(),
+                    entry,
+                    state,
+                    Duration::from_secs(20),
+                )
+                .await?
                 {
-                    let details = crate::ensure_details_identifier(state, &identifier).await?;
-                    break (details, identifier);
-                }
-
-                match notified.await {
-                    Ok(()) => tracing::debug!("notified"),
-                    Err(_) => tracing::debug!("timeout"),
+                    break tuple;
                 }

                 attempts += 1;
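
The loop above first tries to claim the right to generate the variant; if another task already holds that claim, it waits for that task's result and retries. A rough sketch of the same shape, with this codebase's types replaced by stand-ins (the claim call, the waiter handle, the 20-second wait, and the retry budget are all simplified here):

    use std::time::Duration;

    // Stand-ins for the repo's claim call, the waiter handle, and the generation work.
    enum Claim {
        Granted,
        AlreadyRunning(WaiterHandle),
    }

    struct WaiterHandle;

    async fn try_claim() -> Claim {
        Claim::Granted
    }

    async fn run_generation() -> String {
        String::from("identifier")
    }

    async fn wait_for_other_task(_entry: WaiterHandle, _timeout: Duration) -> Option<String> {
        None
    }

    async fn claim_or_wait() -> Result<String, &'static str> {
        let mut attempts = 0;

        loop {
            // Illustrative retry budget; the real loop's attempts check is outside
            // the hunk shown above.
            if attempts > 2 {
                return Err("gave up waiting for the variant");
            }

            match try_claim().await {
                // We won the claim: do the generation ourselves.
                Claim::Granted => return Ok(run_generation().await),
                // Another task is generating: wait up to 20 seconds for its result,
                // then loop around and try to claim again.
                Claim::AlreadyRunning(entry) => {
                    if let Some(identifier) =
                        wait_for_other_task(entry, Duration::from_secs(20)).await
                    {
                        return Ok(identifier);
                    }
                }
            }

            attempts += 1;
        }
    }

Retrying the claim after a timed-out wait also covers the case where the generating task went away without ever notifying; presumably its claim lapses and a waiter can take the work over on the next iteration.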
@@ -129,6 +124,33 @@ pub(crate) async fn generate<S: Store + 'static>(
     }
 }

+pub(crate) async fn wait_timeout<S: Store + 'static>(
+    hash: Hash,
+    variant: String,
+    mut entry: NotificationEntry,
+    state: &State<S>,
+    timeout: Duration,
+) -> Result<Option<(Details, Arc<str>)>, Error> {
+    let notified = entry.notified_timeout(timeout);
+
+    if let Some(identifier) = state
+        .repo
+        .variant_identifier(hash.clone(), variant.clone())
+        .await?
+    {
+        let details = crate::ensure_details_identifier(state, &identifier).await?;
+
+        return Ok(Some((details, identifier)));
+    }
+
+    match notified.await {
+        Ok(()) => tracing::debug!("notified"),
+        Err(_) => tracing::debug!("timeout"),
+    }
+
+    Ok(None)
+}
+
 async fn heartbeat<S, O>(
     state: &State<S>,
     hash: Hash,
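
wait_timeout re-checks the repo only after interest has been registered (the NotificationEntry is created by the caller), and only then sleeps on the notification, so a variant that lands between registration and the check is not missed. A minimal sketch of that check-then-wait ordering, with the repo lookup and the notification reduced to plain futures (names and signatures here are illustrative, not this crate's API):

    use std::{future::Future, time::Duration};

    async fn check_then_wait<T>(
        lookup: impl Future<Output = Option<T>>,
        notified: impl Future<Output = ()>,
        timeout: Duration,
    ) -> Option<T> {
        // If the variant appeared between registering interest and this check,
        // return it without waiting at all.
        if let Some(found) = lookup.await {
            return Some(found);
        }

        // Otherwise wait to be woken by the generating task, or give up after
        // `timeout` and let the caller decide whether to retry.
        let _ = tokio::time::timeout(timeout, notified).await;

        None
    }

Returning None on a timeout leaves the retry decision to the caller, which is what both call sites do with their attempts counters.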


@@ -885,17 +885,34 @@ async fn process<S: Store + 'static>(
             return Err(UploadError::ReadOnly.into());
         }

         let original_details = ensure_details(&state, &alias).await?;

-        generate::generate(
-            &state,
-            format,
-            variant,
-            variant_args,
-            &original_details,
-            hash,
-        )
-        .await?
+        queue_generate(&state.repo, format, alias, variant.clone(), variant_args).await?;
+
+        let mut attempts = 0;
+
+        loop {
+            if attempts > 6 {
+                return Err(UploadError::ProcessTimeout.into());
+            }
+
+            let entry = state
+                .repo
+                .variant_waiter(hash.clone(), variant.clone())
+                .await?;
+
+            let opt = generate::wait_timeout(
+                hash.clone(),
+                variant.clone(),
+                entry,
+                &state,
+                Duration::from_secs(5),
+            )
+            .await?;
+
+            if let Some(tuple) = opt {
+                break tuple;
+            }
+
+            attempts += 1;
+        }
     };

     if let Some(public_url) = state.store.public_url(&identifier) {
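
With this change the process handler no longer runs generation inline: it queues a generation job, then waits for the variant to appear, making up to seven 5-second waits (roughly 35 seconds) before failing with ProcessTimeout. A sketch of that bounded wait, with generate::wait_timeout reduced to a caller-supplied closure and error handling elided (wait_once is a stand-in, not this crate's API):

    use std::time::Duration;

    async fn wait_bounded<T, F, Fut>(mut wait_once: F) -> Result<T, &'static str>
    where
        F: FnMut(Duration) -> Fut,
        Fut: std::future::Future<Output = Option<T>>,
    {
        let mut attempts = 0;

        loop {
            // Roughly 35 seconds of waiting in total (7 attempts x 5 seconds).
            if attempts > 6 {
                return Err("process timeout");
            }

            if let Some(value) = wait_once(Duration::from_secs(5)).await {
                return Ok(value);
            }

            attempts += 1;
        }
    }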
@@ -959,7 +976,7 @@ async fn process_head<S: Store + 'static>(

 /// Process files
 #[tracing::instrument(name = "Spawning image process", skip(state))]
-async fn process_backgrounded<S: Store>(
+async fn process_backgrounded<S: Store + 'static>(
     web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
     ext: web::Path<String>,
     state: web::Data<State<S>>,
@@ -976,7 +993,7 @@ async fn process_backgrounded<S: Store>(
         }
     };

-    let (target_format, variant, process_args) =
+    let (target_format, variant, variant_args) =
         prepare_process(&state.config, operations, ext.as_str())?;

     let Some(hash) = state.repo.hash(&source).await? else {
@@ -997,7 +1014,7 @@ async fn process_backgrounded<S: Store>(
         return Err(UploadError::ReadOnly.into());
     }

-    queue_generate(&state.repo, target_format, source, variant, process_args).await?;
+    queue_generate(&state.repo, target_format, source, variant, variant_args).await?;

     Ok(HttpResponse::Accepted().finish())
 }
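
process_backgrounded keeps its fire-and-forget behavior: validate, enqueue the generation job, and answer 202 Accepted without waiting for a result (the process_args to variant_args change is just a rename). A small sketch of that enqueue-and-accept shape, using actix-web and a plain tokio channel as a stand-in for this project's queue module:

    use actix_web::{web, HttpResponse};
    use tokio::sync::mpsc;

    // A queued generation job; the real job type and queue differ.
    struct GenerateJob {
        variant: String,
    }

    async fn queue_and_accept(
        queue: web::Data<mpsc::Sender<GenerateJob>>,
        variant: web::Path<String>,
    ) -> actix_web::Result<HttpResponse> {
        // Hand the work to a background worker; nothing below waits for it.
        queue
            .send(GenerateJob {
                variant: variant.into_inner(),
            })
            .await
            .map_err(actix_web::error::ErrorInternalServerError)?;

        // Tell the client the job was accepted but has not been processed yet.
        Ok(HttpResponse::Accepted().finish())
    }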


@@ -24,8 +24,7 @@ pub(crate) use alias::Alias;
 pub(crate) use delete_token::DeleteToken;
 pub(crate) use hash::Hash;
 pub(crate) use migrate::{migrate_04, migrate_repo};
-
-use self::notification_map::NotificationEntry;
+pub(crate) use notification_map::NotificationEntry;

 pub(crate) type ArcRepo = Arc<dyn FullRepo>;
@@ -749,6 +748,12 @@ pub(crate) trait VariantRepo: BaseRepo {
         variant: String,
     ) -> Result<Result<(), NotificationEntry>, RepoError>;

+    async fn variant_waiter(
+        &self,
+        hash: Hash,
+        variant: String,
+    ) -> Result<NotificationEntry, RepoError>;
+
     async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError>;

     async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
@@ -784,6 +789,14 @@ where
         T::claim_variant_processing_rights(self, hash, variant).await
     }

+    async fn variant_waiter(
+        &self,
+        hash: Hash,
+        variant: String,
+    ) -> Result<NotificationEntry, RepoError> {
+        T::variant_waiter(self, hash, variant).await
+    }
+
     async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
         T::variant_heartbeat(self, hash, variant).await
     }
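
The VariantRepo trait gains a variant_waiter method, and the forwarding impl below it simply delegates to the inner repo, so wrapper types (such as the Arc behind the crate's ArcRepo, an Arc<dyn FullRepo>) expose the same method. Judging by the where clause shown, this is a blanket impl over some wrapper; a simplified, synchronous sketch of that delegation pattern with stand-in types, using Arc<T> (the real method is async and returns Result<NotificationEntry, RepoError>):

    use std::sync::Arc;

    // Stand-ins for the real waiter handle and repo trait.
    struct Entry;

    trait VariantWaiter {
        fn variant_waiter(&self, hash: &str, variant: &str) -> Entry;
    }

    // Arc (including Arc<dyn ...>) forwards the call to the inner implementation.
    impl<T> VariantWaiter for Arc<T>
    where
        T: VariantWaiter + ?Sized,
    {
        fn variant_waiter(&self, hash: &str, variant: &str) -> Entry {
            T::variant_waiter(self, hash, variant)
        }
    }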


@@ -1117,6 +1117,19 @@ impl VariantRepo for PostgresRepo {
         }
     }

+    async fn variant_waiter(
+        &self,
+        hash: Hash,
+        variant: String,
+    ) -> Result<NotificationEntry, RepoError> {
+        let key = Arc::from(format!("{}{variant}", hash.to_base64()));
+
+        let entry = self.listen_on_key(key);
+        self.register_interest().await?;
+
+        Ok(entry)
+    }
+
     #[tracing::instrument(level = "debug", skip(self))]
     async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
         let key = format!("{}{variant}", hash.to_base64());
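
The Postgres waiter derives its notification key by appending the variant name to the base64 form of the hash, which is the same string the heartbeat path below builds, so waiters and notifiers agree on the channel. A tiny sketch of that derivation, with hash_b64 standing in for Hash::to_base64():

    use std::sync::Arc;

    // The waiter and notifier sides must build the exact same string,
    // or wake-ups are silently lost.
    fn variant_notification_key(hash_b64: &str, variant: &str) -> Arc<str> {
        Arc::from(format!("{hash_b64}{variant}"))
    }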


@@ -1491,6 +1491,18 @@ impl VariantRepo for SledRepo {
         Ok(Ok(()))
     }

+    async fn variant_waiter(
+        &self,
+        hash: Hash,
+        variant: String,
+    ) -> Result<NotificationEntry, RepoError> {
+        let entry = self
+            .notifications
+            .register_interest(Arc::from(format!("{}{variant}", hash.to_base64())));
+
+        Ok(entry)
+    }
+
     #[tracing::instrument(level = "trace", skip(self))]
     async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
         let key = (hash, variant);
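
The Sled backend resolves variant_waiter through an in-process notification map. One way such a map could look, sketched with tokio::sync::Notify and illustrative names (this is not the crate's notification_map module, which may differ, for instance in how entries are cleaned up):

    use std::{
        collections::HashMap,
        sync::{Arc, Mutex},
    };

    use tokio::sync::Notify;

    // Waiters register under a string key and get an entry to await; notifying a
    // key wakes everyone currently registered for it.
    #[derive(Default)]
    struct NotificationMap {
        inner: Mutex<HashMap<Arc<str>, Arc<Notify>>>,
    }

    struct Entry {
        notify: Arc<Notify>,
    }

    impl NotificationMap {
        fn register_interest(&self, key: Arc<str>) -> Entry {
            let mut map = self.inner.lock().unwrap();

            let notify = map
                .entry(key)
                .or_insert_with(|| Arc::new(Notify::new()))
                .clone();

            Entry { notify }
        }

        fn notify(&self, key: &str) {
            if let Some(notify) = self.inner.lock().unwrap().get(key) {
                // Wake every task currently awaiting this key.
                notify.notify_waiters();
            }
        }
    }

    impl Entry {
        async fn notified(&self) {
            self.notify.notified().await;
        }
    }

In this simplified version a notification sent before notified() is awaited is lost, which is one reason the wait_timeout path re-checks the repo after registering interest and before sleeping.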