Compare commits
No commits in common. "main" and "background-jobs-v0.18.0" have entirely different histories.
main
...
background
12
flake.lock
12
flake.lock
|
@ -5,11 +5,11 @@
|
||||||
"systems": "systems"
|
"systems": "systems"
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1710146030,
|
"lastModified": 1701680307,
|
||||||
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
|
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
|
||||||
"owner": "numtide",
|
"owner": "numtide",
|
||||||
"repo": "flake-utils",
|
"repo": "flake-utils",
|
||||||
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
|
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
@ -20,11 +20,11 @@
|
||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1712163089,
|
"lastModified": 1704194953,
|
||||||
"narHash": "sha256-Um+8kTIrC19vD4/lUCN9/cU9kcOsD1O1m+axJqQPyMM=",
|
"narHash": "sha256-RtDKd8Mynhe5CFnVT8s0/0yqtWFMM9LmCzXv/YKxnq4=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "fd281bd6b7d3e32ddfa399853946f782553163b5",
|
"rev": "bd645e8668ec6612439a9ee7e71f7eac4099d4f6",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "background-jobs-core"
|
name = "background-jobs-core"
|
||||||
description = "Core types for implementing an asynchronous jobs processor"
|
description = "Core types for implementing an asynchronous jobs processor"
|
||||||
version = "0.18.1"
|
version = "0.18.0"
|
||||||
license = "AGPL-3.0"
|
license = "AGPL-3.0"
|
||||||
authors = ["asonix <asonix@asonix.dog>"]
|
authors = ["asonix <asonix@asonix.dog>"]
|
||||||
repository = "https://git.asonix.dog/asonix/background-jobs"
|
repository = "https://git.asonix.dog/asonix/background-jobs"
|
||||||
|
@ -19,7 +19,7 @@ error-logging = []
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1.24"
|
async-trait = "0.1.24"
|
||||||
event-listener = "5"
|
event-listener = "4"
|
||||||
metrics = "0.22.0"
|
metrics = "0.22.0"
|
||||||
time = { version = "0.3", features = ["serde-human-readable"] }
|
time = { version = "0.3", features = ["serde-human-readable"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
|
|
|
@ -41,6 +41,7 @@ pub mod memory_storage {
|
||||||
convert::Infallible,
|
convert::Infallible,
|
||||||
future::Future,
|
future::Future,
|
||||||
ops::Bound,
|
ops::Bound,
|
||||||
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
sync::Mutex,
|
sync::Mutex,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
|
@ -64,23 +65,13 @@ pub mod memory_storage {
|
||||||
inner: Arc<Mutex<Inner>>,
|
inner: Arc<Mutex<Inner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
type OrderedKey = (String, Uuid);
|
||||||
struct QueueTimeId(Uuid);
|
type JobState = Option<(Uuid, OffsetDateTime)>;
|
||||||
|
type JobMeta = (Uuid, time::Duration, JobState);
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
||||||
struct JobId(Uuid);
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
||||||
struct RunnerId(Uuid);
|
|
||||||
|
|
||||||
type OrderedKey = (String, QueueTimeId);
|
|
||||||
type JobState = Option<(RunnerId, OffsetDateTime)>;
|
|
||||||
type JobMeta = (JobId, time::Duration, JobState);
|
|
||||||
type QueueMeta = (JobInfo, QueueTimeId);
|
|
||||||
|
|
||||||
struct Inner {
|
struct Inner {
|
||||||
queues: HashMap<String, Event>,
|
queues: HashMap<String, Event>,
|
||||||
jobs: HashMap<JobId, QueueMeta>,
|
jobs: HashMap<Uuid, JobInfo>,
|
||||||
queue_jobs: BTreeMap<OrderedKey, JobMeta>,
|
queue_jobs: BTreeMap<OrderedKey, JobMeta>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,16 +89,11 @@ pub mod memory_storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get(&self, job_id: Uuid) -> Option<JobInfo> {
|
fn get(&self, job_id: Uuid) -> Option<JobInfo> {
|
||||||
self.inner
|
self.inner.lock().unwrap().jobs.get(&job_id).cloned()
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.jobs
|
|
||||||
.get(&JobId(job_id))
|
|
||||||
.map(|(job_info, _)| job_info.clone())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listener(&self, pop_queue: String) -> (EventListener, Duration) {
|
fn listener(&self, pop_queue: String) -> (Pin<Box<EventListener>>, Duration) {
|
||||||
let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
|
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||||
let now = OffsetDateTime::now_utc();
|
let now = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
@ -122,8 +108,8 @@ pub mod memory_storage {
|
||||||
))
|
))
|
||||||
.filter(|(_, (_, _, meta))| meta.is_none())
|
.filter(|(_, (_, _, meta))| meta.is_none())
|
||||||
.filter_map(|(_, (id, _, _))| inner.jobs.get(id))
|
.filter_map(|(_, (id, _, _))| inner.jobs.get(id))
|
||||||
.take_while(|(JobInfo { queue, .. }, _)| queue.as_str() == pop_queue.as_str())
|
.take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
|
||||||
.map(|(JobInfo { next_queue, .. }, _)| {
|
.map(|JobInfo { next_queue, .. }| {
|
||||||
if *next_queue > now {
|
if *next_queue > now {
|
||||||
*next_queue - now
|
*next_queue - now
|
||||||
} else {
|
} else {
|
||||||
|
@ -136,10 +122,8 @@ pub mod memory_storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
|
fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
|
||||||
let runner_id = RunnerId(runner_id);
|
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||||
|
let upper_bound = Uuid::now_v7();
|
||||||
let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
|
|
||||||
let upper_bound = QueueTimeId(Uuid::now_v7());
|
|
||||||
let now = time::OffsetDateTime::now_utc();
|
let now = time::OffsetDateTime::now_utc();
|
||||||
|
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
@ -151,9 +135,7 @@ pub mod memory_storage {
|
||||||
Bound::Included((queue.to_string(), upper_bound)),
|
Bound::Included((queue.to_string(), upper_bound)),
|
||||||
)) {
|
)) {
|
||||||
if job_meta.is_none()
|
if job_meta.is_none()
|
||||||
|| job_meta.is_some_and(|(_, heartbeat_timestamp)| {
|
|| job_meta.is_some_and(|(_, h)| h + (5 * *heartbeat_interval) < now)
|
||||||
heartbeat_timestamp + (5 * *heartbeat_interval) < now
|
|
||||||
})
|
|
||||||
{
|
{
|
||||||
*job_meta = Some((runner_id, now));
|
*job_meta = Some((runner_id, now));
|
||||||
pop_job = Some(*job_id);
|
pop_job = Some(*job_id);
|
||||||
|
@ -162,61 +144,73 @@ pub mod memory_storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(job_id) = pop_job {
|
if let Some(job_id) = pop_job {
|
||||||
return inner
|
return inner.jobs.get(&job_id).cloned();
|
||||||
.jobs
|
|
||||||
.get(&job_id)
|
|
||||||
.map(|(job_info, _)| job_info.clone());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
|
fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
|
||||||
let job_id = JobId(job_id);
|
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||||
let runner_id = RunnerId(runner_id);
|
let upper_bound = Uuid::now_v7();
|
||||||
|
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
let queue_key = if let Some((job, queue_time_id)) = inner.jobs.get(&job_id) {
|
let queue = if let Some(job) = inner.jobs.get(&job_id) {
|
||||||
(job.queue.clone(), *queue_time_id)
|
job.queue.clone()
|
||||||
} else {
|
} else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some((_, _, found_job_meta)) = inner.queue_jobs.get_mut(&queue_key) {
|
for (_, (found_job_id, _, found_job_meta)) in inner.queue_jobs.range_mut((
|
||||||
*found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
|
Bound::Excluded((queue.clone(), lower_bound)),
|
||||||
} else {
|
Bound::Included((queue, upper_bound)),
|
||||||
metrics::counter!("background-jobs.memory.heartbeat.missing-queue-job")
|
)) {
|
||||||
.increment(1);
|
if *found_job_id == job_id {
|
||||||
tracing::warn!("Missing job meta for {queue_key:?}");
|
*found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
|
fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
|
||||||
let job_id = JobId(job_id);
|
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
|
||||||
|
let upper_bound = Uuid::now_v7();
|
||||||
|
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
let (job, queue_time_id) = inner.jobs.remove(&job_id)?;
|
let job = inner.jobs.remove(&job_id)?;
|
||||||
let queue_key = (job.queue.clone(), queue_time_id);
|
|
||||||
|
|
||||||
if inner.queue_jobs.remove(&queue_key).is_none() {
|
let mut key = None;
|
||||||
metrics::counter!("background-jobs.memory.remove.missing-queue-job").increment(1);
|
|
||||||
tracing::warn!("failed to remove job meta for {queue_key:?}");
|
for (found_key, (found_job_id, _, _)) in inner.queue_jobs.range_mut((
|
||||||
|
Bound::Excluded((job.queue.clone(), lower_bound)),
|
||||||
|
Bound::Included((job.queue.clone(), upper_bound)),
|
||||||
|
)) {
|
||||||
|
if *found_job_id == job_id {
|
||||||
|
key = Some(found_key.clone());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(key) = key {
|
||||||
|
if inner.queue_jobs.remove(&key).is_none() {
|
||||||
|
tracing::warn!("failed to remove {key:?}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(job)
|
Some(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert(&self, job: JobInfo) -> Uuid {
|
fn insert(&self, job: JobInfo) -> Uuid {
|
||||||
let id = JobId(job.id);
|
let id = job.id;
|
||||||
let queue = job.queue.clone();
|
let queue = job.queue.clone();
|
||||||
let queue_time_id = QueueTimeId(job.next_queue_id());
|
let queue_time_id = job.next_queue_id();
|
||||||
let heartbeat_interval = job.heartbeat_interval;
|
let heartbeat_interval = job.heartbeat_interval;
|
||||||
|
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
inner.jobs.insert(id, (job, queue_time_id));
|
inner.jobs.insert(id, job);
|
||||||
|
|
||||||
inner.queue_jobs.insert(
|
inner.queue_jobs.insert(
|
||||||
(queue.clone(), queue_time_id),
|
(queue.clone(), queue_time_id),
|
||||||
|
@ -229,23 +223,10 @@ pub mod memory_storage {
|
||||||
|
|
||||||
inner.queues.entry(queue).or_default().notify(1);
|
inner.queues.entry(queue).or_default().notify(1);
|
||||||
|
|
||||||
metrics::gauge!("background-jobs.memory.insert.queues")
|
id
|
||||||
.set(recordable(inner.queues.len()));
|
|
||||||
metrics::gauge!("background-jobs.memory.insert.jobs").set(recordable(inner.jobs.len()));
|
|
||||||
metrics::gauge!("background-jobs.memory.insert.queue-jobs")
|
|
||||||
.set(recordable(inner.queue_jobs.len()));
|
|
||||||
|
|
||||||
id.0
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recordable(value: usize) -> u32 {
|
|
||||||
let value = value as u64;
|
|
||||||
let value = value % u64::from(u32::MAX);
|
|
||||||
|
|
||||||
value as _
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
|
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
|
||||||
type Error = Infallible;
|
type Error = Infallible;
|
||||||
|
|
Loading…
Reference in a new issue