Compare commits


3 commits

Author SHA1 Message Date
asonix 4dbde2c9eb Add gauges for memory storage 2024-04-06 13:24:29 -05:00
asonix f5163454da Store queue_time_id to avoid O(n) heartbeats & removals 2024-04-06 13:15:46 -05:00
asonix 005b8f851b Update nixpkgs 2024-04-06 13:06:20 -05:00
2 changed files with 73 additions and 53 deletions
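
The second commit's message, "Store queue_time_id to avoid O(n) heartbeats & removals", summarizes the main change below: the in-memory jobs map now stores each job's queue-time key next to its JobInfo, so set_heartbeat and remove_job can address the ordered queue_jobs map directly instead of scanning a whole queue's key range for a matching job id. A minimal standalone sketch of that pattern, with hypothetical JobId/SortKey names and a plain String record standing in for the crate's real types:

use std::collections::{BTreeMap, HashMap};

// Hypothetical stand-ins for the crate's JobId / QueueTimeId newtypes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct JobId(u64);
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct SortKey(u64);

#[derive(Default)]
struct Store {
    // Primary record remembers its queue name and the exact key used in the index.
    jobs: HashMap<JobId, (String, SortKey)>,
    // Ordered index keyed by (queue name, sort key), like the storage's queue_jobs map.
    queue_index: BTreeMap<(String, SortKey), JobId>,
}

impl Store {
    fn insert(&mut self, id: JobId, queue: String, key: SortKey) {
        self.queue_index.insert((queue.clone(), key), id);
        self.jobs.insert(id, (queue, key));
    }

    // O(log n): rebuild the exact index key from the stored record instead of
    // walking the queue's range looking for a matching job id.
    fn remove(&mut self, id: JobId) -> bool {
        let Some((queue, key)) = self.jobs.remove(&id) else {
            return false;
        };
        self.queue_index.remove(&(queue, key)).is_some()
    }
}

fn main() {
    let mut store = Store::default();
    store.insert(JobId(1), "default".to_string(), SortKey(42));
    assert!(store.remove(JobId(1)));
    assert!(store.queue_index.is_empty());
}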

flake.lock

@@ -5,11 +5,11 @@
         "systems": "systems"
       },
       "locked": {
-        "lastModified": 1701680307,
-        "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
+        "lastModified": 1710146030,
+        "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
         "owner": "numtide",
         "repo": "flake-utils",
-        "rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
+        "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
         "type": "github"
       },
       "original": {
@@ -20,11 +20,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1704194953,
-        "narHash": "sha256-RtDKd8Mynhe5CFnVT8s0/0yqtWFMM9LmCzXv/YKxnq4=",
+        "lastModified": 1712163089,
+        "narHash": "sha256-Um+8kTIrC19vD4/lUCN9/cU9kcOsD1O1m+axJqQPyMM=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "bd645e8668ec6612439a9ee7e71f7eac4099d4f6",
+        "rev": "fd281bd6b7d3e32ddfa399853946f782553163b5",
         "type": "github"
       },
       "original": {

@@ -65,13 +65,23 @@ pub mod memory_storage {
         inner: Arc<Mutex<Inner>>,
     }
 
-    type OrderedKey = (String, Uuid);
-    type JobState = Option<(Uuid, OffsetDateTime)>;
-    type JobMeta = (Uuid, time::Duration, JobState);
+    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+    struct QueueTimeId(Uuid);
+
+    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+    struct JobId(Uuid);
+
+    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+    struct RunnerId(Uuid);
+
+    type OrderedKey = (String, QueueTimeId);
+    type JobState = Option<(RunnerId, OffsetDateTime)>;
+    type JobMeta = (JobId, time::Duration, JobState);
+    type QueueMeta = (JobInfo, QueueTimeId);
 
     struct Inner {
         queues: HashMap<String, Event>,
-        jobs: HashMap<Uuid, JobInfo>,
+        jobs: HashMap<JobId, QueueMeta>,
         queue_jobs: BTreeMap<OrderedKey, JobMeta>,
     }
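
The hunk above also replaces the bare Uuid keys with QueueTimeId, JobId, and RunnerId newtypes, so the compiler can reject code that passes one kind of id where another is expected. A small illustration of that benefit, using hypothetical types rather than the crate's own:

use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct JobId(u128);
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct RunnerId(u128);

// Accepts only a JobId; passing a RunnerId no longer typechecks.
fn lookup(jobs: &HashMap<JobId, String>, id: JobId) -> Option<&String> {
    jobs.get(&id)
}

fn main() {
    let mut jobs = HashMap::new();
    jobs.insert(JobId(1), "send-email".to_string());

    let runner = RunnerId(7);
    // lookup(&jobs, runner); // error[E0308]: expected `JobId`, found `RunnerId`
    let _ = runner;
    assert_eq!(lookup(&jobs, JobId(1)), Some(&"send-email".to_string()));
}
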
@@ -89,11 +99,16 @@ pub mod memory_storage {
         }
 
         fn get(&self, job_id: Uuid) -> Option<JobInfo> {
-            self.inner.lock().unwrap().jobs.get(&job_id).cloned()
+            self.inner
+                .lock()
+                .unwrap()
+                .jobs
+                .get(&JobId(job_id))
+                .map(|(job_info, _)| job_info.clone())
         }
 
         fn listener(&self, pop_queue: String) -> (Pin<Box<EventListener>>, Duration) {
-            let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
+            let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
             let now = OffsetDateTime::now_utc();
 
             let mut inner = self.inner.lock().unwrap();
@@ -108,8 +123,8 @@ pub mod memory_storage {
                 ))
                 .filter(|(_, (_, _, meta))| meta.is_none())
                 .filter_map(|(_, (id, _, _))| inner.jobs.get(id))
-                .take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
-                .map(|JobInfo { next_queue, .. }| {
+                .take_while(|(JobInfo { queue, .. }, _)| queue.as_str() == pop_queue.as_str())
+                .map(|(JobInfo { next_queue, .. }, _)| {
                     if *next_queue > now {
                         *next_queue - now
                     } else {
@@ -122,8 +137,10 @@
         }
 
         fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
-            let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
-            let upper_bound = Uuid::now_v7();
+            let runner_id = RunnerId(runner_id);
+            let lower_bound = QueueTimeId(Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0)));
+            let upper_bound = QueueTimeId(Uuid::now_v7());
+
             let now = time::OffsetDateTime::now_utc();
 
             let mut inner = self.inner.lock().unwrap();
@@ -135,7 +152,9 @@
                 Bound::Included((queue.to_string(), upper_bound)),
             )) {
                 if job_meta.is_none()
-                    || job_meta.is_some_and(|(_, h)| h + (5 * *heartbeat_interval) < now)
+                    || job_meta.is_some_and(|(_, heartbeat_timestamp)| {
+                        heartbeat_timestamp + (5 * *heartbeat_interval) < now
+                    })
                 {
                     *job_meta = Some((runner_id, now));
                     pop_job = Some(*job_id);
@@ -144,73 +163,61 @@
             }
 
             if let Some(job_id) = pop_job {
-                return inner.jobs.get(&job_id).cloned();
+                return inner
+                    .jobs
+                    .get(&job_id)
+                    .map(|(job_info, _)| job_info.clone());
             }
 
             None
         }
 
         fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
-            let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
-            let upper_bound = Uuid::now_v7();
+            let job_id = JobId(job_id);
+            let runner_id = RunnerId(runner_id);
 
             let mut inner = self.inner.lock().unwrap();
 
-            let queue = if let Some(job) = inner.jobs.get(&job_id) {
-                job.queue.clone()
+            let queue_key = if let Some((job, queue_time_id)) = inner.jobs.get(&job_id) {
+                (job.queue.clone(), *queue_time_id)
             } else {
                 return;
            };
 
-            for (_, (found_job_id, _, found_job_meta)) in inner.queue_jobs.range_mut((
-                Bound::Excluded((queue.clone(), lower_bound)),
-                Bound::Included((queue, upper_bound)),
-            )) {
-                if *found_job_id == job_id {
-                    *found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
-                    return;
-                }
+            if let Some((_, _, found_job_meta)) = inner.queue_jobs.get_mut(&queue_key) {
+                *found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
+            } else {
+                metrics::counter!("background-jobs.memory.heartbeat.missing-queue-job")
+                    .increment(1);
+                tracing::warn!("Missing job meta for {queue_key:?}");
             }
         }
 
         fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
-            let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
-            let upper_bound = Uuid::now_v7();
+            let job_id = JobId(job_id);
 
             let mut inner = self.inner.lock().unwrap();
-            let job = inner.jobs.remove(&job_id)?;
+            let (job, queue_time_id) = inner.jobs.remove(&job_id)?;
+            let queue_key = (job.queue.clone(), queue_time_id);
 
-            let mut key = None;
-
-            for (found_key, (found_job_id, _, _)) in inner.queue_jobs.range_mut((
-                Bound::Excluded((job.queue.clone(), lower_bound)),
-                Bound::Included((job.queue.clone(), upper_bound)),
-            )) {
-                if *found_job_id == job_id {
-                    key = Some(found_key.clone());
-                    break;
-                }
-            }
-
-            if let Some(key) = key {
-                if inner.queue_jobs.remove(&key).is_none() {
-                    tracing::warn!("failed to remove {key:?}");
-                }
+            if inner.queue_jobs.remove(&queue_key).is_none() {
+                metrics::counter!("background-jobs.memory.remove.missing-queue-job").increment(1);
+                tracing::warn!("failed to remove job meta for {queue_key:?}");
             }
 
             Some(job)
         }
 
         fn insert(&self, job: JobInfo) -> Uuid {
-            let id = job.id;
+            let id = JobId(job.id);
             let queue = job.queue.clone();
-            let queue_time_id = job.next_queue_id();
+            let queue_time_id = QueueTimeId(job.next_queue_id());
             let heartbeat_interval = job.heartbeat_interval;
 
             let mut inner = self.inner.lock().unwrap();
 
-            inner.jobs.insert(id, job);
+            inner.jobs.insert(id, (job, queue_time_id));
 
             inner.queue_jobs.insert(
                 (queue.clone(), queue_time_id),
@@ -223,10 +230,23 @@
             inner.queues.entry(queue).or_default().notify(1);
 
-            id
+            metrics::gauge!("background-jobs.memory.insert.queues")
+                .set(recordable(inner.queues.len()));
+            metrics::gauge!("background-jobs.memory.insert.jobs").set(recordable(inner.jobs.len()));
+            metrics::gauge!("background-jobs.memory.insert.queue-jobs")
+                .set(recordable(inner.queue_jobs.len()));
+
+            id.0
         }
     }
 
+    fn recordable(value: usize) -> u32 {
+        let value = value as u64;
+        let value = value % u64::from(u32::MAX);
+        value as _
+    }
+
     #[async_trait::async_trait]
     impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
         type Error = Infallible;
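
The first commit in the list, "Add gauges for memory storage", is what introduces the metrics::gauge! calls and the recordable helper above; the helper reduces a usize length into u32 range, presumably because the metrics crate only offers lossless-into-f64 conversions for gauge values. A hedged sketch of how an application embedding this storage might surface those gauges, assuming the metrics-exporter-prometheus crate (which is not part of this diff):

use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install a global recorder; anything set via `metrics::gauge!` lands here.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // Stand-in for what Storage::insert now records, e.g.
    // "background-jobs.memory.insert.jobs".
    metrics::gauge!("background-jobs.memory.insert.jobs").set(3u32);

    // Render the current exposition text (metric names are sanitized by the exporter).
    println!("{}", handle.render());
}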