background-jobs/jobs-core/src/storage.rs

289 lines
9.6 KiB
Rust
Raw Normal View History

2022-11-19 20:38:47 +00:00
use crate::{JobInfo, NewJobInfo, ReturnJobInfo};
2024-01-08 00:52:09 +00:00
use std::error::Error;
2020-03-22 17:52:43 +00:00
use uuid::Uuid;
2018-11-05 02:13:06 +00:00
2019-05-28 00:01:21 +00:00
/// Define a storage backend for jobs
///
/// This crate provides a default implementation in the `memory_storage` module, which is backed by
/// HashMaps and uses counting to assign IDs. If jobs must be persistent across application
2019-11-05 22:09:23 +00:00
/// restarts, look into the [`sled-backed`](https://github.com/spacejam/sled) implementation from
2019-05-28 00:01:21 +00:00
/// the `background-jobs-sled-storage` crate.
2020-03-21 02:31:03 +00:00
#[async_trait::async_trait]
pub trait Storage: Clone + Send {
2019-05-28 00:01:21 +00:00
/// The error type used by the storage mechansim.
2020-03-21 02:31:03 +00:00
type Error: Error + Send + Sync;
/// Get the JobInfo for a given job ID
async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error>;
2024-01-08 00:52:09 +00:00
/// push a job into the queue
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error>;
2018-11-05 02:13:06 +00:00
2024-01-08 00:52:09 +00:00
/// pop a job from the provided queue
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error>;
2018-11-05 02:13:06 +00:00
2024-01-08 00:52:09 +00:00
/// mark a job as being actively worked on
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error>;
2019-09-22 17:12:08 +00:00
/// "Return" a job to the database, marking it for retry if needed
///
/// returns `true` if the job has not been requeued
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error>;
}
2019-05-28 00:01:21 +00:00
2019-09-22 17:12:08 +00:00
/// A default, in-memory implementation of a storage mechanism
2019-05-28 00:01:21 +00:00
pub mod memory_storage {
2024-01-08 00:52:09 +00:00
use crate::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
2022-12-09 23:38:22 +00:00
use event_listener::{Event, EventListener};
use std::{
2024-01-08 00:52:09 +00:00
collections::{BTreeMap, HashMap},
convert::Infallible,
2022-12-09 23:38:22 +00:00
future::Future,
2024-01-08 00:52:09 +00:00
ops::Bound,
pin::Pin,
sync::Arc,
sync::Mutex,
2024-01-08 00:52:09 +00:00
time::Duration,
};
2024-01-08 00:52:09 +00:00
use time::OffsetDateTime;
use uuid::{NoContext, Timestamp, Uuid};
2019-05-28 00:01:21 +00:00
/// Allows memory storage to set timeouts for when to retry checking a queue for a job
#[async_trait::async_trait]
pub trait Timer {
/// Race a future against the clock, returning an empty tuple if the clock wins
async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
where
2022-12-09 23:38:22 +00:00
F: Future + Send + Sync;
}
2019-05-28 00:01:21 +00:00
#[derive(Clone)]
2019-09-22 17:12:08 +00:00
/// An In-Memory store for jobs
pub struct Storage<T> {
timer: T,
2019-05-28 00:01:21 +00:00
inner: Arc<Mutex<Inner>>,
}
2024-01-08 00:52:09 +00:00
/// Key of an entry in `Inner::queue_jobs`: the queue name, followed by the UUID obtained from
/// `JobInfo::next_queue_id` at insertion time. Entries therefore sort by queue first.
type OrderedKey = (String, Uuid);
/// Claim state of a queued job: `None` while no runner holds it, otherwise the ID of the runner
/// and the time of the claim or of the most recent heartbeat.
type JobState = Option<(Uuid, OffsetDateTime)>;
/// Value stored per `OrderedKey`: the job's ID together with its claim state.
type JobMeta = (Uuid, JobState);
2019-05-28 00:01:21 +00:00
struct Inner {
queues: HashMap<String, Event>,
2020-03-22 17:52:43 +00:00
jobs: HashMap<Uuid, JobInfo>,
2024-01-08 00:52:09 +00:00
queue_jobs: BTreeMap<OrderedKey, JobMeta>,
2019-05-28 00:01:21 +00:00
}
impl<T: Timer> Storage<T> {
2019-09-22 17:12:08 +00:00
/// Create a new, empty job store
pub fn new(timer: T) -> Self {
2019-05-28 00:01:21 +00:00
Storage {
inner: Arc::new(Mutex::new(Inner {
queues: HashMap::new(),
jobs: HashMap::new(),
2024-01-08 00:52:09 +00:00
queue_jobs: BTreeMap::new(),
2019-05-28 00:01:21 +00:00
})),
timer,
2019-05-28 00:01:21 +00:00
}
}
2022-12-09 23:38:22 +00:00
/// Look up a job by ID, returning a clone of the stored `JobInfo` when one exists.
///
/// Panics if the state mutex has been poisoned.
fn get(&self, job_id: Uuid) -> Option<JobInfo> {
    let guard = self.inner.lock().unwrap();
    guard.jobs.get(&job_id).cloned()
}
fn listener(&self, pop_queue: String) -> (Pin<Box<EventListener>>, Duration) {
2024-01-08 00:52:09 +00:00
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let now = OffsetDateTime::now_utc();
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
let mut inner = self.inner.lock().unwrap();
2022-12-09 23:38:22 +00:00
let listener = inner.queues.entry(pop_queue.clone()).or_default().listen();
2024-01-08 00:52:09 +00:00
let duration = inner
2024-01-08 00:52:09 +00:00
.queue_jobs
.range((
Bound::Excluded((pop_queue.clone(), lower_bound)),
Bound::Unbounded,
2024-01-08 00:52:09 +00:00
))
.filter(|(_, (_, meta))| meta.is_none())
.filter_map(|(_, (id, _))| inner.jobs.get(id))
.take_while(|JobInfo { queue, .. }| queue.as_str() == pop_queue.as_str())
.map(|JobInfo { next_queue, .. }| {
if *next_queue > now {
*next_queue - now
2024-01-08 00:52:09 +00:00
} else {
time::Duration::seconds(0)
2024-01-08 00:52:09 +00:00
}
})
.find_map(|duration| duration.try_into().ok());
2024-01-08 00:52:09 +00:00
(listener, duration.unwrap_or(Duration::from_secs(10)))
2022-12-09 23:38:22 +00:00
}
2024-01-08 00:52:09 +00:00
fn try_pop(&self, queue: &str, runner_id: Uuid) -> Option<JobInfo> {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
let now = time::OffsetDateTime::now_utc();
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
let mut inner = self.inner.lock().unwrap();
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
let mut pop_job = None;
for (_, (job_id, job_meta)) in inner.queue_jobs.range_mut((
Bound::Excluded((queue.to_string(), lower_bound)),
Bound::Included((queue.to_string(), upper_bound)),
)) {
if job_meta.is_none()
|| job_meta.is_some_and(|(_, h)| h + time::Duration::seconds(30) < now)
{
*job_meta = Some((runner_id, now));
pop_job = Some(*job_id);
break;
2022-12-09 23:38:22 +00:00
}
2024-01-08 00:52:09 +00:00
}
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
if let Some(job_id) = pop_job {
return inner.jobs.get(&job_id).cloned();
2022-12-09 23:38:22 +00:00
}
None
}
2024-01-08 00:52:09 +00:00
fn set_heartbeat(&self, job_id: Uuid, runner_id: Uuid) {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
2022-12-09 23:38:22 +00:00
let mut inner = self.inner.lock().unwrap();
2024-01-08 00:52:09 +00:00
let queue = if let Some(job) = inner.jobs.get(&job_id) {
job.queue.clone()
} else {
return;
};
for (_, (found_job_id, found_job_meta)) in inner.queue_jobs.range_mut((
Bound::Excluded((queue.clone(), lower_bound)),
Bound::Included((queue, upper_bound)),
)) {
if *found_job_id == job_id {
*found_job_meta = Some((runner_id, OffsetDateTime::now_utc()));
return;
}
}
2022-12-09 23:38:22 +00:00
}
2024-01-08 00:52:09 +00:00
fn remove_job(&self, job_id: Uuid) -> Option<JobInfo> {
let lower_bound = Uuid::new_v7(Timestamp::from_unix(NoContext, 0, 0));
let upper_bound = Uuid::now_v7();
2022-12-09 23:38:22 +00:00
let mut inner = self.inner.lock().unwrap();
2024-01-08 00:52:09 +00:00
let job = inner.jobs.remove(&job_id)?;
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
let mut key = None;
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
for (found_key, (found_job_id, _)) in inner.queue_jobs.range_mut((
Bound::Excluded((job.queue.clone(), lower_bound)),
Bound::Included((job.queue.clone(), upper_bound)),
)) {
if *found_job_id == job_id {
key = Some(found_key.clone());
break;
}
}
2022-12-09 23:38:22 +00:00
2024-01-08 00:52:09 +00:00
if let Some(key) = key {
if inner.queue_jobs.remove(&key).is_none() {
tracing::warn!("failed to remove {key:?}");
}
2024-01-08 00:52:09 +00:00
}
Some(job)
2022-12-09 23:38:22 +00:00
}
2024-01-08 00:52:09 +00:00
/// Store `job`, index it under its queue, and wake one listener on that queue.
///
/// The index key is the job's queue name paired with `job.next_queue_id()`, and the job
/// starts out unclaimed. Returns the job's ID.
///
/// Panics if the state mutex has been poisoned.
fn insert(&self, job: JobInfo) -> Uuid {
    let job_id = job.id;
    let queue_name = job.queue.clone();
    let index_key = (queue_name.clone(), job.next_queue_id());

    let mut guard = self.inner.lock().unwrap();

    guard.jobs.insert(job_id, job);
    guard.queue_jobs.insert(index_key, (job_id, None));

    // Creates the queue's event on first use, then wakes a single waiting popper.
    guard.queues.entry(queue_name).or_default().notify(1);

    job_id
}
2019-05-28 00:01:21 +00:00
}
2020-03-21 02:31:03 +00:00
#[async_trait::async_trait]
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
2020-03-21 02:31:03 +00:00
type Error = Infallible;
2019-05-28 00:01:21 +00:00
/// Get the JobInfo for a given job ID
///
/// Never fails; yields `None` when the store holds no job with this ID.
async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
    let found = self.get(job_id);
    Ok(found)
}
2024-01-08 00:52:09 +00:00
/// push a job into the queue
///
/// Builds the `JobInfo` from `job`, stores it, and yields the stored job's ID.
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
    let built = job.build();
    Ok(self.insert(built))
}
2024-01-08 00:52:09 +00:00
/// pop a job from the provided queue
///
/// Repeats until a job can be claimed for `runner_id`: each round obtains a listener and a
/// wait duration for the queue, attempts to claim a job, and otherwise waits until either
/// the listener fires or the timer elapses.
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
    loop {
        // The listener is obtained before the claim attempt
        // NOTE(review): presumably so a push arriving in between still wakes this caller — confirm
        let (listener, duration) = self.listener(queue.to_string());

        if let Some(job) = self.try_pop(queue, runner_id) {
            return Ok(job);
        }

        // `Ok(())` is a listener wakeup and `Err(())` a timeout; both simply lead to
        // another attempt.
        let _ = self.timer.timeout(duration, listener).await;
    }
}
2024-01-08 00:52:09 +00:00
/// mark a job as being actively worked on
///
/// Delegates to `set_heartbeat`; never fails, including when the job is unknown.
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Self::Error> {
    self.set_heartbeat(job_id, runner_id);

    Ok(())
}
2024-01-08 00:52:09 +00:00
/// "Return" a job to the database, marking it for retry if needed
async fn complete(
&self,
ReturnJobInfo { id, result }: ReturnJobInfo,
) -> Result<bool, Self::Error> {
2024-01-08 00:52:09 +00:00
let mut job = if let Some(job) = self.remove_job(id) {
job
} else {
return Ok(true);
2024-01-08 00:52:09 +00:00
};
2019-05-28 00:01:21 +00:00
2024-01-08 00:52:09 +00:00
match result {
JobResult::Success => Ok(true),
JobResult::Unregistered | JobResult::Unexecuted => Ok(true),
2024-01-08 00:52:09 +00:00
JobResult::Failure => {
if job.prepare_retry() {
self.insert(job);
return Ok(false);
} else {
Ok(true)
2024-01-08 00:52:09 +00:00
}
}
}
2019-05-28 00:01:21 +00:00
}
}
}