diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 7c60914..b4e417c 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -7,8 +7,10 @@ use uuid::{NoContext, Timestamp, Uuid}; #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] /// Information about the sate of an attempted job pub struct ReturnJobInfo { - pub(crate) id: Uuid, - pub(crate) result: JobResult, + /// The ID of the job being returned + pub id: Uuid, + /// The result status of the job + pub result: JobResult, } impl ReturnJobInfo { @@ -95,7 +97,8 @@ impl NewJobInfo { self.next_queue.is_none() } - pub(crate) fn build(self) -> JobInfo { + /// Construct a JobInfo from a NewJobInfo + pub fn build(self) -> JobInfo { JobInfo { id: Uuid::now_v7(), name: self.name, diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 6575440..7561e27 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -83,34 +83,31 @@ pub mod memory_storage { } } - fn listener(&self, queue: String) -> (Pin>, Duration) { + fn listener(&self, pop_queue: String) -> (Pin>, Duration) { let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); - let upper_bound = Uuid::now_v7(); + let now = OffsetDateTime::now_utc(); let mut inner = self.inner.lock().unwrap(); - let listener = inner.queues.entry(queue.clone()).or_default().listen(); + let listener = inner.queues.entry(pop_queue.clone()).or_default().listen(); - let next_job = inner + let duration = inner .queue_jobs .range(( - Bound::Excluded((queue.clone(), lower_bound)), - Bound::Included((queue, upper_bound)), + Bound::Excluded((pop_queue.clone(), lower_bound)), + Bound::Unbounded, )) - .find_map(|(_, (id, meta))| { - if meta.is_none() { - inner.jobs.get(id) + .filter(|(_, (_, meta))| meta.is_none()) + .filter_map(|(_, (id, _))| inner.jobs.get(id)) + .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str()) + .map(|JobInfo { next_queue, .. }| { + if *next_queue > now { + *next_queue - now } else { - None + time::Duration::seconds(0) } - }); - - let duration = if let Some(job) = next_job { - let duration = OffsetDateTime::now_utc() - job.next_queue; - duration.try_into().ok() - } else { - None - }; + }) + .find_map(|duration| duration.try_into().ok()); (listener, duration.unwrap_or(Duration::from_secs(10))) } @@ -188,7 +185,9 @@ pub mod memory_storage { } if let Some(key) = key { - inner.queue_jobs.remove(&key); + if inner.queue_jobs.remove(&key).is_none() { + tracing::warn!("failed to remove {key:?}"); + } } Some(job)