Compare commits
5 commits
0f8b279e3f
...
5e25b2f11d
Author | SHA1 | Date | |
---|---|---|---|
asonix | 5e25b2f11d | ||
asonix | e2e784f012 | ||
asonix | ae90774bb1 | ||
asonix | ad0a295dd1 | ||
asonix | 63ee0d7cb7 |
|
@ -6,5 +6,10 @@ edition = "2021"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
actix-rt = "2.9.0"
|
||||||
|
anyhow = "1.0.79"
|
||||||
background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." }
|
background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." }
|
||||||
|
serde = { version = "1.0.195", features = ["derive"] }
|
||||||
tokio = { version = "1.35.1", features = ["full"] }
|
tokio = { version = "1.35.1", features = ["full"] }
|
||||||
|
tracing = "0.1.40"
|
||||||
|
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
||||||
|
|
|
@ -1,9 +1,114 @@
|
||||||
use background_jobs::postgres::Storage;
|
use actix_rt::Arbiter;
|
||||||
|
use background_jobs::{
|
||||||
|
postgres::Storage, ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig,
|
||||||
|
};
|
||||||
|
// use background_jobs_sled_storage::Storage;
|
||||||
|
use std::{
|
||||||
|
future::{ready, Ready},
|
||||||
|
time::{Duration, SystemTime},
|
||||||
|
};
|
||||||
|
use tracing::info;
|
||||||
|
use tracing_subscriber::EnvFilter;
|
||||||
|
|
||||||
#[tokio::main]
|
const DEFAULT_QUEUE: &str = "default";
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct MyState {
|
||||||
|
pub app_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct MyJob {
|
||||||
|
some_u64: u64,
|
||||||
|
other_u64: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
|
||||||
|
|
||||||
|
tracing_subscriber::fmt::fmt()
|
||||||
|
.with_env_filter(env_filter)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
// Set up our Storage
|
||||||
let storage =
|
let storage =
|
||||||
Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;
|
Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;
|
||||||
println!("Hello, world!");
|
|
||||||
|
let arbiter = Arbiter::new();
|
||||||
|
|
||||||
|
// Configure and start our workers
|
||||||
|
let queue_handle =
|
||||||
|
WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App"))
|
||||||
|
.register::<MyJob>()
|
||||||
|
.set_worker_count(DEFAULT_QUEUE, 16)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
// Queue our jobs
|
||||||
|
queue_handle.queue(MyJob::new(1, 2)).await?;
|
||||||
|
queue_handle.queue(MyJob::new(3, 4)).await?;
|
||||||
|
queue_handle.queue(MyJob::new(5, 6)).await?;
|
||||||
|
for i in 0..20 {
|
||||||
|
queue_handle
|
||||||
|
.schedule(
|
||||||
|
MyJob::new(7 + i, 8 + i),
|
||||||
|
SystemTime::now() + Duration::from_secs(i),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block on Actix
|
||||||
|
actix_rt::signal::ctrl_c().await?;
|
||||||
|
|
||||||
|
arbiter.stop();
|
||||||
|
let _ = arbiter.join();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl MyState {
|
||||||
|
pub fn new(app_name: &str) -> Self {
|
||||||
|
MyState {
|
||||||
|
app_name: app_name.to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MyJob {
|
||||||
|
pub fn new(some_u64: u64, other_u64: u64) -> Self {
|
||||||
|
MyJob {
|
||||||
|
some_u64,
|
||||||
|
other_u64,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for MyJob {
|
||||||
|
type State = MyState;
|
||||||
|
type Future = Ready<anyhow::Result<()>>;
|
||||||
|
type Spawner = ActixSpawner;
|
||||||
|
|
||||||
|
// The name of the job. It is super important that each job has a unique name,
|
||||||
|
// because otherwise one job will overwrite another job when they're being
|
||||||
|
// registered.
|
||||||
|
const NAME: &'static str = "MyJob";
|
||||||
|
|
||||||
|
// The queue that this processor belongs to
|
||||||
|
//
|
||||||
|
// Workers have the option to subscribe to specific queues, so this is important to
|
||||||
|
// determine which worker will call the processor
|
||||||
|
//
|
||||||
|
// Jobs can optionally override the queue they're spawned on
|
||||||
|
const QUEUE: &'static str = DEFAULT_QUEUE;
|
||||||
|
|
||||||
|
// The number of times background-jobs should try to retry a job before giving up
|
||||||
|
//
|
||||||
|
// Jobs can optionally override this value
|
||||||
|
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
|
||||||
|
|
||||||
|
fn run(self, state: MyState) -> Self::Future {
|
||||||
|
info!("{}: args, {:?}", state.app_name, self);
|
||||||
|
|
||||||
|
ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -55,8 +55,10 @@ async fn heartbeat_job<F: Future>(
|
||||||
future: F,
|
future: F,
|
||||||
job_id: Uuid,
|
job_id: Uuid,
|
||||||
runner_id: Uuid,
|
runner_id: Uuid,
|
||||||
|
heartbeat_interval: u64,
|
||||||
) -> F::Output {
|
) -> F::Output {
|
||||||
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
|
let mut interval =
|
||||||
|
actix_rt::time::interval(std::time::Duration::from_millis(heartbeat_interval));
|
||||||
|
|
||||||
let mut future = std::pin::pin!(future);
|
let mut future = std::pin::pin!(future);
|
||||||
|
|
||||||
|
@ -173,6 +175,7 @@ pub(crate) async fn local_worker<State, Extras>(
|
||||||
|
|
||||||
let process_span = make_span(id, &queue, "process");
|
let process_span = make_span(id, &queue, "process");
|
||||||
let job_id = job.id;
|
let job_id = job.id;
|
||||||
|
let heartbeat_interval = job.heartbeat_interval;
|
||||||
let return_job = process_span
|
let return_job = process_span
|
||||||
.in_scope(|| {
|
.in_scope(|| {
|
||||||
heartbeat_job(
|
heartbeat_job(
|
||||||
|
@ -180,6 +183,7 @@ pub(crate) async fn local_worker<State, Extras>(
|
||||||
time_job(processors.process(job), job_id),
|
time_job(processors.process(job), job_id),
|
||||||
job_id,
|
job_id,
|
||||||
id,
|
id,
|
||||||
|
heartbeat_interval,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.instrument(process_span)
|
.instrument(process_span)
|
||||||
|
|
|
@ -74,15 +74,14 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
|
||||||
/// Jobs can override
|
/// Jobs can override
|
||||||
const BACKOFF: Backoff = Backoff::Exponential(2);
|
const BACKOFF: Backoff = Backoff::Exponential(2);
|
||||||
|
|
||||||
/// Define the maximum number of milliseconds a job should be allowed to run before being
|
/// Define how often a job should update its heartbeat timestamp
|
||||||
/// considered dead.
|
|
||||||
///
|
///
|
||||||
/// This is important for allowing the job server to reap processes that were started but never
|
/// This is important for allowing the job server to reap processes that were started but never
|
||||||
/// completed.
|
/// completed.
|
||||||
///
|
///
|
||||||
/// Defaults to 15 seconds
|
/// Defaults to 5 seconds
|
||||||
/// Jobs can override
|
/// Jobs can override
|
||||||
const TIMEOUT: u64 = 15_000;
|
const HEARTBEAT_INTERVAL: u64 = 5_000;
|
||||||
|
|
||||||
/// Users of this library must define what it means to run a job.
|
/// Users of this library must define what it means to run a job.
|
||||||
///
|
///
|
||||||
|
@ -118,13 +117,12 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
|
||||||
Self::BACKOFF
|
Self::BACKOFF
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Define the maximum number of milliseconds this job should be allowed to run before being
|
/// Define how often a job should update its heartbeat timestamp
|
||||||
/// considered dead.
|
|
||||||
///
|
///
|
||||||
/// This is important for allowing the job server to reap processes that were started but never
|
/// This is important for allowing the job server to reap processes that were started but never
|
||||||
/// completed.
|
/// completed.
|
||||||
fn timeout(&self) -> u64 {
|
fn heartbeat_interval(&self) -> u64 {
|
||||||
Self::TIMEOUT
|
Self::HEARTBEAT_INTERVAL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,7 +136,7 @@ where
|
||||||
job.queue().to_owned(),
|
job.queue().to_owned(),
|
||||||
job.max_retries(),
|
job.max_retries(),
|
||||||
job.backoff_strategy(),
|
job.backoff_strategy(),
|
||||||
job.timeout(),
|
job.heartbeat_interval(),
|
||||||
serde_json::to_value(job).map_err(|_| ToJson)?,
|
serde_json::to_value(job).map_err(|_| ToJson)?,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ pub struct NewJobInfo {
|
||||||
/// Milliseconds from execution until the job is considered dead
|
/// Milliseconds from execution until the job is considered dead
|
||||||
///
|
///
|
||||||
/// This is important for storage implementations to reap unfinished jobs
|
/// This is important for storage implementations to reap unfinished jobs
|
||||||
timeout: u64,
|
heartbeat_interval: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NewJobInfo {
|
impl NewJobInfo {
|
||||||
|
@ -73,7 +73,7 @@ impl NewJobInfo {
|
||||||
queue: String,
|
queue: String,
|
||||||
max_retries: MaxRetries,
|
max_retries: MaxRetries,
|
||||||
backoff_strategy: Backoff,
|
backoff_strategy: Backoff,
|
||||||
timeout: u64,
|
heartbeat_interval: u64,
|
||||||
args: Value,
|
args: Value,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
NewJobInfo {
|
NewJobInfo {
|
||||||
|
@ -83,7 +83,7 @@ impl NewJobInfo {
|
||||||
max_retries,
|
max_retries,
|
||||||
next_queue: None,
|
next_queue: None,
|
||||||
backoff_strategy,
|
backoff_strategy,
|
||||||
timeout,
|
heartbeat_interval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ impl NewJobInfo {
|
||||||
max_retries: self.max_retries,
|
max_retries: self.max_retries,
|
||||||
next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
|
next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
|
||||||
backoff_strategy: self.backoff_strategy,
|
backoff_strategy: self.backoff_strategy,
|
||||||
timeout: self.timeout,
|
heartbeat_interval: self.heartbeat_interval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ pub struct JobInfo {
|
||||||
/// Milliseconds from execution until the job is considered dead
|
/// Milliseconds from execution until the job is considered dead
|
||||||
///
|
///
|
||||||
/// This is important for storage implementations to reap unfinished jobs
|
/// This is important for storage implementations to reap unfinished jobs
|
||||||
pub timeout: u64,
|
pub heartbeat_interval: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JobInfo {
|
impl JobInfo {
|
||||||
|
|
|
@ -272,16 +272,20 @@ pub mod memory_storage {
|
||||||
};
|
};
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
|
// successful jobs are removed
|
||||||
JobResult::Success => Ok(true),
|
JobResult::Success => Ok(true),
|
||||||
JobResult::Unregistered | JobResult::Unexecuted => Ok(true),
|
// Unregistered or Unexecuted jobs are restored as-is
|
||||||
JobResult::Failure => {
|
JobResult::Unregistered | JobResult::Unexecuted => {
|
||||||
if job.prepare_retry() {
|
self.insert(job);
|
||||||
self.insert(job);
|
Ok(false)
|
||||||
return Ok(false);
|
|
||||||
} else {
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// retryable failed jobs are restored
|
||||||
|
JobResult::Failure if job.prepare_retry() => {
|
||||||
|
self.insert(job);
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
// dead jobs are removed
|
||||||
|
JobResult::Failure => Ok(true),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,15 +76,14 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
|
||||||
/// Jobs can override
|
/// Jobs can override
|
||||||
const BACKOFF: Backoff = Backoff::Exponential(2);
|
const BACKOFF: Backoff = Backoff::Exponential(2);
|
||||||
|
|
||||||
/// Define the maximum number of milliseconds a job should be allowed to run before being
|
/// Define how often a job should update its heartbeat timestamp
|
||||||
/// considered dead.
|
|
||||||
///
|
///
|
||||||
/// This is important for allowing the job server to reap processes that were started but never
|
/// This is important for allowing the job server to reap processes that were started but never
|
||||||
/// completed.
|
/// completed.
|
||||||
///
|
///
|
||||||
/// Defaults to 15 seconds
|
/// Defaults to 5 seconds
|
||||||
/// Jobs can override
|
/// Jobs can override
|
||||||
const TIMEOUT: u64 = 15_000;
|
const HEARTBEAT_INTERVAL: u64 = 5_000;
|
||||||
|
|
||||||
/// Users of this library must define what it means to run a job.
|
/// Users of this library must define what it means to run a job.
|
||||||
///
|
///
|
||||||
|
@ -120,13 +119,12 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
|
||||||
Self::BACKOFF
|
Self::BACKOFF
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Define the maximum number of milliseconds this job should be allowed to run before being
|
/// Define how often a job should update its heartbeat timestamp
|
||||||
/// considered dead.
|
|
||||||
///
|
///
|
||||||
/// This is important for allowing the job server to reap processes that were started but never
|
/// This is important for allowing the job server to reap processes that were started but never
|
||||||
/// completed.
|
/// completed.
|
||||||
fn timeout(&self) -> u64 {
|
fn heartbeat_interval(&self) -> u64 {
|
||||||
Self::TIMEOUT
|
Self::HEARTBEAT_INTERVAL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,7 +154,7 @@ where
|
||||||
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
|
const QUEUE: &'static str = <Self as UnsendJob>::QUEUE;
|
||||||
const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES;
|
const MAX_RETRIES: MaxRetries = <Self as UnsendJob>::MAX_RETRIES;
|
||||||
const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF;
|
const BACKOFF: Backoff = <Self as UnsendJob>::BACKOFF;
|
||||||
const TIMEOUT: u64 = <Self as UnsendJob>::TIMEOUT;
|
const HEARTBEAT_INTERVAL: u64 = <Self as UnsendJob>::HEARTBEAT_INTERVAL;
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
UnwrapFuture(T::Spawner::spawn(
|
UnwrapFuture(T::Spawner::spawn(
|
||||||
|
@ -180,7 +178,7 @@ where
|
||||||
UnsendJob::backoff_strategy(self)
|
UnsendJob::backoff_strategy(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timeout(&self) -> u64 {
|
fn heartbeat_interval(&self) -> u64 {
|
||||||
UnsendJob::timeout(self)
|
UnsendJob::heartbeat_interval(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,19 +10,24 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use background_jobs_core::{Backoff, JobInfo, MaxRetries, NewJobInfo, ReturnJobInfo};
|
use background_jobs_core::{Backoff, JobInfo, JobResult, MaxRetries, NewJobInfo, ReturnJobInfo};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use diesel::prelude::*;
|
use diesel::{
|
||||||
|
data_types::PgInterval,
|
||||||
|
dsl::IntervalDsl,
|
||||||
|
prelude::*,
|
||||||
|
sql_types::{Interval, Timestamp},
|
||||||
|
};
|
||||||
use diesel_async::{
|
use diesel_async::{
|
||||||
pooled_connection::{
|
pooled_connection::{
|
||||||
deadpool::{BuildError, Hook, Pool, PoolError},
|
deadpool::{BuildError, Hook, Pool, PoolError},
|
||||||
AsyncDieselConnectionManager, ManagerConfig,
|
AsyncDieselConnectionManager, ManagerConfig,
|
||||||
},
|
},
|
||||||
AsyncConnection, AsyncPgConnection, RunQueryDsl,
|
AsyncPgConnection, RunQueryDsl,
|
||||||
};
|
};
|
||||||
use futures_core::future::BoxFuture;
|
use futures_core::future::BoxFuture;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use time::{OffsetDateTime, PrimitiveDateTime};
|
use time::PrimitiveDateTime;
|
||||||
use tokio::{sync::Notify, task::JoinHandle};
|
use tokio::{sync::Notify, task::JoinHandle};
|
||||||
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
|
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
|
||||||
use tracing::Instrument;
|
use tracing::Instrument;
|
||||||
|
@ -122,7 +127,7 @@ struct PostgresJob {
|
||||||
backoff_multiplier: i32,
|
backoff_multiplier: i32,
|
||||||
backoff: BackoffStrategy,
|
backoff: BackoffStrategy,
|
||||||
next_queue: PrimitiveDateTime,
|
next_queue: PrimitiveDateTime,
|
||||||
timeout: i32,
|
heartbeat_interval: PgInterval,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<JobInfo> for PostgresJob {
|
impl From<JobInfo> for PostgresJob {
|
||||||
|
@ -136,9 +141,11 @@ impl From<JobInfo> for PostgresJob {
|
||||||
max_retries,
|
max_retries,
|
||||||
backoff_strategy,
|
backoff_strategy,
|
||||||
next_queue,
|
next_queue,
|
||||||
timeout,
|
heartbeat_interval,
|
||||||
} = value;
|
} = value;
|
||||||
|
|
||||||
|
let next_queue = next_queue.to_offset(time::UtcOffset::UTC);
|
||||||
|
|
||||||
PostgresJob {
|
PostgresJob {
|
||||||
id,
|
id,
|
||||||
name,
|
name,
|
||||||
|
@ -162,7 +169,7 @@ impl From<JobInfo> for PostgresJob {
|
||||||
Backoff::Exponential(_) => BackoffStrategy::Exponential,
|
Backoff::Exponential(_) => BackoffStrategy::Exponential,
|
||||||
},
|
},
|
||||||
next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()),
|
next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()),
|
||||||
timeout: timeout as _,
|
heartbeat_interval: (heartbeat_interval as i32).milliseconds(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -180,7 +187,7 @@ impl From<PostgresJob> for JobInfo {
|
||||||
backoff_multiplier,
|
backoff_multiplier,
|
||||||
backoff,
|
backoff,
|
||||||
next_queue,
|
next_queue,
|
||||||
timeout,
|
heartbeat_interval,
|
||||||
} = value;
|
} = value;
|
||||||
|
|
||||||
JobInfo {
|
JobInfo {
|
||||||
|
@ -198,7 +205,7 @@ impl From<PostgresJob> for JobInfo {
|
||||||
BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _),
|
BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _),
|
||||||
},
|
},
|
||||||
next_queue: next_queue.assume_utc(),
|
next_queue: next_queue.assume_utc(),
|
||||||
timeout: timeout as _,
|
heartbeat_interval: (heartbeat_interval.microseconds / 1_000) as _,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,42 +240,134 @@ impl background_jobs_core::Storage for Storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
|
async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
|
||||||
let postgres_job: PostgresJob = job.build().into();
|
self.insert(job.build()).await
|
||||||
let id = postgres_job.id;
|
}
|
||||||
|
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
async fn pop(&self, in_queue: &str, in_runner_id: Uuid) -> Result<JobInfo, Self::Error> {
|
||||||
|
loop {
|
||||||
|
tracing::trace!("pop: looping");
|
||||||
|
|
||||||
{
|
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
||||||
use schema::job_queue::dsl::*;
|
|
||||||
|
|
||||||
postgres_job
|
let notifier: Arc<Notify> = self
|
||||||
.insert_into(job_queue)
|
.inner
|
||||||
|
.queue_notifications
|
||||||
|
.entry(String::from(in_queue))
|
||||||
|
.or_insert_with(|| Arc::new(Notify::const_new()))
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
diesel::sql_query("LISTEN queue_status_channel;")
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await
|
.await
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
|
||||||
|
let count = {
|
||||||
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
|
diesel::update(job_queue)
|
||||||
|
.filter(heartbeat.is_not_null().and(heartbeat.assume_not_null().le(
|
||||||
|
// not allowed to multiply heartbeat_interval. thanks diesel
|
||||||
|
diesel::dsl::sql::<Timestamp>("NOW() - heartbeat_interval * 5"),
|
||||||
|
)))
|
||||||
|
.set((
|
||||||
|
heartbeat.eq(Option::<PrimitiveDateTime>::None),
|
||||||
|
status.eq(JobStatus::New),
|
||||||
|
runner_id.eq(Option::<Uuid>::None),
|
||||||
|
))
|
||||||
|
.execute(&mut conn)
|
||||||
|
.await
|
||||||
|
.map_err(PostgresError::Diesel)?
|
||||||
|
};
|
||||||
|
|
||||||
|
if count > 0 {
|
||||||
|
tracing::info!("Reset {count} jobs");
|
||||||
|
}
|
||||||
|
|
||||||
|
let id_query = {
|
||||||
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
|
let queue_alias = diesel::alias!(schema::job_queue as queue_alias);
|
||||||
|
|
||||||
|
queue_alias
|
||||||
|
.select(queue_alias.field(id))
|
||||||
|
.filter(
|
||||||
|
queue_alias
|
||||||
|
.field(status)
|
||||||
|
.eq(JobStatus::New)
|
||||||
|
.and(queue_alias.field(queue).eq(in_queue))
|
||||||
|
.and(queue_alias.field(next_queue).le(diesel::dsl::now)),
|
||||||
|
)
|
||||||
|
.order(queue_alias.field(next_queue))
|
||||||
|
.for_update()
|
||||||
|
.skip_locked()
|
||||||
|
.single_value()
|
||||||
|
};
|
||||||
|
|
||||||
|
let opt = {
|
||||||
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
|
diesel::update(job_queue)
|
||||||
|
.filter(id.nullable().eq(id_query))
|
||||||
|
.filter(status.eq(JobStatus::New))
|
||||||
|
.set((
|
||||||
|
heartbeat.eq(diesel::dsl::now),
|
||||||
|
status.eq(JobStatus::Running),
|
||||||
|
runner_id.eq(in_runner_id),
|
||||||
|
))
|
||||||
|
.returning(PostgresJob::as_returning())
|
||||||
|
.get_result(&mut conn)
|
||||||
|
.await
|
||||||
|
.optional()
|
||||||
|
.map_err(PostgresError::Diesel)?
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(postgres_job) = opt {
|
||||||
|
return Ok(postgres_job.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let next_queue = {
|
||||||
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
|
job_queue
|
||||||
|
.filter(queue.eq(in_queue).and(status.eq(JobStatus::New)))
|
||||||
|
.select(diesel::dsl::sql::<Interval>("NOW() - next_queue"))
|
||||||
|
.get_result::<PgInterval>(&mut conn)
|
||||||
|
.await
|
||||||
|
.optional()
|
||||||
|
.map_err(PostgresError::Diesel)?
|
||||||
|
};
|
||||||
|
|
||||||
|
let sleep_duration = next_queue
|
||||||
|
.map(|interval| {
|
||||||
|
if interval.microseconds < 0 {
|
||||||
|
Duration::from_micros(interval.microseconds.abs_diff(0))
|
||||||
|
} else {
|
||||||
|
Duration::from_secs(0)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or(Duration::from_secs(5));
|
||||||
|
|
||||||
|
drop(conn);
|
||||||
|
if tokio::time::timeout(sleep_duration, notifier.notified())
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
tracing::debug!("Notified");
|
||||||
|
} else {
|
||||||
|
tracing::debug!("Timed out");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
|
|
||||||
todo!()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> {
|
async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> {
|
||||||
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
||||||
|
|
||||||
let now = to_primitive(OffsetDateTime::now_utc());
|
|
||||||
|
|
||||||
{
|
{
|
||||||
use schema::job_queue::dsl::*;
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
diesel::update(job_queue)
|
diesel::update(job_queue)
|
||||||
.filter(id.eq(job_id))
|
.filter(id.eq(job_id))
|
||||||
.set((
|
.set((heartbeat.eq(diesel::dsl::now), runner_id.eq(in_runner_id)))
|
||||||
heartbeat.eq(PrimitiveDateTime::new(now.date(), now.time())),
|
|
||||||
runner_id.eq(in_runner_id),
|
|
||||||
))
|
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await
|
.await
|
||||||
.map_err(PostgresError::Diesel)?;
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
@ -278,13 +377,45 @@ impl background_jobs_core::Storage for Storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> {
|
async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> {
|
||||||
todo!()
|
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_primitive(timestamp: OffsetDateTime) -> PrimitiveDateTime {
|
let job = {
|
||||||
let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
|
use schema::job_queue::dsl::*;
|
||||||
PrimitiveDateTime::new(timestamp.date(), timestamp.time())
|
|
||||||
|
diesel::delete(job_queue)
|
||||||
|
.filter(id.eq(return_job_info.id))
|
||||||
|
.returning(PostgresJob::as_returning())
|
||||||
|
.get_result(&mut conn)
|
||||||
|
.await
|
||||||
|
.optional()
|
||||||
|
.map_err(PostgresError::Diesel)?
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut job: JobInfo = if let Some(job) = job {
|
||||||
|
job.into()
|
||||||
|
} else {
|
||||||
|
return Ok(true);
|
||||||
|
};
|
||||||
|
|
||||||
|
match return_job_info.result {
|
||||||
|
// successful jobs are removed
|
||||||
|
JobResult::Success => Ok(true),
|
||||||
|
// Unregistered or Unexecuted jobs are restored as-is
|
||||||
|
JobResult::Unexecuted | JobResult::Unregistered => {
|
||||||
|
self.insert(job).await?;
|
||||||
|
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
// retryable failed jobs are restored
|
||||||
|
JobResult::Failure if job.prepare_retry() => {
|
||||||
|
self.insert(job).await?;
|
||||||
|
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
// dead jobs are removed
|
||||||
|
JobResult::Failure => Ok(true),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Storage {
|
impl Storage {
|
||||||
|
@ -348,6 +479,25 @@ impl Storage {
|
||||||
|
|
||||||
Ok(Storage { inner, drop_handle })
|
Ok(Storage { inner, drop_handle })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn insert(&self, job_info: JobInfo) -> Result<Uuid, PostgresError> {
|
||||||
|
let postgres_job: PostgresJob = job_info.into();
|
||||||
|
let id = postgres_job.id;
|
||||||
|
|
||||||
|
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
|
||||||
|
|
||||||
|
{
|
||||||
|
use schema::job_queue::dsl::*;
|
||||||
|
|
||||||
|
postgres_job
|
||||||
|
.insert_into(job_queue)
|
||||||
|
.execute(&mut conn)
|
||||||
|
.await
|
||||||
|
.map_err(PostgresError::Diesel)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> JobNotifierState<'a> {
|
impl<'a> JobNotifierState<'a> {
|
||||||
|
|
|
@ -21,7 +21,10 @@ pub(crate) fn migration() -> String {
|
||||||
t.add_column("backoff_multiplier", types::integer().nullable(false));
|
t.add_column("backoff_multiplier", types::integer().nullable(false));
|
||||||
t.add_column("backoff", types::custom("backoff_strategy").nullable(false));
|
t.add_column("backoff", types::custom("backoff_strategy").nullable(false));
|
||||||
t.add_column("next_queue", types::datetime().nullable(false));
|
t.add_column("next_queue", types::datetime().nullable(false));
|
||||||
t.add_column("timeout", types::integer().nullable(false));
|
t.add_column(
|
||||||
|
"heartbeat_interval",
|
||||||
|
types::custom("INTERVAL").nullable(false),
|
||||||
|
);
|
||||||
t.add_column(
|
t.add_column(
|
||||||
"runner_id",
|
"runner_id",
|
||||||
types::uuid().nullable(true).indexed(false).unique(false),
|
types::uuid().nullable(true).indexed(false).unique(false),
|
||||||
|
|
|
@ -31,7 +31,7 @@ diesel::table! {
|
||||||
backoff_multiplier -> Int4,
|
backoff_multiplier -> Int4,
|
||||||
backoff -> BackoffStrategy,
|
backoff -> BackoffStrategy,
|
||||||
next_queue -> Timestamp,
|
next_queue -> Timestamp,
|
||||||
timeout -> Int4,
|
heartbeat_interval -> Interval,
|
||||||
runner_id -> Nullable<Uuid>,
|
runner_id -> Nullable<Uuid>,
|
||||||
status -> JobStatus,
|
status -> JobStatus,
|
||||||
heartbeat -> Nullable<Timestamp>,
|
heartbeat -> Nullable<Timestamp>,
|
||||||
|
|
|
@ -133,22 +133,20 @@ impl background_jobs_core::Storage for Storage {
|
||||||
};
|
};
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
JobResult::Success => {
|
// successful jobs are removed
|
||||||
// ok
|
JobResult::Success => Ok(true),
|
||||||
Ok(true)
|
// Unregistered or Unexecuted jobs are restored as-is
|
||||||
}
|
|
||||||
JobResult::Unexecuted | JobResult::Unregistered => {
|
JobResult::Unexecuted | JobResult::Unregistered => {
|
||||||
// TODO: handle
|
self.insert(job)?;
|
||||||
Ok(true)
|
Ok(false)
|
||||||
}
|
}
|
||||||
JobResult::Failure => {
|
// retryable failed jobs are restored
|
||||||
if job.prepare_retry() {
|
JobResult::Failure if job.prepare_retry() => {
|
||||||
self.insert(job)?;
|
self.insert(job)?;
|
||||||
Ok(false)
|
Ok(false)
|
||||||
} else {
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// dead jobs are removed
|
||||||
|
JobResult::Failure => Ok(true),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue