Enable at-most-once queueing for some cleanup jobs

This commit is contained in:
asonix 2024-01-24 17:14:31 -06:00
parent 1b28ea573d
commit fe1132aec1
6 changed files with 132 additions and 35 deletions

View file

@ -22,6 +22,10 @@ mod process;
const CLEANUP_QUEUE: &str = "cleanup";
const PROCESS_QUEUE: &str = "process";
const OUTDATED_PROXIES_UNIQUE_KEY: &str = "outdated-proxies";
const OUTDATED_VARIANTS_UNIQUE_KEY: &str = "outdated-variants";
const ALL_VARIANTS_UNIQUE_KEY: &str = "all-variants";
const PRUNE_MISSING_UNIQUE_KEY: &str = "prune-missing";
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Cleanup {
@ -71,13 +75,13 @@ pub(crate) async fn cleanup_alias(
token: Serde::new(token),
})
.map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
repo.push(CLEANUP_QUEUE, job, None).await?;
Ok(())
}
pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
repo.push(CLEANUP_QUEUE, job, None).await?;
Ok(())
}
@ -86,7 +90,7 @@ pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) ->
identifier: identifier.to_string(),
})
.map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
repo.push(CLEANUP_QUEUE, job, None).await?;
Ok(())
}
@ -97,31 +101,55 @@ async fn cleanup_variants(
) -> Result<(), Error> {
let job =
serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
repo.push(CLEANUP_QUEUE, job, None).await?;
Ok(())
}
pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
if repo
.push(CLEANUP_QUEUE, job, Some(OUTDATED_PROXIES_UNIQUE_KEY))
.await?
.is_none()
{
tracing::debug!("outdated proxies conflict");
}
Ok(())
}
pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
if repo
.push(CLEANUP_QUEUE, job, Some(OUTDATED_VARIANTS_UNIQUE_KEY))
.await?
.is_none()
{
tracing::debug!("outdated variants conflict");
}
Ok(())
}
pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
if repo
.push(CLEANUP_QUEUE, job, Some(ALL_VARIANTS_UNIQUE_KEY))
.await?
.is_none()
{
tracing::debug!("all variants conflict");
}
Ok(())
}
pub(crate) async fn prune_missing(repo: &ArcRepo) -> Result<(), Error> {
let job = serde_json::to_value(Cleanup::Prune).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
if repo
.push(CLEANUP_QUEUE, job, Some(PRUNE_MISSING_UNIQUE_KEY))
.await?
.is_none()
{
tracing::debug!("prune missing conflict");
}
Ok(())
}
@ -137,7 +165,7 @@ pub(crate) async fn queue_ingest(
upload_id: Serde::new(upload_id),
})
.map_err(UploadError::PushJob)?;
repo.push(PROCESS_QUEUE, job).await?;
repo.push(PROCESS_QUEUE, job, None).await?;
Ok(())
}
@ -155,7 +183,7 @@ pub(crate) async fn queue_generate(
process_args,
})
.map_err(UploadError::PushJob)?;
repo.push(PROCESS_QUEUE, job).await?;
repo.push(PROCESS_QUEUE, job, None).await?;
Ok(())
}

View file

@ -353,7 +353,12 @@ impl JobId {
#[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo {
async fn push(&self, queue: &'static str, job: serde_json::Value) -> Result<JobId, RepoError>;
async fn push(
&self,
queue: &'static str,
job: serde_json::Value,
unique_key: Option<&'static str>,
) -> Result<Option<JobId>, RepoError>;
async fn pop(
&self,
@ -381,8 +386,13 @@ impl<T> QueueRepo for Arc<T>
where
T: QueueRepo,
{
async fn push(&self, queue: &'static str, job: serde_json::Value) -> Result<JobId, RepoError> {
T::push(self, queue, job).await
async fn push(
&self,
queue: &'static str,
job: serde_json::Value,
unique_key: Option<&'static str>,
) -> Result<Option<JobId>, RepoError> {
T::push(self, queue, job, unique_key).await
}
async fn pop(

View file

@ -1223,26 +1223,41 @@ impl QueueRepo for PostgresRepo {
&self,
queue_name: &'static str,
job_json: serde_json::Value,
) -> Result<JobId, RepoError> {
in_unique_key: Option<&'static str>,
) -> Result<Option<JobId>, RepoError> {
let guard = PushMetricsGuard::guard(queue_name);
use schema::job_queue::dsl::*;
let mut conn = self.get_connection().await?;
let job_id = diesel::insert_into(job_queue)
.values((queue.eq(queue_name), job.eq(job_json)))
let res = diesel::insert_into(job_queue)
.values((
queue.eq(queue_name),
job.eq(job_json),
unique_key.eq(in_unique_key),
))
.returning(id)
.get_result::<Uuid>(&mut conn)
.with_metrics("pict-rs.postgres.queue.push")
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
.map(JobId)
.map(Some);
guard.disarm();
match res {
Ok(job_id) => {
guard.disarm();
Ok(JobId(job_id))
Ok(job_id)
}
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(None),
Err(e) => Err(RepoError::from(PostgresError::Diesel(e))),
}
}
#[tracing::instrument(level = "debug", skip(self))]

View file

@ -0,0 +1,15 @@
use barrel::backend::Pg;
use barrel::{types, Migration};
pub(crate) fn migration() -> String {
let mut m = Migration::new();
m.change_table("job_queue", |t| {
t.add_column(
"unique_key",
types::text().size(50).nullable(true).unique(true),
);
});
m.make::<Pg>().to_string()
}

View file

@ -42,6 +42,7 @@ diesel::table! {
status -> JobStatus,
queue_time -> Timestamp,
heartbeat -> Nullable<Timestamp>,
unique_key -> Nullable<Text>,
}
}

View file

@ -95,6 +95,8 @@ pub(crate) struct SledRepo {
alias_hashes: Tree,
alias_delete_tokens: Tree,
queue: Tree,
unique_jobs: Tree,
unique_jobs_inverse: Tree,
job_state: Tree,
alias_access: Tree,
inverse_alias_access: Tree,
@ -134,6 +136,8 @@ impl SledRepo {
alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
queue: db.open_tree("pict-rs-queue-tree")?,
unique_jobs: db.open_tree("pict-rs-unique-jobs-tree")?,
unique_jobs_inverse: db.open_tree("pict-rs-unique-jobs-inverse-tree")?,
job_state: db.open_tree("pict-rs-job-state-tree")?,
alias_access: db.open_tree("pict-rs-alias-access-tree")?,
inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?,
@ -625,7 +629,8 @@ impl QueueRepo for SledRepo {
&self,
queue_name: &'static str,
job: serde_json::Value,
) -> Result<JobId, RepoError> {
unique_key: Option<&'static str>,
) -> Result<Option<JobId>, RepoError> {
let metrics_guard = PushMetricsGuard::guard(queue_name);
let id = JobId::gen();
@ -633,29 +638,45 @@ impl QueueRepo for SledRepo {
let job = serde_json::to_vec(&job).map_err(SledError::Job)?;
let queue = self.queue.clone();
let unique_jobs = self.unique_jobs.clone();
let unique_jobs_inverse = self.unique_jobs_inverse.clone();
let job_state = self.job_state.clone();
let res = crate::sync::spawn_blocking("sled-io", move || {
(&queue, &job_state).transaction(|(queue, job_state)| {
let state = JobState::pending();
(&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction(
|(queue, unique_jobs, unique_jobs_inverse, job_state)| {
let state = JobState::pending();
queue.insert(&key[..], &job[..])?;
job_state.insert(&key[..], state.as_bytes())?;
queue.insert(&key[..], &job[..])?;
if let Some(unique_key) = unique_key {
if unique_jobs
.insert(unique_key.as_bytes(), &key[..])?
.is_some()
{
return sled::transaction::abort(());
}
Ok(())
})
unique_jobs_inverse.insert(&key[..], unique_key.as_bytes())?;
}
job_state.insert(&key[..], state.as_bytes())?;
Ok(())
},
)
})
.await
.map_err(|_| RepoError::Canceled)?;
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
return Err(RepoError::from(SledError::from(e)));
match res {
Err(TransactionError::Abort(())) => return Ok(None),
Err(TransactionError::Storage(e)) => return Err(RepoError::from(SledError::from(e))),
Ok(()) => (),
}
if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
notifier.notify_one();
metrics_guard.disarm();
return Ok(id);
return Ok(Some(id));
}
self.queue_notifier
@ -667,7 +688,7 @@ impl QueueRepo for SledRepo {
metrics_guard.disarm();
Ok(id)
Ok(Some(id))
}
#[tracing::instrument(skip(self, worker_id), fields(job_id))]
@ -808,14 +829,21 @@ impl QueueRepo for SledRepo {
let key = job_key(queue_name, job_id);
let queue = self.queue.clone();
let unique_jobs = self.unique_jobs.clone();
let unique_jobs_inverse = self.unique_jobs_inverse.clone();
let job_state = self.job_state.clone();
let res = crate::sync::spawn_blocking("sled-io", move || {
(&queue, &job_state).transaction(|(queue, job_state)| {
queue.remove(&key[..])?;
job_state.remove(&key[..])?;
Ok(())
})
(&queue, &unique_jobs, &unique_jobs_inverse, &job_state).transaction(
|(queue, unique_jobs, unique_jobs_inverse, job_state)| {
queue.remove(&key[..])?;
if let Some(unique_key) = unique_jobs_inverse.remove(&key[..])? {
unique_jobs.remove(unique_key)?;
}
job_state.remove(&key[..])?;
Ok(())
},
)
})
.await
.map_err(|_| RepoError::Canceled)?;