From ae90774bb177d336fb0e55102e713e04f93ee712 Mon Sep 17 00:00:00 2001
From: asonix
Date: Wed, 10 Jan 2024 15:07:18 -0600
Subject: [PATCH] jobs-postgres: complete the implementation

---
 jobs-postgres/src/lib.rs                      | 218 +++++++++++++++---
 .../src/migrations/V001__create_queue.rs      |   5 +-
 jobs-postgres/src/schema.rs                   |   2 +-
 3 files changed, 189 insertions(+), 36 deletions(-)

diff --git a/jobs-postgres/src/lib.rs b/jobs-postgres/src/lib.rs
index 35592ba..36962fd 100644
--- a/jobs-postgres/src/lib.rs
+++ b/jobs-postgres/src/lib.rs
@@ -10,19 +10,24 @@ use std::{
     time::Duration,
 };
 
-use background_jobs_core::{Backoff, JobInfo, MaxRetries, NewJobInfo, ReturnJobInfo};
+use background_jobs_core::{Backoff, JobInfo, JobResult, MaxRetries, NewJobInfo, ReturnJobInfo};
 use dashmap::DashMap;
-use diesel::prelude::*;
+use diesel::{
+    data_types::PgInterval,
+    dsl::IntervalDsl,
+    prelude::*,
+    sql_types::{Interval, Timestamp},
+};
 use diesel_async::{
     pooled_connection::{
         deadpool::{BuildError, Hook, Pool, PoolError},
         AsyncDieselConnectionManager, ManagerConfig,
     },
-    AsyncConnection, AsyncPgConnection, RunQueryDsl,
+    AsyncPgConnection, RunQueryDsl,
 };
 use futures_core::future::BoxFuture;
 use serde_json::Value;
-use time::{OffsetDateTime, PrimitiveDateTime};
+use time::PrimitiveDateTime;
 use tokio::{sync::Notify, task::JoinHandle};
 use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
 use tracing::Instrument;
@@ -122,7 +127,7 @@ struct PostgresJob {
     backoff_multiplier: i32,
     backoff: BackoffStrategy,
     next_queue: PrimitiveDateTime,
-    timeout: i32,
+    heartbeat_interval: PgInterval,
 }
 
 impl From<JobInfo> for PostgresJob {
@@ -136,9 +141,11 @@ impl From<JobInfo> for PostgresJob {
             max_retries,
             backoff_strategy,
             next_queue,
-            timeout,
+            heartbeat_interval,
         } = value;
 
+        let next_queue = next_queue.to_offset(time::UtcOffset::UTC);
+
         PostgresJob {
             id,
             name,
@@ -162,7 +169,7 @@ impl From<JobInfo> for PostgresJob {
                 Backoff::Exponential(_) => BackoffStrategy::Exponential,
             },
             next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()),
-            timeout: timeout as _,
+            heartbeat_interval: (heartbeat_interval as i32).milliseconds(),
         }
     }
 }
@@ -180,7 +187,7 @@ impl From<PostgresJob> for JobInfo {
             backoff_multiplier,
             backoff,
             next_queue,
-            timeout,
+            heartbeat_interval,
         } = value;
 
         JobInfo {
@@ -198,7 +205,7 @@ impl From<PostgresJob> for JobInfo {
                 BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _),
             },
             next_queue: next_queue.assume_utc(),
-            timeout: timeout as _,
+            heartbeat_interval: (heartbeat_interval.microseconds / 1_000) as _,
         }
     }
 }
@@ -233,42 +240,134 @@ impl background_jobs_core::Storage for Storage {
     }
 
     async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
-        let postgres_job: PostgresJob = job.build().into();
-        let id = postgres_job.id;
+        self.insert(job.build()).await
+    }
 
-        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+    async fn pop(&self, in_queue: &str, in_runner_id: Uuid) -> Result<JobInfo, Self::Error> {
+        loop {
+            tracing::trace!("pop: looping");
 
-        {
-            use schema::job_queue::dsl::*;
+            let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
 
-            postgres_job
-                .insert_into(job_queue)
+            let notifier: Arc<Notify> = self
+                .inner
+                .queue_notifications
+                .entry(String::from(in_queue))
+                .or_insert_with(|| Arc::new(Notify::const_new()))
+                .clone();
+
+            diesel::sql_query("LISTEN queue_status_channel;")
                 .execute(&mut conn)
                 .await
                 .map_err(PostgresError::Diesel)?;
+
+            let count = {
+                use schema::job_queue::dsl::*;
+
+                diesel::update(job_queue)
+                    .filter(heartbeat.is_not_null().and(heartbeat.assume_not_null().le(
+                        // not allowed to multiply heartbeat_interval. thanks diesel
+                        diesel::dsl::sql::<Timestamp>("NOW() - heartbeat_interval * 5"),
+                    )))
+                    .set((
+                        heartbeat.eq(Option::<PrimitiveDateTime>::None),
+                        status.eq(JobStatus::New),
+                        runner_id.eq(Option::<Uuid>::None),
+                    ))
+                    .execute(&mut conn)
+                    .await
+                    .map_err(PostgresError::Diesel)?
+            };
+
+            if count > 0 {
+                tracing::info!("Reset {count} jobs");
+            }
+
+            let id_query = {
+                use schema::job_queue::dsl::*;
+
+                let queue_alias = diesel::alias!(schema::job_queue as queue_alias);
+
+                queue_alias
+                    .select(queue_alias.field(id))
+                    .filter(
+                        queue_alias
+                            .field(status)
+                            .eq(JobStatus::New)
+                            .and(queue_alias.field(queue).eq(in_queue))
+                            .and(queue_alias.field(next_queue).le(diesel::dsl::now)),
+                    )
+                    .order(queue_alias.field(next_queue))
+                    .for_update()
+                    .skip_locked()
+                    .single_value()
+            };
+
+            let opt = {
+                use schema::job_queue::dsl::*;
+
+                diesel::update(job_queue)
+                    .filter(id.nullable().eq(id_query))
+                    .filter(status.eq(JobStatus::New))
+                    .set((
+                        heartbeat.eq(diesel::dsl::now),
+                        status.eq(JobStatus::Running),
+                        runner_id.eq(in_runner_id),
+                    ))
+                    .returning(PostgresJob::as_returning())
+                    .get_result(&mut conn)
+                    .await
+                    .optional()
+                    .map_err(PostgresError::Diesel)?
+            };
+
+            if let Some(postgres_job) = opt {
+                return Ok(postgres_job.into());
+            }
+
+            let next_queue = {
+                use schema::job_queue::dsl::*;
+
+                job_queue
+                    .filter(queue.eq(in_queue).and(status.eq(JobStatus::New)))
+                    .select(diesel::dsl::sql::<Interval>("NOW() - next_queue"))
+                    .get_result::<PgInterval>(&mut conn)
+                    .await
+                    .optional()
+                    .map_err(PostgresError::Diesel)?
+            };
+
+            let sleep_duration = next_queue
+                .map(|interval| {
+                    if interval.microseconds < 0 {
+                        Duration::from_micros(interval.microseconds.abs_diff(0))
+                    } else {
+                        Duration::from_secs(0)
+                    }
+                })
+                .unwrap_or(Duration::from_secs(5));
+
+            drop(conn);
+            if tokio::time::timeout(sleep_duration, notifier.notified())
+                .await
+                .is_ok()
+            {
+                tracing::debug!("Notified");
+            } else {
+                tracing::debug!("Timed out");
+            }
         }
-
-        Ok(id)
-    }
-
-    async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
-        todo!()
     }
 
     async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> {
         let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
 
-        let now = to_primitive(OffsetDateTime::now_utc());
-
         {
             use schema::job_queue::dsl::*;
 
             diesel::update(job_queue)
                 .filter(id.eq(job_id))
-                .set((
-                    heartbeat.eq(PrimitiveDateTime::new(now.date(), now.time())),
-                    runner_id.eq(in_runner_id),
-                ))
+                .set((heartbeat.eq(diesel::dsl::now), runner_id.eq(in_runner_id)))
                 .execute(&mut conn)
                 .await
                 .map_err(PostgresError::Diesel)?;
@@ -278,13 +377,45 @@ impl background_jobs_core::Storage for Storage {
     }
 
     async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> {
-        todo!()
-    }
-}
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
 
-fn to_primitive(timestamp: OffsetDateTime) -> PrimitiveDateTime {
-    let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
-    PrimitiveDateTime::new(timestamp.date(), timestamp.time())
+        let job = {
+            use schema::job_queue::dsl::*;
+
+            diesel::delete(job_queue)
+                .filter(id.eq(return_job_info.id))
+                .returning(PostgresJob::as_returning())
+                .get_result(&mut conn)
+                .await
+                .optional()
+                .map_err(PostgresError::Diesel)?
+        };
+
+        let mut job: JobInfo = if let Some(job) = job {
+            job.into()
+        } else {
+            return Ok(true);
+        };
+
+        match return_job_info.result {
+            // successful jobs are removed
+            JobResult::Success => Ok(true),
+            // Unregistered or Unexecuted jobs are restored as-is
+            JobResult::Unexecuted | JobResult::Unregistered => {
+                self.insert(job).await?;
+
+                Ok(false)
+            }
+            // retryable failed jobs are restored
+            JobResult::Failure if job.prepare_retry() => {
+                self.insert(job).await?;
+
+                Ok(false)
+            }
+            // dead jobs are removed
+            JobResult::Failure => Ok(true),
+        }
+    }
 }
 
 impl Storage {
@@ -348,6 +479,25 @@ impl Storage {
 
         Ok(Storage { inner, drop_handle })
     }
+
+    async fn insert(&self, job_info: JobInfo) -> Result<Uuid, PostgresError> {
+        let postgres_job: PostgresJob = job_info.into();
+        let id = postgres_job.id;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        {
+            use schema::job_queue::dsl::*;
+
+            postgres_job
+                .insert_into(job_queue)
+                .execute(&mut conn)
+                .await
+                .map_err(PostgresError::Diesel)?;
+        }
+
+        Ok(id)
+    }
 }
 
 impl<'a> JobNotifierState<'a> {
diff --git a/jobs-postgres/src/migrations/V001__create_queue.rs b/jobs-postgres/src/migrations/V001__create_queue.rs
index 3f58181..9149c62 100644
--- a/jobs-postgres/src/migrations/V001__create_queue.rs
+++ b/jobs-postgres/src/migrations/V001__create_queue.rs
@@ -21,7 +21,10 @@ pub(crate) fn migration() -> String {
         t.add_column("backoff_multiplier", types::integer().nullable(false));
         t.add_column("backoff", types::custom("backoff_strategy").nullable(false));
         t.add_column("next_queue", types::datetime().nullable(false));
-        t.add_column("timeout", types::integer().nullable(false));
+        t.add_column(
+            "heartbeat_interval",
+            types::custom("INTERVAL").nullable(false),
+        );
         t.add_column(
             "runner_id",
             types::uuid().nullable(true).indexed(false).unique(false),
diff --git a/jobs-postgres/src/schema.rs b/jobs-postgres/src/schema.rs
index ac9d2ab..d788ed1 100644
--- a/jobs-postgres/src/schema.rs
+++ b/jobs-postgres/src/schema.rs
@@ -31,7 +31,7 @@ diesel::table! {
         backoff_multiplier -> Int4,
         backoff -> BackoffStrategy,
         next_queue -> Timestamp,
-        timeout -> Int4,
+        heartbeat_interval -> Interval,
         runner_id -> Nullable<Uuid>,
         status -> JobStatus,
         heartbeat -> Nullable<Timestamp>,
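
Notes (not part of the patch itself):

The reset query at the top of pop() reclaims jobs from dead runners: any row
whose heartbeat is at least five heartbeat intervals old is flipped back to
JobStatus::New with its runner_id cleared, so another runner can pick it up.
A minimal sketch of the same rule restated in plain Rust using only std; the
function and its names are illustrative, not code from this patch:

    use std::time::{Duration, SystemTime};

    // Mirrors the SQL fragment "NOW() - heartbeat_interval * 5" from pop():
    // a job counts as abandoned once its last heartbeat is five or more
    // heartbeat intervals in the past.
    fn is_abandoned(last_heartbeat: SystemTime, heartbeat_interval: Duration) -> bool {
        match SystemTime::now().duration_since(last_heartbeat) {
            Ok(age) => age >= heartbeat_interval * 5,
            // A heartbeat in the future is not stale; leave the job alone.
            Err(_) => false,
        }
    }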
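
The three trait methods compose into the usual fetch/run/report worker loop.
A sketch of how a runner might drive this Storage directly: run_one_job() is
a hypothetical stand-in, and building ReturnJobInfo from its id/result fields
is assumed from the field accesses visible in complete(). In practice the
background_jobs_core machinery makes these calls, not user code:

    use background_jobs_core::{JobInfo, JobResult, ReturnJobInfo, Storage as _};
    use uuid::Uuid;

    // Hypothetical job executor: a real runner would dispatch on job.name
    // and report failures with JobResult::Failure.
    async fn run_one_job(job: JobInfo) -> ReturnJobInfo {
        ReturnJobInfo { id: job.id, result: JobResult::Success }
    }

    async fn worker(storage: &Storage, queue: &str, runner_id: Uuid) -> Result<(), PostgresError> {
        loop {
            // Blocks until a job is ready; LISTEN/NOTIFY wakes it early,
            // otherwise it retries after the computed sleep_duration.
            let job = storage.pop(queue, runner_id).await?;

            // While the job runs, storage.heartbeat(job.id, runner_id) must
            // fire roughly every heartbeat_interval, or the reclaim step in
            // pop() hands the job to another runner after five misses.
            let result = run_one_job(job).await;

            // true: the job is finished for good (success or dead);
            // false: the job was re-queued for a retry.
            let _finished = storage.complete(result).await?;
        }
    }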