Fix fetching next timestamp, make more things public

This commit is contained in:
asonix 2024-01-07 22:45:48 -06:00
parent 496c40ddd4
commit 0cd0f91369
2 changed files with 24 additions and 22 deletions

View file

@ -7,8 +7,10 @@ use uuid::{NoContext, Timestamp, Uuid};
#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
/// Information about the sate of an attempted job /// Information about the sate of an attempted job
pub struct ReturnJobInfo { pub struct ReturnJobInfo {
pub(crate) id: Uuid, /// The ID of the job being returned
pub(crate) result: JobResult, pub id: Uuid,
/// The result status of the job
pub result: JobResult,
} }
impl ReturnJobInfo { impl ReturnJobInfo {
@ -95,7 +97,8 @@ impl NewJobInfo {
self.next_queue.is_none() self.next_queue.is_none()
} }
pub(crate) fn build(self) -> JobInfo { /// Construct a JobInfo from a NewJobInfo
pub fn build(self) -> JobInfo {
JobInfo { JobInfo {
id: Uuid::now_v7(), id: Uuid::now_v7(),
name: self.name, name: self.name,

View file

@ -83,34 +83,31 @@ pub mod memory_storage {
} }
} }
fn listener(&self, queue: String) -> (Pin<Box<EventListener>>, Duration) { fn listener(&self, pop_queue: String) -> (Pin<Box<EventListener>>, Duration) {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)); let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7(); let now = OffsetDateTime::now_utc();
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let listener = inner.queues.entry(queue.clone()).or_default().listen(); let listener = inner.queues.entry(pop_queue.clone()).or_default().listen();
let next_job = inner let duration = inner
.queue_jobs .queue_jobs
.range(( .range((
Bound::Excluded((queue.clone(), lower_bound)), Bound::Excluded((pop_queue.clone(), lower_bound)),
Bound::Included((queue, upper_bound)), Bound::Unbounded,
)) ))
.find_map(|(_, (id, meta))| { .filter(|(_, (_, meta))| meta.is_none())
if meta.is_none() { .filter_map(|(_, (id, _))| inner.jobs.get(id))
inner.jobs.get(id) .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
.map(|JobInfo { next_queue, .. }| {
if *next_queue > now {
*next_queue - now
} else { } else {
None time::Duration::seconds(0)
} }
}); })
.find_map(|duration| duration.try_into().ok());
let duration = if let Some(job) = next_job {
let duration = OffsetDateTime::now_utc() - job.next_queue;
duration.try_into().ok()
} else {
None
};
(listener, duration.unwrap_or(Duration::from_secs(10))) (listener, duration.unwrap_or(Duration::from_secs(10)))
} }
@ -188,7 +185,9 @@ pub mod memory_storage {
} }
if let Some(key) = key { if let Some(key) = key {
inner.queue_jobs.remove(&key); if inner.queue_jobs.remove(&key).is_none() {
tracing::warn!("failed to remove {key:?}");
}
} }
Some(job) Some(job)