Compare commits

...

5 commits

11 changed files with 352 additions and 87 deletions

View file

@@ -6,5 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-rt = "2.9.0"
anyhow = "1.0.79"
background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." }
serde = { version = "1.0.195", features = ["derive"] }
tokio = { version = "1.35.1", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

View file

@@ -1,9 +1,114 @@
use background_jobs::postgres::Storage;
use actix_rt::Arbiter;
use background_jobs::{
postgres::Storage, ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig,
};
// use background_jobs_sled_storage::Storage;
use std::{
future::{ready, Ready},
time::{Duration, SystemTime},
};
use tracing::info;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
const DEFAULT_QUEUE: &str = "default";
#[derive(Clone, Debug)]
pub struct MyState {
pub app_name: String,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
some_u64: u64,
other_u64: u64,
}
#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt::fmt()
.with_env_filter(env_filter)
.init();
// Set up our Storage
let storage =
Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;
println!("Hello, world!");
let arbiter = Arbiter::new();
// Configure and start our workers
let queue_handle =
WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App"))
.register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16)
.start();
// Queue our jobs
queue_handle.queue(MyJob::new(1, 2)).await?;
queue_handle.queue(MyJob::new(3, 4)).await?;
queue_handle.queue(MyJob::new(5, 6)).await?;
for i in 0..20 {
queue_handle
.schedule(
MyJob::new(7 + i, 8 + i),
SystemTime::now() + Duration::from_secs(i),
)
.await?;
}
// Block on Actix
actix_rt::signal::ctrl_c().await?;
arbiter.stop();
let _ = arbiter.join();
Ok(())
}
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl MyJob {
pub fn new(some_u64: u64, other_u64: u64) -> Self {
MyJob {
some_u64,
other_u64,
}
}
}
impl Job for MyJob {
type State = MyState;
type Future = Ready<anyhow::Result<()>>;
type Spawner = ActixSpawner;
// The name of the job. It is super important that each job has a unique name,
// because otherwise one job will overwrite another job when they're being
// registered.
const NAME: &'static str = "MyJob";
// The queue that this processor belongs to
//
// Workers have the option to subscribe to specific queues, so this is important to
// determine which worker will call the processor
//
// Jobs can optionally override the queue they're spawned on
const QUEUE: &'static str = DEFAULT_QUEUE;
// The number of times background-jobs should try to retry a job before giving up
//
// Jobs can optionally override this value
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
fn run(self, state: MyState) -> Self::Future {
info!("{}: args, {:?}", state.app_name, self);
ready(Ok(()))
}
}

View file

@@ -55,8 +55,10 @@ async fn heartbeat_job<F: Future>(
future: F,
job_id: Uuid,
runner_id: Uuid,
heartbeat_interval: u64,
) -> F::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
let mut interval =
actix_rt::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut future = std::pin::pin!(future);
@@ -173,6 +175,7 @@ pub(crate) async fn local_worker<State, Extras>(
let process_span = make_span(id, &queue, "process");
let job_id = job.id;
let heartbeat_interval = job.heartbeat_interval;
let return_job = process_span
.in_scope(|| {
heartbeat_job(
@@ -180,6 +183,7 @@ pub(crate) async fn local_worker<State, Extras>(
time_job(processors.process(job), job_id),
job_id,
id,
heartbeat_interval,
)
})
.instrument(process_span)
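Only the signature change and the `from_millis` construction are visible in these hunks; the heartbeat loop body itself is unchanged and not shown. For context, the pattern being parameterized looks roughly like the sketch below, written against plain `tokio` with a hypothetical `beat` callback standing in for the storage heartbeat call:

```rust
use std::{future::Future, time::Duration};

/// Minimal sketch (not the actual implementation): drive `future` to completion
/// while invoking `beat` every `heartbeat_interval` milliseconds.
async fn with_heartbeat<F, B, BFut>(future: F, heartbeat_interval: u64, mut beat: B) -> F::Output
where
    F: Future,
    B: FnMut() -> BFut,
    BFut: Future<Output = ()>,
{
    let mut interval = tokio::time::interval(Duration::from_millis(heartbeat_interval));
    let mut future = std::pin::pin!(future);

    loop {
        tokio::select! {
            output = &mut future => return output,
            _ = interval.tick() => beat().await,
        }
    }
}
```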

View file

@@ -74,15 +74,14 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being
/// considered dead.
/// Define how often a job should update its heartbeat timestamp
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
///
/// Defaults to 15 seconds
/// Defaults to 5 seconds
/// Jobs can override
const TIMEOUT: u64 = 15_000;
const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job.
///
@@ -118,13 +117,12 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
Self::BACKOFF
}
/// Define the maximum number of milliseconds this job should be allowed to run before being
/// considered dead.
/// Define how often a job should update its heartbeat timestamp
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
fn timeout(&self) -> u64 {
Self::TIMEOUT
fn heartbeat_interval(&self) -> u64 {
Self::HEARTBEAT_INTERVAL
}
}
@@ -138,7 +136,7 @@ where
job.queue().to_owned(),
job.max_retries(),
job.backoff_strategy(),
job.timeout(),
job.heartbeat_interval(),
serde_json::to_value(job).map_err(|_| ToJson)?,
);

View file

@@ -60,7 +60,7 @@ pub struct NewJobInfo {
/// Milliseconds between heartbeat timestamp updates while the job runs
///
/// This is important for storage implementations to reap unfinished jobs
timeout: u64,
heartbeat_interval: u64,
}
impl NewJobInfo {
@@ -73,7 +73,7 @@ impl NewJobInfo {
queue: String,
max_retries: MaxRetries,
backoff_strategy: Backoff,
timeout: u64,
heartbeat_interval: u64,
args: Value,
) -> Self {
NewJobInfo {
@@ -83,7 +83,7 @@ impl NewJobInfo {
max_retries,
next_queue: None,
backoff_strategy,
timeout,
heartbeat_interval,
}
}
@@ -113,7 +113,7 @@ impl NewJobInfo {
max_retries: self.max_retries,
next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
backoff_strategy: self.backoff_strategy,
timeout: self.timeout,
heartbeat_interval: self.heartbeat_interval,
}
}
}
@@ -159,7 +159,7 @@ pub struct JobInfo {
/// Milliseconds between heartbeat timestamp updates while the job runs
///
/// This is important for storage implementations to reap unfinished jobs
pub timeout: u64,
pub heartbeat_interval: u64,
}
impl JobInfo {

View file

@@ -272,16 +272,20 @@ pub mod memory_storage {
};
match result {
// successful jobs are removed
JobResult::Success => Ok(true),
JobResult::Unregistered | JobResult::Unexecuted => Ok(true),
JobResult::Failure => {
if job.prepare_retry() {
self.insert(job);
return Ok(false);
} else {
Ok(true)
}
// Unregistered or Unexecuted jobs are restored as-is
JobResult::Unregistered | JobResult::Unexecuted => {
self.insert(job);
Ok(false)
}
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
self.insert(job);
Ok(false)
}
// dead jobs are removed
JobResult::Failure => Ok(true),
}
}
}

View file

@@ -76,15 +76,14 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
/// Jobs can override
const BACKOFF: Backoff = Backoff::Exponential(2);
/// Define the maximum number of milliseconds a job should be allowed to run before being
/// considered dead.
/// Define how often a job should update its heartbeat timestamp
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
///
/// Defaults to 15 seconds
/// Defaults to 5 seconds
/// Jobs can override
const TIMEOUT: u64 = 15_000;
const HEARTBEAT_INTERVAL: u64 = 5_000;
/// Users of this library must define what it means to run a job.
///
@@ -120,13 +119,12 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
Self::BACKOFF
}
/// Define the maximum number of milliseconds this job should be allowed to run before being
/// considered dead.
/// Define how often a job should update its heartbeat timestamp
///
/// This is important for allowing the job server to reap processes that were started but never
/// completed.
fn timeout(&self) -> u64 {
Self::TIMEOUT
fn heartbeat_interval(&self) -> u64 {
Self::HEARTBEAT_INTERVAL
}
}
@@ -156,7 +154,7 @@ where
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES;
const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF;
const TIMEOUT: u64 = <Self as UnsendJob>::TIMEOUT;
const HEARTBEAT_INTERVAL: u64 = <Self as UnsendJob>::HEARTBEAT_INTERVAL;
fn run(self, state: Self::State) -> Self::Future {
UnwrapFuture(T::Spawner::spawn(
@@ -180,7 +178,7 @@ where
UnsendJob::backoff_strategy(self)
}
fn timeout(&self) -> u64 {
UnsendJob::timeout(self)
fn heartbeat_interval(&self) -> u64 {
UnsendJob::heartbeat_interval(self)
}
}
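Because the constant has a default, existing jobs compile unchanged; a job that wants to report liveness more often can override it. A minimal sketch reusing the `MyJob`/`MyState` types and imports from the example above (the 2-second value is purely illustrative):

```rust
impl Job for MyJob {
    type State = MyState;
    type Future = Ready<anyhow::Result<()>>;
    type Spawner = ActixSpawner;

    const NAME: &'static str = "MyJob";
    const QUEUE: &'static str = DEFAULT_QUEUE;
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
    // Override the 5-second default so a stalled worker is noticed sooner
    const HEARTBEAT_INTERVAL: u64 = 2_000;

    fn run(self, state: MyState) -> Self::Future {
        info!("{}: args, {:?}", state.app_name, self);
        ready(Ok(()))
    }
}
```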

View file

@@ -10,19 +10,24 @@ use std::{
time::Duration,
};
use background_jobs_core::{Backoff, JobInfo, MaxRetries, NewJobInfo, ReturnJobInfo};
use background_jobs_core::{Backoff, JobInfo, JobResult, MaxRetries, NewJobInfo, ReturnJobInfo};
use dashmap::DashMap;
use diesel::prelude::*;
use diesel::{
data_types::PgInterval,
dsl::IntervalDsl,
prelude::*,
sql_types::{Interval, Timestamp},
};
use diesel_async::{
pooled_connection::{
deadpool::{BuildError, Hook, Pool, PoolError},
AsyncDieselConnectionManager, ManagerConfig,
},
AsyncConnection, AsyncPgConnection, RunQueryDsl,
AsyncPgConnection, RunQueryDsl,
};
use futures_core::future::BoxFuture;
use serde_json::Value;
use time::{OffsetDateTime, PrimitiveDateTime};
use time::PrimitiveDateTime;
use tokio::{sync::Notify, task::JoinHandle};
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
use tracing::Instrument;
@@ -122,7 +127,7 @@ struct PostgresJob {
backoff_multiplier: i32,
backoff: BackoffStrategy,
next_queue: PrimitiveDateTime,
timeout: i32,
heartbeat_interval: PgInterval,
}
impl From<JobInfo> for PostgresJob {
@@ -136,9 +141,11 @@ impl From<JobInfo> for PostgresJob {
max_retries,
backoff_strategy,
next_queue,
timeout,
heartbeat_interval,
} = value;
let next_queue = next_queue.to_offset(time::UtcOffset::UTC);
PostgresJob {
id,
name,
@@ -162,7 +169,7 @@ impl From<JobInfo> for PostgresJob {
Backoff::Exponential(_) => BackoffStrategy::Exponential,
},
next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()),
timeout: timeout as _,
heartbeat_interval: (heartbeat_interval as i32).milliseconds(),
}
}
}
@@ -180,7 +187,7 @@ impl From<PostgresJob> for JobInfo {
backoff_multiplier,
backoff,
next_queue,
timeout,
heartbeat_interval,
} = value;
JobInfo {
@@ -198,7 +205,7 @@ impl From<PostgresJob> for JobInfo {
BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _),
},
next_queue: next_queue.assume_utc(),
timeout: timeout as _,
heartbeat_interval: (heartbeat_interval.microseconds / 1_000) as _,
}
}
}
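The two `From` impls above keep the value in milliseconds on the Rust side and as a Postgres `INTERVAL` in the table. A small sketch of the round trip they perform, using the 5000 ms trait default as the example value:

```rust
use diesel::{data_types::PgInterval, dsl::IntervalDsl};

fn main() {
    // Milliseconds, as carried on JobInfo / NewJobInfo
    let heartbeat_interval: u64 = 5_000;

    // JobInfo -> PostgresJob: milliseconds become a PgInterval
    let stored: PgInterval = (heartbeat_interval as i32).milliseconds();
    assert_eq!(stored.microseconds, 5_000_000);

    // PostgresJob -> JobInfo: interval microseconds back to milliseconds
    let restored = (stored.microseconds / 1_000) as u64;
    assert_eq!(restored, heartbeat_interval);
}
```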
@@ -233,42 +240,134 @@ impl background_jobs_core::Storage for Storage {
}
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
let postgres_job: PostgresJob = job.build().into();
let id = postgres_job.id;
self.insert(job.build()).await
}
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
async fn pop(&self, in_queue: &str, in_runner_id: Uuid) -> Result<JobInfo, Self::Error> {
loop {
tracing::trace!("pop: looping");
{
use schema::job_queue::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
postgres_job
.insert_into(job_queue)
let notifier: Arc<Notify> = self
.inner
.queue_notifications
.entry(String::from(in_queue))
.or_insert_with(|| Arc::new(Notify::const_new()))
.clone();
diesel::sql_query("LISTEN queue_status_channel;")
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
let count = {
use schema::job_queue::dsl::*;
diesel::update(job_queue)
.filter(heartbeat.is_not_null().and(heartbeat.assume_not_null().le(
// not allowed to multiply heartbeat_interval. thanks diesel
diesel::dsl::sql::<Timestamp>("NOW() - heartbeat_interval * 5"),
)))
.set((
heartbeat.eq(Option::<PrimitiveDateTime>::None),
status.eq(JobStatus::New),
runner_id.eq(Option::<Uuid>::None),
))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?
};
if count > 0 {
tracing::info!("Reset {count} jobs");
}
let id_query = {
use schema::job_queue::dsl::*;
let queue_alias = diesel::alias!(schema::job_queue as queue_alias);
queue_alias
.select(queue_alias.field(id))
.filter(
queue_alias
.field(status)
.eq(JobStatus::New)
.and(queue_alias.field(queue).eq(in_queue))
.and(queue_alias.field(next_queue).le(diesel::dsl::now)),
)
.order(queue_alias.field(next_queue))
.for_update()
.skip_locked()
.single_value()
};
let opt = {
use schema::job_queue::dsl::*;
diesel::update(job_queue)
.filter(id.nullable().eq(id_query))
.filter(status.eq(JobStatus::New))
.set((
heartbeat.eq(diesel::dsl::now),
status.eq(JobStatus::Running),
runner_id.eq(in_runner_id),
))
.returning(PostgresJob::as_returning())
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
};
if let Some(postgres_job) = opt {
return Ok(postgres_job.into());
}
let next_queue = {
use schema::job_queue::dsl::*;
job_queue
.filter(queue.eq(in_queue).and(status.eq(JobStatus::New)))
.select(diesel::dsl::sql::<Interval>("NOW() - next_queue"))
.get_result::<PgInterval>(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
};
let sleep_duration = next_queue
.map(|interval| {
if interval.microseconds < 0 {
Duration::from_micros(interval.microseconds.abs_diff(0))
} else {
Duration::from_secs(0)
}
})
.unwrap_or(Duration::from_secs(5));
drop(conn);
if tokio::time::timeout(sleep_duration, notifier.notified())
.await
.is_ok()
{
tracing::debug!("Notified");
} else {
tracing::debug!("Timed out");
}
}
Ok(id)
}
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
todo!()
}
async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> {
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let now = to_primitive(OffsetDateTime::now_utc());
{
use schema::job_queue::dsl::*;
diesel::update(job_queue)
.filter(id.eq(job_id))
.set((
heartbeat.eq(PrimitiveDateTime::new(now.date(), now.time())),
runner_id.eq(in_runner_id),
))
.set((heartbeat.eq(diesel::dsl::now), runner_id.eq(in_runner_id)))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
@@ -278,13 +377,45 @@ impl background_jobs_core::Storage for Storage {
}
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> {
todo!()
}
}
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
fn to_primitive(timestamp: OffsetDateTime) -> PrimitiveDateTime {
let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
PrimitiveDateTime::new(timestamp.date(), timestamp.time())
let job = {
use schema::job_queue::dsl::*;
diesel::delete(job_queue)
.filter(id.eq(return_job_info.id))
.returning(PostgresJob::as_returning())
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
};
let mut job: JobInfo = if let Some(job) = job {
job.into()
} else {
return Ok(true);
};
match return_job_info.result {
// successful jobs are removed
JobResult::Success => Ok(true),
// Unregistered or Unexecuted jobs are restored as-is
JobResult::Unexecuted | JobResult::Unregistered => {
self.insert(job).await?;
Ok(false)
}
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
self.insert(job).await?;
Ok(false)
}
// dead jobs are removed
JobResult::Failure => Ok(true),
}
}
}
impl Storage {
@@ -348,6 +479,25 @@ impl Storage {
Ok(Storage { inner, drop_handle })
}
async fn insert(&self, job_info: JobInfo) -> Result<Uuid, PostgresError> {
let postgres_job: PostgresJob = job_info.into();
let id = postgres_job.id;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
{
use schema::job_queue::dsl::*;
postgres_job
.insert_into(job_queue)
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
}
Ok(id)
}
}
impl<'a> JobNotifierState<'a> {
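One step of the new `pop()` worth spelling out is the reap pass: before claiming work, it resets any running job whose heartbeat is older than five heartbeat intervals (the `NOW() - heartbeat_interval * 5` filter above). A worked example of that condition outside SQL, with illustrative values:

```rust
use std::time::{Duration, SystemTime};

/// A running job counts as stalled once its last heartbeat is more than
/// five heartbeat intervals in the past.
fn is_stalled(last_heartbeat: SystemTime, heartbeat_interval_ms: u64, now: SystemTime) -> bool {
    match now.duration_since(last_heartbeat) {
        Ok(age) => age >= Duration::from_millis(heartbeat_interval_ms) * 5,
        Err(_) => false, // heartbeat is in the future; nothing to reap
    }
}

fn main() {
    let now = SystemTime::now();
    let interval_ms = 5_000; // the 5-second default

    // 30 s without a heartbeat is past the 25 s threshold: the job gets reset
    assert!(is_stalled(now - Duration::from_secs(30), interval_ms, now));
    // 10 s is still inside the threshold: the job is left running
    assert!(!is_stalled(now - Duration::from_secs(10), interval_ms, now));
}
```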

View file

@@ -21,7 +21,10 @@ pub(crate) fn migration() -> String {
t.add_column("backoff_multiplier", types::integer().nullable(false));
t.add_column("backoff", types::custom("backoff_strategy").nullable(false));
t.add_column("next_queue", types::datetime().nullable(false));
t.add_column("timeout", types::integer().nullable(false));
t.add_column(
"heartbeat_interval",
types::custom("INTERVAL").nullable(false),
);
t.add_column(
"runner_id",
types::uuid().nullable(true).indexed(false).unique(false),

View file

@@ -31,7 +31,7 @@ diesel::table! {
backoff_multiplier -> Int4,
backoff -> BackoffStrategy,
next_queue -> Timestamp,
timeout -> Int4,
heartbeat_interval -> Interval,
runner_id -> Nullable<Uuid>,
status -> JobStatus,
heartbeat -> Nullable<Timestamp>,

View file

@@ -133,22 +133,20 @@ impl background_jobs_core::Storage for Storage {
};
match result {
JobResult::Success => {
// ok
Ok(true)
}
// successful jobs are removed
JobResult::Success => Ok(true),
// Unregistered or Unexecuted jobs are restored as-is
JobResult::Unexecuted | JobResult::Unregistered => {
// TODO: handle
Ok(true)
self.insert(job)?;
Ok(false)
}
JobResult::Failure => {
if job.prepare_retry() {
self.insert(job)?;
Ok(false)
} else {
Ok(true)
}
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
self.insert(job)?;
Ok(false)
}
// dead jobs are removed
JobResult::Failure => Ok(true),
}
}
}
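With this change the memory, Postgres, and sled storages all apply the same completion policy. A hypothetical helper (not part of the PR, shown only to summarize the shared logic) that returns the job when it should be re-inserted:

```rust
use background_jobs_core::{JobInfo, JobResult};

/// Sketch of the shared policy: `None` means the job is finished and removed,
/// `Some(job)` means the storage should re-insert it.
fn disposition(mut job: JobInfo, result: JobResult) -> Option<JobInfo> {
    match result {
        // successful jobs are removed
        JobResult::Success => None,
        // Unregistered or Unexecuted jobs are restored as-is
        JobResult::Unregistered | JobResult::Unexecuted => Some(job),
        // retryable failed jobs are restored; dead jobs are removed
        JobResult::Failure => {
            if job.prepare_retry() {
                Some(job)
            } else {
                None
            }
        }
    }
}
```

Each storage then maps `None` to `Ok(true)` and `Some(job)` to a re-insert followed by `Ok(false)`.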