diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..bff29e6
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,2 @@
+[build]
+rustflags = ["--cfg", "tokio_unstable"]
diff --git a/.gitignore b/.gitignore
index 731ab4a..d8dc7f0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 /target
+/examples/postgres-example/storage
 **/*/target
 **/*.rs.bk
 Cargo.lock
diff --git a/Cargo.toml b/Cargo.toml
index 23f0105..1cfa13c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,12 +14,14 @@ members = [
   "jobs-actix",
   "jobs-core",
   "jobs-metrics",
+  "jobs-postgres",
   "jobs-sled",
   "examples/basic-example",
   "examples/long-example",
   "examples/managed-example",
   "examples/metrics-example",
   "examples/panic-example",
+  "examples/postgres-example",
 ]
 
 [features]
@@ -45,3 +47,8 @@ optional = true
 version = "0.17.0"
 path = "jobs-metrics"
 optional = true
+
+[dependencies.background-jobs-postgres]
+version = "0.17.0"
+path = "jobs-postgres"
+optional = true
diff --git a/examples/postgres-example/Cargo.toml b/examples/postgres-example/Cargo.toml
new file mode 100644
index 0000000..5c43086
--- /dev/null
+++ b/examples/postgres-example/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "postgres-example"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." }
+tokio = { version = "1.35.1", features = ["full"] }
diff --git a/examples/postgres-example/docker-compose.yml b/examples/postgres-example/docker-compose.yml
new file mode 100644
index 0000000..6f200bf
--- /dev/null
+++ b/examples/postgres-example/docker-compose.yml
@@ -0,0 +1,14 @@
+version: '3.3'
+
+services:
+  postgres:
+    image: postgres:15-alpine
+    ports:
+      - "5432:5432"
+    environment:
+      - PGDATA=/var/lib/postgresql/data
+      - POSTGRES_DB=db
+      - POSTGRES_USER=postgres
+      - POSTGRES_PASSWORD=postgres
+    volumes:
+      - ./storage/postgres:/var/lib/postgresql/data
diff --git a/examples/postgres-example/src/main.rs b/examples/postgres-example/src/main.rs
new file mode 100644
index 0000000..16877e6
--- /dev/null
+++ b/examples/postgres-example/src/main.rs
@@ -0,0 +1,9 @@
+use background_jobs::postgres::Storage;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let storage =
+        Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;
+    println!("Hello, world!");
+    Ok(())
+}
diff --git a/flake.nix b/flake.nix
index 83f55b2..00c6c08 100644
--- a/flake.nix
+++ b/flake.nix
@@ -17,7 +17,17 @@
         packages.default = pkgs.hello;
 
         devShell = with pkgs; mkShell {
-          nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
+          nativeBuildInputs = [
+            cargo
+            cargo-outdated
+            clippy
+            diesel-cli
+            rust-analyzer
+            rustc
+            rustfmt
+            stdenv.cc
+            taplo
+          ];
 
           RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
         };
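Usage note (not part of the patch): with the compose service above running (`docker-compose up -d` inside `examples/postgres-example`), the new storage can be exercised directly. A minimal sketch, assuming the example also pulls in `uuid` (with its `v7` feature) and `background-jobs-core`, whose `Storage` trait provides the `info`/`push`/`heartbeat` methods:

```rust
use background_jobs::postgres::Storage;
use background_jobs_core::Storage as _; // trait with info/push/pop/heartbeat/complete
use uuid::Uuid;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let storage =
        Storage::connect("postgres://postgres:postgres@localhost:5432/db".parse()?).await?;

    // `connect` has already run the embedded refinery migrations at this point,
    // so a fresh database is usable immediately. Nothing has been pushed yet,
    // which means a random id must come back as None.
    assert!(storage.info(Uuid::now_v7()).await?.is_none());

    Ok(())
}
```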
diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs
index a973c99..440ac58 100644
--- a/jobs-core/src/job.rs
+++ b/jobs-core/src/job.rs
@@ -82,7 +82,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
     ///
     /// Defaults to 15 seconds
     /// Jobs can override
-    const TIMEOUT: i64 = 15_000;
+    const TIMEOUT: u64 = 15_000;
 
     /// Users of this library must define what it means to run a job.
     ///
@@ -123,7 +123,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
     ///
     /// This is important for allowing the job server to reap processes that were started but never
     /// completed.
-    fn timeout(&self) -> i64 {
+    fn timeout(&self) -> u64 {
         Self::TIMEOUT
     }
 }
diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs
index acb3a1b..7f55928 100644
--- a/jobs-core/src/job_info.rs
+++ b/jobs-core/src/job_info.rs
@@ -60,7 +60,7 @@ pub struct NewJobInfo {
     /// Milliseconds from execution until the job is considered dead
     ///
     /// This is important for storage implementations to reap unfinished jobs
-    timeout: i64,
+    timeout: u64,
 }
 
 impl NewJobInfo {
@@ -73,7 +73,7 @@ impl NewJobInfo {
         queue: String,
         max_retries: MaxRetries,
         backoff_strategy: Backoff,
-        timeout: i64,
+        timeout: u64,
         args: Value,
     ) -> Self {
         NewJobInfo {
@@ -113,7 +113,6 @@ impl NewJobInfo {
             max_retries: self.max_retries,
             next_queue: self.next_queue.unwrap_or(OffsetDateTime::now_utc()),
             backoff_strategy: self.backoff_strategy,
-            updated_at: OffsetDateTime::now_utc(),
             timeout: self.timeout,
         }
     }
@@ -157,13 +156,10 @@ pub struct JobInfo {
     /// The time this job should be dequeued
     pub next_queue: OffsetDateTime,
 
-    /// The time this job was last updated
-    pub updated_at: OffsetDateTime,
-
     /// Milliseconds from execution until the job is considered dead
     ///
     /// This is important for storage implementations to reap unfinished jobs
-    pub timeout: i64,
+    pub timeout: u64,
 }
 
 impl JobInfo {
@@ -182,7 +178,6 @@ impl JobInfo {
 
     // Increment the retry-count and determine if the job should be requeued
     fn increment(&mut self) -> ShouldStop {
-        self.updated_at = OffsetDateTime::now_utc();
         self.retry_count += 1;
         self.max_retries.compare(self.retry_count)
     }
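Side note on the type change: `timeout` is a millisecond count (the default `TIMEOUT: u64 = 15_000` is 15 seconds), and moving from `i64` to `u64` rules out negative timeouts. A hypothetical helper (not part of the patch) showing how a storage backend might derive the reaping deadline the doc comments describe:

```rust
use time::{Duration, OffsetDateTime};

// "Milliseconds from execution until the job is considered dead": the deadline
// is the start (or last heartbeat) plus the job's timeout span.
fn dead_after(started: OffsetDateTime, timeout_ms: u64) -> OffsetDateTime {
    started + Duration::milliseconds(timeout_ms as i64)
}
```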
diff --git a/jobs-core/src/unsend_job.rs b/jobs-core/src/unsend_job.rs
index 1eb63e4..6e47b16 100644
--- a/jobs-core/src/unsend_job.rs
+++ b/jobs-core/src/unsend_job.rs
@@ -84,7 +84,7 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
     ///
     /// Defaults to 15 seconds
     /// Jobs can override
-    const TIMEOUT: i64 = 15_000;
+    const TIMEOUT: u64 = 15_000;
 
     /// Users of this library must define what it means to run a job.
     ///
@@ -125,7 +125,7 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static {
     ///
     /// This is important for allowing the job server to reap processes that were started but never
     /// completed.
-    fn timeout(&self) -> i64 {
+    fn timeout(&self) -> u64 {
         Self::TIMEOUT
     }
 }
@@ -156,7 +156,7 @@ where
     const QUEUE: &'static str = <T as UnsendJob>::QUEUE;
     const MAX_RETRIES: MaxRetries = <T as UnsendJob>::MAX_RETRIES;
     const BACKOFF: Backoff = <T as UnsendJob>::BACKOFF;
-    const TIMEOUT: i64 = <T as UnsendJob>::TIMEOUT;
+    const TIMEOUT: u64 = <T as UnsendJob>::TIMEOUT;
 
     fn run(self, state: Self::State) -> Self::Future {
         UnwrapFuture(T::Spawner::spawn(
@@ -180,7 +180,7 @@ where
         UnsendJob::backoff_strategy(self)
     }
 
-    fn timeout(&self) -> i64 {
+    fn timeout(&self) -> u64 {
         UnsendJob::timeout(self)
     }
 }
diff --git a/jobs-postgres/Cargo.toml b/jobs-postgres/Cargo.toml
new file mode 100644
index 0000000..de2bf74
--- /dev/null
+++ b/jobs-postgres/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "background-jobs-postgres"
+version = "0.17.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = "0.1.24"
+background-jobs-core = { version = "0.17.0-beta.1", path = "../jobs-core" }
+barrel = { version = "0.7.0", features = ["pg"] }
+dashmap = "5.5.3"
+deadpool = { version = "0.9", features = ["rt_tokio_1"] }
+diesel = { version = "2.1.4", features = ["postgres_backend", "serde_json", "time", "uuid"] }
+diesel-async = { version = "0.4.1", default-features = false, features = ["deadpool", "postgres"] }
+diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
+flume = "0.11.0"
+futures-core = "0.3.30"
+metrics = "0.22.0"
+refinery = { version = "0.8.11", features = ["postgres", "tokio-postgres"] }
+serde_json = "1.0.111"
+time = "0.3.31"
+tokio = { version = "1.35.1", default-features = false, features = ["rt", "tracing"] }
+tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] }
+tracing = "0.1.40"
+url = "2.5.0"
+uuid = { version = "1.6.1", features = ["v7"] }
diff --git a/jobs-postgres/src/embedded.rs b/jobs-postgres/src/embedded.rs
new file mode 100644
index 0000000..e72e32d
--- /dev/null
+++ b/jobs-postgres/src/embedded.rs
@@ -0,0 +1,3 @@
+use refinery::embed_migrations;
+
+embed_migrations!("./src/migrations");
diff --git a/jobs-postgres/src/lib.rs b/jobs-postgres/src/lib.rs
new file mode 100644
index 0000000..35592ba
--- /dev/null
+++ b/jobs-postgres/src/lib.rs
@@ -0,0 +1,567 @@
+mod embedded;
+mod schema;
+
+use std::{
+    collections::{BTreeSet, VecDeque},
+    error::Error,
+    future::Future,
+    ops::Deref,
+    sync::Arc,
+    time::Duration,
+};
+
+use background_jobs_core::{Backoff, JobInfo, MaxRetries, NewJobInfo, ReturnJobInfo};
+use dashmap::DashMap;
+use diesel::prelude::*;
+use diesel_async::{
+    pooled_connection::{
+        deadpool::{BuildError, Hook, Pool, PoolError},
+        AsyncDieselConnectionManager, ManagerConfig,
+    },
+    AsyncConnection, AsyncPgConnection, RunQueryDsl,
+};
+use futures_core::future::BoxFuture;
+use serde_json::Value;
+use time::{OffsetDateTime, PrimitiveDateTime};
+use tokio::{sync::Notify, task::JoinHandle};
+use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
+use tracing::Instrument;
+use url::Url;
+use uuid::Uuid;
+
+type ConfigFn =
+    Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
+
+#[derive(Clone)]
+pub struct Storage {
+    inner: Arc<Inner>,
+    #[allow(dead_code)]
+    drop_handle: Arc<DropHandle<()>>,
+}
+
+struct Inner {
+    pool: Pool<AsyncPgConnection>,
+    queue_notifications: DashMap<String, Arc<Notify>>,
+}
+
+struct DropHandle<T> {
+    handle: JoinHandle<T>,
+}
+
+fn spawn<F: Future + Send + 'static>(name: &str, future: F) -> DropHandle<F::Output>
+where
+    F::Output: Send,
+{
+    DropHandle {
+        handle: tokio::task::Builder::new()
+            .name(name)
+            .spawn(future)
+            .expect("Spawned task"),
+    }
+}
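+
+// `tokio::task::Builder` (named tasks) only exists under `--cfg tokio_unstable`,
+// which the new .cargo/config.toml enables; the names show up in tools like
+// tokio-console. Dropping a `DropHandle` aborts its task (see the `Drop` impl
+// below), so background tasks are cancelled along with the `Storage`.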
+
+#[derive(Debug)]
+pub enum ConnectPostgresError {
+    /// Error connecting to postgres to run migrations
+    ConnectForMigration(tokio_postgres::Error),
+
+    /// Error running migrations
+    Migration(refinery::Error),
+
+    /// Error constructing the connection pool
+    BuildPool(BuildError),
+}
+
+#[derive(Debug)]
+pub enum PostgresError {
+    Pool(PoolError),
+
+    Diesel(diesel::result::Error),
+
+    DbTimeout,
+}
+
+struct JobNotifierState<'a> {
+    inner: &'a Inner,
+    capacity: usize,
+    jobs: BTreeSet<Uuid>,
+    jobs_ordered: VecDeque<Uuid>,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, diesel_derive_enum::DbEnum)]
+#[ExistingTypePath = "crate::schema::sql_types::JobStatus"]
+enum JobStatus {
+    New,
+    Running,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, diesel_derive_enum::DbEnum)]
+#[ExistingTypePath = "crate::schema::sql_types::BackoffStrategy"]
+enum BackoffStrategy {
+    Linear,
+    Exponential,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, diesel_derive_enum::DbEnum)]
+#[ExistingTypePath = "crate::schema::sql_types::RetryStrategy"]
+enum RetryStrategy {
+    Infinite,
+    Count,
+}
+
+#[derive(diesel::Insertable, diesel::Queryable, diesel::Selectable)]
+#[diesel(table_name = crate::schema::job_queue)]
+struct PostgresJob {
+    id: Uuid,
+    name: String,
+    queue: String,
+    args: Value,
+    retry_count: i32,
+    max_retries: i32,
+    retry: RetryStrategy,
+    backoff_multiplier: i32,
+    backoff: BackoffStrategy,
+    next_queue: PrimitiveDateTime,
+    timeout: i32,
+}
+
+impl From<JobInfo> for PostgresJob {
+    fn from(value: JobInfo) -> Self {
+        let JobInfo {
+            id,
+            name,
+            queue,
+            args,
+            retry_count,
+            max_retries,
+            backoff_strategy,
+            next_queue,
+            timeout,
+        } = value;
+
+        PostgresJob {
+            id,
+            name,
+            queue,
+            args,
+            retry_count: retry_count as _,
+            max_retries: match max_retries {
+                MaxRetries::Count(count) => count as _,
+                MaxRetries::Infinite => 0,
+            },
+            retry: match max_retries {
+                MaxRetries::Infinite => RetryStrategy::Infinite,
+                MaxRetries::Count(_) => RetryStrategy::Count,
+            },
+            backoff_multiplier: match backoff_strategy {
+                Backoff::Linear(multiplier) => multiplier as _,
+                Backoff::Exponential(multiplier) => multiplier as _,
+            },
+            backoff: match backoff_strategy {
+                Backoff::Linear(_) => BackoffStrategy::Linear,
+                Backoff::Exponential(_) => BackoffStrategy::Exponential,
+            },
+            next_queue: PrimitiveDateTime::new(next_queue.date(), next_queue.time()),
+            timeout: timeout as _,
+        }
+    }
+}
+
+impl From<PostgresJob> for JobInfo {
+    fn from(value: PostgresJob) -> Self {
+        let PostgresJob {
+            id,
+            name,
+            queue,
+            args,
+            retry_count,
+            max_retries,
+            retry,
+            backoff_multiplier,
+            backoff,
+            next_queue,
+            timeout,
+        } = value;
+
+        JobInfo {
+            id,
+            name,
+            queue,
+            args,
+            retry_count: retry_count as _,
+            max_retries: match retry {
+                RetryStrategy::Count => MaxRetries::Count(max_retries as _),
+                RetryStrategy::Infinite => MaxRetries::Infinite,
+            },
+            backoff_strategy: match backoff {
+                BackoffStrategy::Linear => Backoff::Linear(backoff_multiplier as _),
+                BackoffStrategy::Exponential => Backoff::Exponential(backoff_multiplier as _),
+            },
+            next_queue: next_queue.assume_utc(),
+            timeout: timeout as _,
+        }
+    }
+}
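+
+// The two `From` impls above flatten `MaxRetries` and `Backoff` into
+// (strategy enum, i32) column pairs; they must remain exact inverses, since
+// every job round-trips through `PostgresJob` on its way into and out of the
+// `job_queue` table.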
+
+#[async_trait::async_trait]
+impl background_jobs_core::Storage for Storage {
+    type Error = PostgresError;
+
+    async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let opt = {
+            use schema::job_queue::dsl::*;
+
+            job_queue
+                .select(PostgresJob::as_select())
+                .filter(id.eq(job_id))
+                .get_result(&mut conn)
+                .await
+                .optional()
+                .map_err(PostgresError::Diesel)?
+        };
+
+        if let Some(postgres_job) = opt {
+            Ok(Some(postgres_job.into()))
+        } else {
+            Ok(None)
+        }
+    }
+
+    async fn push(&self, job: NewJobInfo) -> Result<Uuid, Self::Error> {
+        let postgres_job: PostgresJob = job.build().into();
+        let id = postgres_job.id;
+
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        {
+            use schema::job_queue::dsl::*;
+
+            postgres_job
+                .insert_into(job_queue)
+                .execute(&mut conn)
+                .await
+                .map_err(PostgresError::Diesel)?;
+        }
+
+        Ok(id)
+    }
+
+    async fn pop(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Self::Error> {
+        todo!()
+    }
+
+    async fn heartbeat(&self, job_id: Uuid, in_runner_id: Uuid) -> Result<(), Self::Error> {
+        let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
+
+        let now = to_primitive(OffsetDateTime::now_utc());
+
+        {
+            use schema::job_queue::dsl::*;
+
+            diesel::update(job_queue)
+                .filter(id.eq(job_id))
+                .set((
+                    heartbeat.eq(PrimitiveDateTime::new(now.date(), now.time())),
+                    runner_id.eq(in_runner_id),
+                ))
+                .execute(&mut conn)
+                .await
+                .map_err(PostgresError::Diesel)?;
+        }
+
+        Ok(())
+    }
+
+    async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<bool, Self::Error> {
+        todo!()
+    }
+}
+
+fn to_primitive(timestamp: OffsetDateTime) -> PrimitiveDateTime {
+    let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
+    PrimitiveDateTime::new(timestamp.date(), timestamp.time())
+}
+
+impl Storage {
+    pub async fn connect(postgres_url: Url) -> Result<Self, ConnectPostgresError> {
+        let (mut client, conn) = tokio_postgres::connect(postgres_url.as_str(), NoTls)
+            .await
+            .map_err(ConnectPostgresError::ConnectForMigration)?;
+
+        let handle = spawn("postgres-migrations", conn);
+
+        embedded::migrations::runner()
+            .run_async(&mut client)
+            .await
+            .map_err(ConnectPostgresError::Migration)?;
+
+        handle.abort();
+        let _ = handle.await;
+
+        let parallelism = std::thread::available_parallelism()
+            .map(|u| u.into())
+            .unwrap_or(1_usize);
+
+        let (tx, rx) = flume::bounded(10);
+
+        let mut config = ManagerConfig::default();
+        config.custom_setup = build_handler(tx);
+
+        let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_config(
+            postgres_url,
+            config,
+        );
+
+        let pool = Pool::builder(mgr)
+            .runtime(deadpool::Runtime::Tokio1)
+            .wait_timeout(Some(Duration::from_secs(10)))
+            .create_timeout(Some(Duration::from_secs(2)))
+            .recycle_timeout(Some(Duration::from_secs(2)))
+            .post_create(Hook::sync_fn(|_, _| {
+                metrics::counter!("background-jobs.postgres.pool.connection.create").increment(1);
+                Ok(())
+            }))
+            .post_recycle(Hook::sync_fn(|_, _| {
+                metrics::counter!("background-jobs.postgres.pool.connection.recycle").increment(1);
+                Ok(())
+            }))
+            .max_size(parallelism * 8)
+            .build()
+            .map_err(ConnectPostgresError::BuildPool)?;
+
+        let inner = Arc::new(Inner {
+            pool,
+            queue_notifications: DashMap::new(),
+        });
+
+        let handle = spawn(
+            "postgres-delegate-notifications",
+            delegate_notifications(rx, inner.clone(), parallelism * 8),
+        );
+
+        let drop_handle = Arc::new(handle);
+
+        Ok(Storage { inner, drop_handle })
+    }
+}
+
+impl<'a> JobNotifierState<'a> {
+    fn handle(&mut self, payload: &str) {
+        let Some((job_id, queue_name)) = payload.split_once(' ') else {
+            tracing::warn!("Invalid queue payload {payload}");
+            return;
+        };
+
+        let Ok(job_id) = job_id.parse::<Uuid>() else {
+            tracing::warn!("Invalid job ID {job_id}");
+            return;
+        };
+
+        if !self.jobs.insert(job_id) {
+            // duplicate job
+            return;
+        }
+
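+        // `jobs` is the de-duplication set and `jobs_ordered` its FIFO
+        // eviction order; together they bound memory while ensuring a burst
+        // of notifications for a single job only wakes a worker once.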
+        self.jobs_ordered.push_back(job_id);
+
+        if self.jobs_ordered.len() > self.capacity {
+            if let Some(job_id) = self.jobs_ordered.pop_front() {
+                self.jobs.remove(&job_id);
+            }
+        }
+
+        self.inner
+            .queue_notifications
+            .entry(queue_name.to_string())
+            .or_insert_with(|| Arc::new(Notify::const_new()))
+            .notify_one();
+
+        metrics::counter!("background-jobs.postgres.job-notifier.notified", "queue" => queue_name.to_string()).increment(1);
+    }
+}
+
+async fn delegate_notifications(
+    receiver: flume::Receiver<Notification>,
+    inner: Arc<Inner>,
+    capacity: usize,
+) {
+    let mut job_notifier_state = JobNotifierState {
+        inner: &inner,
+        capacity,
+        jobs: BTreeSet::new(),
+        jobs_ordered: VecDeque::new(),
+    };
+
+    while let Ok(notification) = receiver.recv_async().await {
+        tracing::trace!("delegate_notifications: looping");
+        metrics::counter!("background-jobs.postgres.notification").increment(1);
+
+        match notification.channel() {
+            "queue_status_channel" => {
+                // new job inserted for queue
+                job_notifier_state.handle(notification.payload());
+            }
+            channel => {
+                tracing::info!(
+                    "Unhandled postgres notification: {channel}: {}",
+                    notification.payload()
+                );
+            }
+        }
+    }
+
+    tracing::warn!("Notification delegator shutting down");
+}
+
+fn build_handler(sender: flume::Sender<Notification>) -> ConfigFn {
+    Box::new(
+        move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
+            let sender = sender.clone();
+
+            let connect_span = tracing::trace_span!(parent: None, "connect future");
+
+            Box::pin(
+                async move {
+                    let (client, conn) =
+                        tokio_postgres::connect(config, tokio_postgres::tls::NoTls)
+                            .await
+                            .map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
+
+                    // not very cash money (structured concurrency) of me
+                    spawn_db_notification_task(sender, conn)
+                        .map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
+
+                    AsyncPgConnection::try_from(client).await
+                }
+                .instrument(connect_span),
+            )
+        },
+    )
+}
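+
+// Every pooled connection gets its own driver task below: `build_handler`
+// funnels each connection's `AsyncMessage` stream into the one flume channel
+// that `delegate_notifications` consumes, so a notification is picked up no
+// matter which connection the server delivers it on.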
+fn spawn_db_notification_task(
+    sender: flume::Sender<Notification>,
+    mut conn: Connection<Socket, NoTlsStream>,
+) -> std::io::Result<()> {
+    tokio::task::Builder::new().name("postgres-notifications").spawn(async move {
+        while let Some(res) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
+            tracing::trace!("db_notification_task: looping");
+
+            match res {
+                Err(e) => {
+                    tracing::error!("Database Connection {e:?}");
+                    return;
+                }
+                Ok(AsyncMessage::Notice(e)) => {
+                    tracing::warn!("Database Notice {e:?}");
+                }
+                Ok(AsyncMessage::Notification(notification)) => {
+                    if sender.send_async(notification).await.is_err() {
+                        tracing::warn!("Missed notification. Are we shutting down?");
+                    }
+                }
+                Ok(_) => {
+                    tracing::warn!("Unhandled AsyncMessage!!! Please contact the developer of this application");
+                }
+            }
+        }
+    })?;
+
+    Ok(())
+}
+
+impl<T> Future for DropHandle<T> {
+    type Output = <JoinHandle<T> as Future>::Output;
+
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        std::pin::Pin::new(&mut self.get_mut().handle).poll(cx)
+    }
+}
+
+impl<T> Drop for DropHandle<T> {
+    fn drop(&mut self) {
+        self.handle.abort();
+    }
+}
+
+impl<T> Deref for DropHandle<T> {
+    type Target = JoinHandle<T>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.handle
+    }
+}
+
+impl std::fmt::Debug for Storage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Storage").finish()
+    }
+}
+
+impl From<refinery::Error> for ConnectPostgresError {
+    fn from(value: refinery::Error) -> Self {
+        Self::Migration(value)
+    }
+}
+
+impl From<tokio_postgres::Error> for ConnectPostgresError {
+    fn from(value: tokio_postgres::Error) -> Self {
+        Self::ConnectForMigration(value)
+    }
+}
+
+impl From<BuildError> for ConnectPostgresError {
+    fn from(value: BuildError) -> Self {
+        Self::BuildPool(value)
+    }
+}
+
+impl std::fmt::Display for ConnectPostgresError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::BuildPool(_) => write!(f, "Failed to build postgres connection pool"),
+            Self::ConnectForMigration(_) => {
+                write!(f, "Failed to connect to postgres for migrations")
+            }
+            Self::Migration(_) => write!(f, "Failed to run migrations"),
+        }
+    }
+}
+
+impl std::error::Error for ConnectPostgresError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::BuildPool(e) => Some(e),
+            Self::ConnectForMigration(e) => Some(e),
+            Self::Migration(e) => Some(e),
+        }
+    }
+}
+
+impl std::fmt::Display for PostgresError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Pool(_) => write!(f, "Error in db pool"),
+            Self::Diesel(_) => write!(f, "Error in database"),
+            Self::DbTimeout => write!(f, "Timed out waiting for postgres"),
+        }
+    }
+}
+
+impl Error for PostgresError {
+    fn source(&self) -> Option<&(dyn Error + 'static)> {
+        match self {
+            Self::Pool(e) => Some(e),
+            Self::Diesel(e) => Some(e),
+            Self::DbTimeout => None,
+        }
+    }
+}
diff --git a/jobs-postgres/src/migrations/V000__enable_pgcrypto.rs b/jobs-postgres/src/migrations/V000__enable_pgcrypto.rs
new file mode 100644
index 0000000..4c8bd57
--- /dev/null
+++ b/jobs-postgres/src/migrations/V000__enable_pgcrypto.rs
@@ -0,0 +1,11 @@
+use barrel::backend::Pg;
+use barrel::functions::AutogenFunction;
+use barrel::{types, Migration};
+
+pub(crate) fn migration() -> String {
+    let mut m = Migration::new();
+
+    m.inject_custom("CREATE EXTENSION IF NOT EXISTS pgcrypto;");
+
+    m.make::<Pg>().to_string()
+}
diff --git a/jobs-postgres/src/migrations/V001__create_queue.rs b/jobs-postgres/src/migrations/V001__create_queue.rs
new file mode 100644
index 0000000..3f58181
--- /dev/null
+++ b/jobs-postgres/src/migrations/V001__create_queue.rs
@@ -0,0 +1,65 @@
+use barrel::backend::Pg;
+use barrel::functions::AutogenFunction;
+use barrel::{types, Migration};
+
+pub(crate) fn migration() -> String {
+    let mut m = Migration::new();
+
+    m.inject_custom("CREATE TYPE job_status AS ENUM ('new', 'running');");
+    m.inject_custom("CREATE TYPE retry_strategy AS ENUM ('infinite', 'count');");
+    m.inject_custom("CREATE TYPE backoff_strategy AS ENUM ('linear', 'exponential');");
+
+    m.create_table("job_queue", |t| {
+        t.inject_custom(r#""id" UUID PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL UNIQUE"#);
+
+        t.add_column("name", types::text().size(128).nullable(false));
+        t.add_column("queue", types::text().size(128).nullable(false));
+        t.add_column("args", types::custom("jsonb").nullable(false));
+        t.add_column("retry_count", types::integer().nullable(false));
+        t.add_column("max_retries", types::integer().nullable(false));
+        t.add_column("retry", types::custom("retry_strategy").nullable(false));
+        t.add_column("backoff_multiplier", types::integer().nullable(false));
+        t.add_column("backoff", types::custom("backoff_strategy").nullable(false));
+        t.add_column("next_queue", types::datetime().nullable(false));
+        t.add_column("timeout", types::integer().nullable(false));
+        t.add_column(
+            "runner_id",
+            types::uuid().nullable(true).indexed(false).unique(false),
+        );
+        t.add_column(
+            "status",
+            types::custom("job_status").nullable(false).default("new"),
+        );
+        t.add_column("heartbeat", types::datetime().nullable(true));
+
+        t.add_index("heartbeat_index", types::index(["heartbeat"]));
+        t.add_index("next_queue_index", types::index(["next_queue"]));
+    });
+
+    m.inject_custom(
+        r#"
+CREATE OR REPLACE FUNCTION queue_status_notify()
+    RETURNS trigger AS
+$$
+BEGIN
+    PERFORM pg_notify('queue_status_channel', NEW.id::text || ' ' || NEW.queue::text);
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+    "#
+        .trim(),
+    );
+
+    m.inject_custom(
+        r#"
+CREATE TRIGGER queue_status
+    AFTER INSERT OR UPDATE OF status
+    ON job_queue
+    FOR EACH ROW
+EXECUTE PROCEDURE queue_status_notify();
+    "#
+        .trim(),
+    );
+
+    m.make::<Pg>().to_string()
+}
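The trigger payload is `NEW.id::text || ' ' || NEW.queue::text`, which is exactly the shape `JobNotifierState::handle` splits apart in lib.rs above. A self-contained sketch of that contract (`parse_payload` is a hypothetical helper mirroring `handle`, not part of the patch):

```rust
use uuid::Uuid;

/// Split a `queue_status_channel` payload of the form "<job-id> <queue>",
/// as produced by the `queue_status_notify()` trigger.
fn parse_payload(payload: &str) -> Option<(Uuid, &str)> {
    let (job_id, queue_name) = payload.split_once(' ')?;
    Some((job_id.parse().ok()?, queue_name))
}

fn main() {
    let id = Uuid::now_v7();
    let payload = format!("{id} default");
    assert_eq!(parse_payload(&payload), Some((id, "default")));
}
```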
diff --git a/jobs-postgres/src/schema.rs b/jobs-postgres/src/schema.rs
new file mode 100644
index 0000000..ac9d2ab
--- /dev/null
+++ b/jobs-postgres/src/schema.rs
@@ -0,0 +1,56 @@
+// @generated automatically by Diesel CLI.
+
+pub mod sql_types {
+    #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "backoff_strategy"))]
+    pub struct BackoffStrategy;
+
+    #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "job_status"))]
+    pub struct JobStatus;
+
+    #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
+    #[diesel(postgres_type(name = "retry_strategy"))]
+    pub struct RetryStrategy;
+}
+
+diesel::table! {
+    use diesel::sql_types::*;
+    use super::sql_types::RetryStrategy;
+    use super::sql_types::BackoffStrategy;
+    use super::sql_types::JobStatus;
+
+    job_queue (id) {
+        id -> Uuid,
+        name -> Text,
+        queue -> Text,
+        args -> Jsonb,
+        retry_count -> Int4,
+        max_retries -> Int4,
+        retry -> RetryStrategy,
+        backoff_multiplier -> Int4,
+        backoff -> BackoffStrategy,
+        next_queue -> Timestamp,
+        timeout -> Int4,
+        runner_id -> Nullable<Uuid>,
+        status -> JobStatus,
+        heartbeat -> Nullable<Timestamp>,
+    }
+}
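+
+// `refinery_schema_history` below is refinery's migration bookkeeping table;
+// `diesel print-schema` (run via scripts/update-schema.sh) picks it up
+// alongside `job_queue`, which is why it lands in this generated file.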
+
+diesel::table! {
+    refinery_schema_history (version) {
+        version -> Int4,
+        #[max_length = 255]
+        name -> Nullable<Varchar>,
+        #[max_length = 255]
+        applied_on -> Nullable<Varchar>,
+        #[max_length = 255]
+        checksum -> Nullable<Varchar>,
+    }
+}
+
+diesel::allow_tables_to_appear_in_same_query!(
+    job_queue,
+    refinery_schema_history,
+);
diff --git a/scripts/update-schema.sh b/scripts/update-schema.sh
new file mode 100755
index 0000000..ddff400
--- /dev/null
+++ b/scripts/update-schema.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+diesel \
+    --database-url 'postgres://postgres:postgres@localhost:5432/db' \
+    print-schema \
+    --custom-type-derives "diesel::query_builder::QueryId" \
+    > jobs-postgres/src/schema.rs
diff --git a/src/lib.rs b/src/lib.rs
index f3ba79c..7f1624a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -188,3 +188,8 @@ pub mod memory_storage {
 
 #[cfg(feature = "background-jobs-actix")]
 pub use background_jobs_actix::{ActixSpawner, Manager, QueueHandle, WorkerConfig};
+
+#[cfg(feature = "background-jobs-postgres")]
+pub mod postgres {
+    pub use background_jobs_postgres::Storage;
+}
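Status note: `pop` and `complete` are still `todo!()`, so this backend cannot drive a worker end to end yet, but `push`, `info`, and `heartbeat` are wired up. A hypothetical keep-alive loop a runner could use once dequeueing lands (`keep_alive` and the five-second period are illustrative only):

```rust
use std::time::Duration;

use background_jobs::postgres::Storage;
use background_jobs_core::Storage as _;
use uuid::Uuid;

// Refresh the job's `heartbeat` column (and claim it via `runner_id`) on an
// interval, so a future reaper can tell live jobs from abandoned ones.
async fn keep_alive(storage: Storage, job_id: Uuid, runner_id: Uuid) {
    let mut interval = tokio::time::interval(Duration::from_secs(5));

    while storage.heartbeat(job_id, runner_id).await.is_ok() {
        interval.tick().await;
    }
}
```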