From 63ee0d7cb7a83ae43193d01ad1a1f141478baea4 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 10 Jan 2024 15:06:36 -0600 Subject: [PATCH] jobs-core: change 'timeout' to 'heartbeat_interval' --- jobs-core/src/job.rs | 16 +++++++--------- jobs-core/src/job_info.rs | 10 +++++----- jobs-core/src/storage.rs | 20 ++++++++++++-------- jobs-core/src/unsend_job.rs | 20 +++++++++----------- 4 files changed, 33 insertions(+), 33 deletions(-) diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 440ac58..0ff5022 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -74,15 +74,14 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// Jobs can override const BACKOFF: Backoff = Backoff::Exponential(2); - /// Define the maximum number of milliseconds a job should be allowed to run before being - /// considered dead. + /// Define how often a job should update its heartbeat timestamp /// /// This is important for allowing the job server to reap processes that were started but never /// completed. /// - /// Defaults to 15 seconds + /// Defaults to 5 seconds /// Jobs can override - const TIMEOUT: u64 = 15_000; + const HEARTBEAT_INTERVAL: u64 = 5_000; /// Users of this library must define what it means to run a job. /// @@ -118,13 +117,12 @@ pub trait Job: Serialize + DeserializeOwned + 'static { Self::BACKOFF } - /// Define the maximum number of milliseconds this job should be allowed to run before being - /// considered dead. + /// Define how often a job should update its heartbeat timestamp /// /// This is important for allowing the job server to reap processes that were started but never /// completed. - fn timeout(&self) -> u64 { - Self::TIMEOUT + fn heartbeat_interval(&self) -> u64 { + Self::HEARTBEAT_INTERVAL } } @@ -138,7 +136,7 @@ where job.queue().to_owned(), job.max_retries(), job.backoff_strategy(), - job.timeout(), + job.heartbeat_interval(), serde_json::to_value(job).map_err(|_| ToJson)?, ); diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 7f55928..1cd8aa5 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -60,7 +60,7 @@ pub struct NewJobInfo { /// Milliseconds from execution until the job is considered dead /// /// This is important for storage implementations to reap unfinished jobs - timeout: u64, + heartbeat_interval: u64, } impl NewJobInfo { @@ -73,7 +73,7 @@ impl NewJobInfo { queue: String, max_retries: MaxRetries, backoff_strategy: Backoff, - timeout: u64, + heartbeat_interval: u64, args: Value, ) -> Self { NewJobInfo { @@ -83,7 +83,7 @@ impl NewJobInfo { max_retries, next_queue: None, backoff_strategy, - timeout, + heartbeat_interval, } } @@ -113,7 +113,7 @@ impl NewJobInfo { max_retries: self.max_retries, next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()), backoff_strategy: self.backoff_strategy, - timeout: self.timeout, + heartbeat_interval: self.heartbeat_interval, } } } @@ -159,7 +159,7 @@ pub struct JobInfo { /// Milliseconds from execution until the job is considered dead /// /// This is important for storage implementations to reap unfinished jobs - pub timeout: u64, + pub heartbeat_interval: u64, } impl JobInfo { diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index c105f92..54cd945 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -272,16 +272,20 @@ pub mod memory_storage { }; match result { + // successful jobs are removed JobResult::Success => Ok(true), - JobResult::Unregistered | JobResult::Unexecuted => Ok(true), - JobResult::Failure => { - if job.prepare_retry() { - self.insert(job); - return Ok(false); - } else { - Ok(true) - } + // Unregistered or Unexecuted jobs are restored as-is + JobResult::Unregistered | JobResult::Unexecuted => { + self.insert(job); + Ok(false) } + // retryable failed jobs are restored + JobResult::Failure if job.prepare_retry() => { + self.insert(job); + Ok(false) + } + // dead jobs are removed + JobResult::Failure => Ok(true), } } } diff --git a/jobs-core/src/unsend_job.rs b/jobs-core/src/unsend_job.rs index 6e47b16..6aefc52 100644 --- a/jobs-core/src/unsend_job.rs +++ b/jobs-core/src/unsend_job.rs @@ -76,15 +76,14 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static { /// Jobs can override const BACKOFF: Backoff = Backoff::Exponential(2); - /// Define the maximum number of milliseconds a job should be allowed to run before being - /// considered dead. + /// Define how often a job should update its heartbeat timestamp /// /// This is important for allowing the job server to reap processes that were started but never /// completed. /// - /// Defaults to 15 seconds + /// Defaults to 5 seconds /// Jobs can override - const TIMEOUT: u64 = 15_000; + const HEARTBEAT_INTERVAL: u64 = 5_000; /// Users of this library must define what it means to run a job. /// @@ -120,13 +119,12 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static { Self::BACKOFF } - /// Define the maximum number of milliseconds this job should be allowed to run before being - /// considered dead. + /// Define how often a job should update its heartbeat timestamp /// /// This is important for allowing the job server to reap processes that were started but never /// completed. - fn timeout(&self) -> u64 { - Self::TIMEOUT + fn heartbeat_interval(&self) -> u64 { + Self::HEARTBEAT_INTERVAL } } @@ -156,7 +154,7 @@ where const QUEUE: &'static str = ::QUEUE; const MAX_RETRIES: MaxRetries = ::MAX_RETRIES; const BACKOFF: Backoff = ::BACKOFF; - const TIMEOUT: u64 = ::TIMEOUT; + const HEARTBEAT_INTERVAL: u64 = ::HEARTBEAT_INTERVAL; fn run(self, state: Self::State) -> Self::Future { UnwrapFuture(T::Spawner::spawn( @@ -180,7 +178,7 @@ where UnsendJob::backoff_strategy(self) } - fn timeout(&self) -> u64 { - UnsendJob::timeout(self) + fn heartbeat_interval(&self) -> u64 { + UnsendJob::heartbeat_interval(self) } }