Compare commits

...

5 commits

11 changed files with 352 additions and 87 deletions

View file

@ -6,5 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
actix-rt = "2.9.0"
anyhow = "1.0.79"
background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." } background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." }
serde = { version = "1.0.195", features = ["derive"] }
tokio = { version = "1.35.1", features = ["full"] } tokio = { version = "1.35.1", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

View file

@ -1,9 +1,114 @@
use background_jobs::postgres::Storage; use actix_rt::Arbiter;
use background_jobs::{
postgres::Storage, ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig,
};
// use background_jobs_sled_storage::Storage;
use std::{
future::{ready, Ready},
time::{Duration, SystemTime},
};
use tracing::info;
use tracing_subscriber::EnvFilter;
#[tokio::main] const DEFAULT_QUEUE: &str = "default";
async fn main() -> Result<(), Box<dyn std::error::Error>> {
#[derive(Clone, Debug)]
pub struct MyState {
pub app_name: String,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
some_u64: u64,
other_u64: u64,
}
#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt::fmt()
.with_env_filter(env_filter)
.init();
// Set up our Storage
let storage = let storage =
Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?; Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;
println!("Hello, world!");
let arbiter = Arbiter::new();
// Configure and start our workers
let queue_handle =
WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App"))
.register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16)
.start();
// Queue our jobs
queue_handle.queue(MyJob::new(1, 2)).await?;
queue_handle.queue(MyJob::new(3, 4)).await?;
queue_handle.queue(MyJob::new(5, 6)).await?;
for i in 0..20 {
queue_handle
.schedule(
MyJob::new(7 + i, 8 + i),
SystemTime::now() + Duration::from_secs(i),
)
.await?;
}
// Block on Actix
actix_rt::signal::ctrl_c().await?;
arbiter.stop();
let _ = arbiter.join();
Ok(()) Ok(())
} }
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl MyJob {
pub fn new(some_u64: u64, other_u64: u64) -> Self {
MyJob {
some_u64,
other_u64,
}
}
}
impl Job for MyJob {
type State = MyState;
type Future = Ready<anyhow::Result<()>>;
type Spawner = ActixSpawner;
// The name of the job. It is super important that each job has a unique name,
// because otherwise one job will overwrite another job when they're being
// registered.
const NAME: &'static str = "MyJob";
// The queue that this processor belongs to
//
// Workers have the option to subscribe to specific queues, so this is important to
// determine which worker will call the processor
//
// Jobs can optionally override the queue they're spawned on
const QUEUE: &'static str = DEFAULT_QUEUE;
// The number of times background-jobs should try to retry a job before giving up
//
// Jobs can optionally override this value
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
fn run(self, state: MyState) -> Self::Future {
info!("{}: args, {:?}", state.app_name, self);
ready(Ok(()))
}
}

View file

@ -55,8 +55,10 @@ async fn heartbeat_job<F: Future>(
future: F, future: F,
job_id: Uuid, job_id: Uuid,
runner_id: Uuid, runner_id: Uuid,
heartbeat_interval: u64,
) -> F::Output { ) -> F::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5)); let mut interval =
actix_rt::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut future = std::pin::pin!(future); let mut future = std::pin::pin!(future);
@ -173,6 +175,7 @@ pub(crate) async fn local_worker<State, Extras>(
let process_span = make_span(id, &queue, "process"); let process_span = make_span(id, &queue, "process");
let job_id = job.id; let job_id = job.id;
let heartbeat_interval = job.heartbeat_interval;
let return_job = process_span let return_job = process_span
.in_scope(|| { .in_scope(|| {
heartbeat_job( heartbeat_job(
@ -180,6 +183,7 @@ pub(crate) async fn local_worker<State, Extras>(
time_job(processors.process(job), job_id), time_job(processors.process(job), job_id),
job_id, job_id,
id, id,
heartbeat_interval,
) )
}) })
.instrument(process_span) .instrument(process_span)

View file

@ -74,15 +74,14 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// Jobs can override /// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2); const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
/// ///
/// Defaults to 15 seconds /// Defaults to 5 seconds
/// Jobs can override /// Jobs can override
const TIMEOUT: u64 = 15_000; const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
@ -118,13 +117,12 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
Self::BACKOFF Self::BACKOFF
} }
/// Define the maximum number of milliseconds this job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
fn timeout(&self) -> u64 { fn heartbeat_interval(&self) -> u64 {
Self::TIMEOUT Self::HEARTBEAT_INTERVAL
} }
} }
@ -138,7 +136,7 @@ where
job.queue().to_owned(), job.queue().to_owned(),
job.max_retries(), job.max_retries(),
job.backoff_strategy(), job.backoff_strategy(),
job.timeout(), job.heartbeat_interval(),
serde_json::to_value(job).map_err(|_| ToJson)?, serde_json::to_value(job).map_err(|_| ToJson)?,
); );

View file

@ -60,7 +60,7 @@ pub struct NewJobInfo {
/// Milliseconds from execution until the job is considered dead /// Milliseconds from execution until the job is considered dead
/// ///
/// This is important for storage implementations to reap unfinished jobs /// This is important for storage implementations to reap unfinished jobs
timeout: u64, heartbeat_interval: u64,
} }
impl NewJobInfo { impl NewJobInfo {
@ -73,7 +73,7 @@ impl NewJobInfo {
queue: String, queue: String,
max_retries: MaxRetries, max_retries: MaxRetries,
backoff_strategy: Backoff, backoff_strategy: Backoff,
timeout: u64, heartbeat_interval: u64,
args: Value, args: Value,
) -> Self { ) -> Self {
NewJobInfo { NewJobInfo {
@ -83,7 +83,7 @@ impl NewJobInfo {
max_retries, max_retries,
next_queue: None, next_queue: None,
backoff_strategy, backoff_strategy,
timeout, heartbeat_interval,
} }
} }
@ -113,7 +113,7 @@ impl NewJobInfo {
max_retries: self.max_retries, max_retries: self.max_retries,
next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()), next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
backoff_strategy: self.backoff_strategy, backoff_strategy: self.backoff_strategy,
timeout: self.timeout, heartbeat_interval: self.heartbeat_interval,
} }
} }
} }
@ -159,7 +159,7 @@ pub struct JobInfo {
/// Milliseconds from execution until the job is considered dead /// Milliseconds from execution until the job is considered dead
/// ///
/// This is important for storage implementations to reap unfinished jobs /// This is important for storage implementations to reap unfinished jobs
pub timeout: u64, pub heartbeat_interval: u64,
} }
impl JobInfo { impl JobInfo {

View file

@ -272,16 +272,20 @@ pub mod memory_storage {
}; };
match result { match result {
// successful jobs are removed
JobResult::Success => Ok(true), JobResult::Success => Ok(true),
JobResult::Unregistered | JobResult::Unexecuted => Ok(true), // Unregistered or Unexecuted jobs are restored as-is
JobResult::Failure => { JobResult::Unregistered | JobResult::Unexecuted => {
if job.prepare_retry() { self.insert(job);
self.insert(job); Ok(false)
return Ok(false);
} else {
Ok(true)
}
} }
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
self.insert(job);
Ok(false)
}
// dead jobs are removed
JobResult::Failure => Ok(true),
} }
} }
} }

View file

@ -76,15 +76,14 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
/// Jobs can override /// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2); const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
/// ///
/// Defaults to 15 seconds /// Defaults to 5 seconds
/// Jobs can override /// Jobs can override
const TIMEOUT: u64 = 15_000; const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job. /// Users of this library must define what it means to run a job.
/// ///
@ -120,13 +119,12 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
Self::BACKOFF Self::BACKOFF
} }
/// Define the maximum number of milliseconds this job should be allowed to run before being /// Define how often a job should update its heartbeat timestamp
/// considered dead.
/// ///
/// This is important for allowing the job server to reap processes that were started but never /// This is important for allowing the job server to reap processes that were started but never
/// completed. /// completed.
fn timeout(&self) -> u64 { fn heartbeat_interval(&self) -> u64 {
Self::TIMEOUT Self::HEARTBEAT_INTERVAL
} }
} }
@ -156,7 +154,7 @@ where
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE; const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES; const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES;
const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF; const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF;
const TIMEOUT: u64 = <Self as UnsendJob>::TIMEOUT; const HEARTBEAT_INTERVAL: u64 = <Self as UnsendJob>::HEARTBEAT_INTERVAL;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
UnwrapFuture(T::Spawner::spawn( UnwrapFuture(T::Spawner::spawn(
@ -180,7 +178,7 @@ where
UnsendJob::backoff_strategy(self) UnsendJob::backoff_strategy(self)
} }
fn timeout(&self) -> u64 { fn heartbeat_interval(&self) -> u64 {
UnsendJob::timeout(self) UnsendJob::heartbeat_interval(self)
} }
} }

View file

@ -10,19 +10,24 @@ use std::{
time::Duration, time::Duration,
}; };
use background_jobs_core::{Backoff, JobInfo, MaxRetries, NewJobInfo, ReturnJobInfo}; use background_jobs_core::{Backoff, JobInfo, JobResult, MaxRetries, NewJobInfo, ReturnJobInfo};
use dashmap::DashMap; use dashmap::DashMap;
use diesel::prelude::*; use diesel::{
data_types::PgInterval,
dsl::IntervalDsl,
prelude::*,
sql_types::{Interval, Timestamp},
};
use diesel_async::{ use diesel_async::{
pooled_connection::{ pooled_connection::{
deadpool::{BuildError, Hook, Pool, PoolError}, deadpool::{BuildError, Hook, Pool, PoolError},
AsyncDieselConnectionManager, ManagerConfig, AsyncDieselConnectionManager, ManagerConfig,
}, },
AsyncConnection, AsyncPgConnection, RunQueryDsl, AsyncPgConnection, RunQueryDsl,
}; };
use futures_core::future::BoxFuture; use futures_core::future::BoxFuture;
use serde_json::Value; use serde_json::Value;
use time::{OffsetDateTime, PrimitiveDateTime}; use time::PrimitiveDateTime;
use tokio::{sync::Notify, task::JoinHandle}; use tokio::{sync::Notify, task::JoinHandle};
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket}; use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
use tracing::Instrument; use tracing::Instrument;
@ -122,7 +127,7 @@ struct PostgresJob {
backoff_multiplier: i32, backoff_multiplier: i32,
backoff: BackoffStrategy, backoff: BackoffStrategy,
next_queue: PrimitiveDateTime, next_queue: PrimitiveDateTime,
timeout: i32, heartbeat_interval: PgInterval,
} }
impl From<JobInfo> for PostgresJob { impl From<JobInfo> for PostgresJob {
@ -136,9 +141,11 @@ impl From<JobInfo> for PostgresJob {
max_retries, max_retries,
backoff_strategy, backoff_strategy,
next_queue, next_queue,
timeout, heartbeat_interval,
} = value; } = value;
let next_queue = next_queue.to_offset(time::UtcOffset::UTC);
PostgresJob { PostgresJob {
id, id,
name, name,
@ -162,7 +169,7 @@ impl From<JobInfo> for PostgresJob {
Backoff::Exponential(_) => BackoffStrategy::Exponential, Backoff::Exponential(_) => BackoffStrategy::Exponential,
}, },
next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()), next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()),
timeout: timeout as _, heartbeat_interval: (heartbeat_interval as i32).milliseconds(),
} }
} }
} }
@ -180,7 +187,7 @@ impl From<PostgresJob> for JobInfo {
backoff_multiplier, backoff_multiplier,
backoff, backoff,
next_queue, next_queue,
timeout, heartbeat_interval,
} = value; } = value;
JobInfo { JobInfo {
@ -198,7 +205,7 @@ impl From<PostgresJob> for JobInfo {
BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _), BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _),
}, },
next_queue: next_queue.assume_utc(), next_queue: next_queue.assume_utc(),
timeout: timeout as _, heartbeat_interval: (heartbeat_interval.microseconds / 1_000) as _,
} }
} }
} }
@ -233,42 +240,134 @@ impl background_jobs_core::Storage for Storage {
} }
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> { async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
let postgres_job: PostgresJob = job.build().into(); self.insert(job.build()).await
let id = postgres_job.id; }
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; async fn pop(&self, in_queue: &str, in_runner_id: Uuid) -> Result<JobInfo, Self::Error> {
loop {
tracing::trace!("pop: looping");
{ let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
use schema::job_queue::dsl::*;
postgres_job let notifier: Arc<Notify> = self
.insert_into(job_queue) .inner
.queue_notifications
.entry(String::from(in_queue))
.or_insert_with(|| Arc::new(Notify::const_new()))
.clone();
diesel::sql_query("LISTEN queue_status_channel;")
.execute(&mut conn) .execute(&mut conn)
.await .await
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
let count = {
use schema::job_queue::dsl::*;
diesel::update(job_queue)
.filter(heartbeat.is_not_null().and(heartbeat.assume_not_null().le(
// not allowed to multiply heartbeat_interval. thanks diesel
diesel::dsl::sql::<Timestamp>("NOW() - heartbeat_interval * 5"),
)))
.set((
heartbeat.eq(Option::<PrimitiveDateTime>::None),
status.eq(JobStatus::New),
runner_id.eq(Option::<Uuid>::None),
))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?
};
if count > 0 {
tracing::info!("Reset {count} jobs");
}
let id_query = {
use schema::job_queue::dsl::*;
let queue_alias = diesel::alias!(schema::job_queue as queue_alias);
queue_alias
.select(queue_alias.field(id))
.filter(
queue_alias
.field(status)
.eq(JobStatus::New)
.and(queue_alias.field(queue).eq(in_queue))
.and(queue_alias.field(next_queue).le(diesel::dsl::now)),
)
.order(queue_alias.field(next_queue))
.for_update()
.skip_locked()
.single_value()
};
let opt = {
use schema::job_queue::dsl::*;
diesel::update(job_queue)
.filter(id.nullable().eq(id_query))
.filter(status.eq(JobStatus::New))
.set((
heartbeat.eq(diesel::dsl::now),
status.eq(JobStatus::Running),
runner_id.eq(in_runner_id),
))
.returning(PostgresJob::as_returning())
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
};
if let Some(postgres_job) = opt {
return Ok(postgres_job.into());
}
let next_queue = {
use schema::job_queue::dsl::*;
job_queue
.filter(queue.eq(in_queue).and(status.eq(JobStatus::New)))
.select(diesel::dsl::sql::<Interval>("NOW() - next_queue"))
.get_result::<PgInterval>(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
};
let sleep_duration = next_queue
.map(|interval| {
if interval.microseconds < 0 {
Duration::from_micros(interval.microseconds.abs_diff(0))
} else {
Duration::from_secs(0)
}
})
.unwrap_or(Duration::from_secs(5));
drop(conn);
if tokio::time::timeout(sleep_duration, notifier.notified())
.await
.is_ok()
{
tracing::debug!("Notified");
} else {
tracing::debug!("Timed out");
}
} }
Ok(id)
}
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
todo!()
} }
async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> { async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> {
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?; let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let now = to_primitive(OffsetDateTime::now_utc());
{ {
use schema::job_queue::dsl::*; use schema::job_queue::dsl::*;
diesel::update(job_queue) diesel::update(job_queue)
.filter(id.eq(job_id)) .filter(id.eq(job_id))
.set(( .set((heartbeat.eq(diesel::dsl::now), runner_id.eq(in_runner_id)))
heartbeat.eq(PrimitiveDateTime::new(now.date(), now.time())),
runner_id.eq(in_runner_id),
))
.execute(&mut conn) .execute(&mut conn)
.await .await
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
@ -278,13 +377,45 @@ impl background_jobs_core::Storage for Storage {
} }
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> { async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> {
todo!() let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
}
}
fn to_primitive(timestamp: OffsetDateTime) -> PrimitiveDateTime { let job = {
let timestamp = timestamp.to_offset(time::UtcOffset::UTC); use schema::job_queue::dsl::*;
PrimitiveDateTime::new(timestamp.date(), timestamp.time())
diesel::delete(job_queue)
.filter(id.eq(return_job_info.id))
.returning(PostgresJob::as_returning())
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
};
let mut job: JobInfo = if let Some(job) = job {
job.into()
} else {
return Ok(true);
};
match return_job_info.result {
// successful jobs are removed
JobResult::Success => Ok(true),
// Unregistered or Unexecuted jobs are restored as-is
JobResult::Unexecuted | JobResult::Unregistered => {
self.insert(job).await?;
Ok(false)
}
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
self.insert(job).await?;
Ok(false)
}
// dead jobs are removed
JobResult::Failure => Ok(true),
}
}
} }
impl Storage { impl Storage {
@ -348,6 +479,25 @@ impl Storage {
Ok(Storage { inner, drop_handle }) Ok(Storage { inner, drop_handle })
} }
async fn insert(&self, job_info: JobInfo) -> Result<Uuid, PostgresError> {
let postgres_job: PostgresJob = job_info.into();
let id = postgres_job.id;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
{
use schema::job_queue::dsl::*;
postgres_job
.insert_into(job_queue)
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
}
Ok(id)
}
} }
impl<'a> JobNotifierState<'a> { impl<'a> JobNotifierState<'a> {

View file

@ -21,7 +21,10 @@ pub(crate) fn migration() -> String {
t.add_column("backoff_multiplier", types::integer().nullable(false)); t.add_column("backoff_multiplier", types::integer().nullable(false));
t.add_column("backoff", types::custom("backoff_strategy").nullable(false)); t.add_column("backoff", types::custom("backoff_strategy").nullable(false));
t.add_column("next_queue", types::datetime().nullable(false)); t.add_column("next_queue", types::datetime().nullable(false));
t.add_column("timeout", types::integer().nullable(false)); t.add_column(
"heartbeat_interval",
types::custom("INTERVAL").nullable(false),
);
t.add_column( t.add_column(
"runner_id", "runner_id",
types::uuid().nullable(true).indexed(false).unique(false), types::uuid().nullable(true).indexed(false).unique(false),

View file

@ -31,7 +31,7 @@ diesel::table! {
backoff_multiplier -> Int4, backoff_multiplier -> Int4,
backoff -> BackoffStrategy, backoff -> BackoffStrategy,
next_queue -> Timestamp, next_queue -> Timestamp,
timeout -> Int4, heartbeat_interval -> Interval,
runner_id -> Nullable<Uuid>, runner_id -> Nullable<Uuid>,
status -> JobStatus, status -> JobStatus,
heartbeat -> Nullable<Timestamp>, heartbeat -> Nullable<Timestamp>,

View file

@ -133,22 +133,20 @@ impl background_jobs_core::Storage for Storage {
}; };
match result { match result {
JobResult::Success => { // successful jobs are removed
// ok JobResult::Success => Ok(true),
Ok(true) // Unregistered or Unexecuted jobs are restored as-is
}
JobResult::Unexecuted | JobResult::Unregistered => { JobResult::Unexecuted | JobResult::Unregistered => {
// TODO: handle self.insert(job)?;
Ok(true) Ok(false)
} }
JobResult::Failure => { // retryable failed jobs are restored
if job.prepare_retry() { JobResult::Failure if job.prepare_retry() => {
self.insert(job)?; self.insert(job)?;
Ok(false) Ok(false)
} else {
Ok(true)
}
} }
// dead jobs are removed
JobResult::Failure => Ok(true),
} }
} }
} }