From 6511e7f32e8e9cb8ad89a42179b9e56bff986cba Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 22 Mar 2020 16:18:36 -0500 Subject: [PATCH] Use postgres for job storage --- Cargo.lock | 24 +-- Cargo.toml | 6 +- Dockerfile.migrations.arm64v8 | 5 + build.sh | 14 +- .../2020-03-22-194453_create-jobs/down.sql | 3 + .../2020-03-22-194453_create-jobs/up.sql | 17 ++ src/db.rs | 4 + src/jobs/mod.rs | 10 +- src/jobs/storage.rs | 165 ++++++++++++++++++ src/main.rs | 4 +- src/schema.rs | 16 ++ 11 files changed, 249 insertions(+), 19 deletions(-) create mode 100644 Dockerfile.migrations.arm64v8 create mode 100644 migrations/2020-03-22-194453_create-jobs/down.sql create mode 100644 migrations/2020-03-22-194453_create-jobs/up.sql create mode 100644 src/jobs/storage.rs diff --git a/Cargo.lock b/Cargo.lock index d2f6cbc..df83d63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -455,7 +455,7 @@ dependencies = [ [[package]] name = "background-jobs" version = "0.8.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" dependencies = [ "background-jobs-actix", "background-jobs-core", @@ -464,7 +464,7 @@ dependencies = [ [[package]] name = "background-jobs-actix" version = "0.7.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" dependencies = [ "actix", "actix-rt", @@ -479,12 +479,13 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "uuid", ] [[package]] name = "background-jobs-core" version = "0.7.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#799391811c795f0918e0df2b1a1b5fc44f55f89f" dependencies = [ "anyhow", "async-trait", @@ 
-494,6 +495,7 @@ dependencies = [ "serde 1.0.105", "serde_json", "thiserror", + "uuid", ] [[package]] @@ -1664,8 +1666,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e634590e8812c500088d88db721195979223dabb05149f43cb50931d0ff5865d" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", + "serde 1.0.105", + "serde_json", + "uuid", ] [[package]] @@ -1712,14 +1718,9 @@ dependencies = [ [[package]] name = "proc-macro-hack" -version = "0.5.12" +version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f918f2b601f93baa836c1c2945faef682ba5b6d4828ecb45eeb7cc3c71b811b4" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] +checksum = "fcfdefadc3d57ca21cf17990a28ef4c0f7c61383a28cb7604cf4a18e6ede1420" [[package]] name = "proc-macro-nested" @@ -1826,7 +1827,9 @@ dependencies = [ "actix-web", "actix-webfinger", "anyhow", + "async-trait", "background-jobs", + "background-jobs-core", "base64 0.12.0", "bb8-postgres", "config", @@ -2640,6 +2643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" dependencies = [ "rand", + "serde 1.0.105", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9ca7d6e..1b0de02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,9 +19,11 @@ actix-rt = "1.0.0" actix-web = { version = "3.0.0-alpha.1", features = ["rustls"] } actix-webfinger = "0.3.0-alpha.3" activitystreams = "0.5.0-alpha.11" +async-trait = "0.1.24" background-jobs = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] } +background-jobs-core = { version = "0.7.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" } base64 = "0.12" -bb8-postgres = "0.4.0" +bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] } config = "0.10.1" dotenv = 
"0.15.0" env_logger = "0.7.1" @@ -43,7 +45,7 @@ structopt = "0.3.12" thiserror = "1.0" tokio = { version = "0.2.13", features = ["sync"] } ttl_cache = "0.5.1" -uuid = { version = "0.8", features = ["v4"] } +uuid = { version = "0.8", features = ["v4", "serde"] } [build-dependencies] anyhow = "1.0" diff --git a/Dockerfile.migrations.arm64v8 b/Dockerfile.migrations.arm64v8 new file mode 100644 index 0000000..7533fda --- /dev/null +++ b/Dockerfile.migrations.arm64v8 @@ -0,0 +1,5 @@ +FROM asonix/diesel-cli:v1.4.0-r0-arm64v8 + +COPY migrations /migrations + +CMD ["diesel", "migration", "run", "--migration-dir", "/migrations"] diff --git a/build.sh b/build.sh index 54c5945..9db2e56 100755 --- a/build.sh +++ b/build.sh @@ -32,6 +32,7 @@ cross build \ mkdir -p artifacts cp ./target/aarch64-unknown-linux-musl/release/relay artifacts/relay +cp -r ./migrations artifacts/migrations # from `sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes` docker build \ @@ -39,12 +40,23 @@ docker build \ --no-cache \ --build-arg BUILD_DATE="${BUILD_DATE}" \ --build-arg TAG="${TAG}" \ - -f "Dockerfile.arm64v8" \ + -f Dockerfile.arm64v8 \ -t "asonix/relay:${VERSION}-arm64v8" \ -t "asonix/relay:latest-arm64v8" \ -t "asonix/relay:latest" \ ./artifacts +docker build \ + --pull \ + --no-cache \ + --build-arg BUILD_DATE="${BUILD_DATE}" \ + --build-arg TAG="${TAG}" \ + -f Dockerfile.migrations.arm64v8 \ + -t "asonix/relay-migrations:${VERSION}-arm64v8" \ + -t "asonix/relay-migrations:latest-arm64v8" \ + -t "asonix/relay-migrations:latest" \ + ./artifacts + docker push "asonix/relay:${VERSION}-arm64v8" docker push "asonix/relay:latest-arm64v8" docker push "asonix/relay:latest" diff --git a/migrations/2020-03-22-194453_create-jobs/down.sql b/migrations/2020-03-22-194453_create-jobs/down.sql new file mode 100644 index 0000000..8774d0f --- /dev/null +++ b/migrations/2020-03-22-194453_create-jobs/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +DROP INDEX 
jobs_queue_status_index; +DROP TABLE jobs; diff --git a/migrations/2020-03-22-194453_create-jobs/up.sql b/migrations/2020-03-22-194453_create-jobs/up.sql new file mode 100644 index 0000000..2504d28 --- /dev/null +++ b/migrations/2020-03-22-194453_create-jobs/up.sql @@ -0,0 +1,17 @@ +-- Your SQL goes here +CREATE TABLE jobs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID UNIQUE NOT NULL, + job_queue TEXT NOT NULL, + job_timeout BIGINT NOT NULL, + job_updated TIMESTAMP NOT NULL, + job_status TEXT NOT NULL, + job_value JSONB NOT NULL, + job_next_run TIMESTAMP, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE INDEX jobs_queue_status_index ON jobs(job_queue, job_status); + +SELECT diesel_manage_updated_at('jobs'); diff --git a/src/db.rs b/src/db.rs index 1c9dc99..6c663e5 100644 --- a/src/db.rs +++ b/src/db.rs @@ -33,6 +33,10 @@ impl Db { Ok(Db { pool }) } + pub fn pool(&self) -> &Pool { + &self.pool + } + pub async fn remove_listener(&self, inbox: XsdAnyUri) -> Result<(), MyError> { let conn = self.pool.get().await?; diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 5ba38d2..12a9d33 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -1,17 +1,19 @@ mod deliver; mod deliver_many; +mod storage; pub use self::{deliver::Deliver, deliver_many::DeliverMany}; use crate::{ + db::Db, error::MyError, - jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor}, + jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor, storage::Storage}, requests::Requests, state::State, }; -use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig}; +use background_jobs::{Job, QueueHandle, WorkerConfig}; -pub fn create_server() -> JobServer { - JobServer::new(background_jobs::create_server(Storage::new())) +pub fn create_server(db: Db) -> JobServer { + JobServer::new(background_jobs::create_server(Storage::new(db))) } pub fn create_workers(state: State, job_server: JobServer) { diff 
--git a/src/jobs/storage.rs b/src/jobs/storage.rs new file mode 100644 index 0000000..61095a5 --- /dev/null +++ b/src/jobs/storage.rs @@ -0,0 +1,165 @@ +use crate::{db::Db, error::MyError}; +use background_jobs_core::{JobInfo, Stats}; +use log::debug; +use uuid::Uuid; + +#[derive(Clone)] +pub struct Storage { + db: Db, +} + +impl Storage { + pub fn new(db: Db) -> Self { + Storage { db } + } +} + +#[async_trait::async_trait] +impl background_jobs_core::Storage for Storage { + type Error = MyError; + + async fn generate_id(&self) -> Result<Uuid, MyError> { + // TODO: Ensure unique job id + Ok(Uuid::new_v4()) + } + + async fn save_job(&self, job: JobInfo) -> Result<(), MyError> { + let id = job.id(); + let queue = job.queue().to_owned(); + let timeout = job.timeout(); + let updated = job.updated_at().naive_utc(); + let status = job.status().to_string(); + let next_queue = job.next_queue().map(|q| q.naive_utc()); + let value = serde_json::to_value(job)?; + + let conn = self.db.pool().get().await?; + debug!("Inserting job {} status {} for queue {}", id, status, queue); + conn.execute( + "INSERT INTO jobs + (job_id, job_queue, job_timeout, job_updated, job_status, job_next_run, job_value, created_at) + VALUES + ($1::UUID, $2::TEXT, $3::BIGINT, $4::TIMESTAMP, $5::TEXT, $6::TIMESTAMP, $7::JSONB, 'now') + ON CONFLICT (job_id) + DO UPDATE SET + job_updated = $4::TIMESTAMP, + job_status = $5::TEXT, + job_next_run = $6::TIMESTAMP, + job_value = $7::JSONB;", + &[&id, &queue, &timeout, &updated, &status, &next_queue, &value], + ) + .await?; + + Ok(()) + } + + async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, MyError> { + let conn = self.db.pool().get().await?; + debug!( + "SELECT job_value FROM jobs WHERE job_id = $1::UUID LIMIT 1; [{}]", + id + ); + let rows = conn + .query( + "SELECT job_value + FROM jobs + WHERE job_id = $1::UUID + LIMIT 1;", + &[&id], + ) + .await?; + + let row = if let Some(row) = rows.into_iter().next() { + row + } else { + return Ok(None); + }; + + let value =
row.try_get(0)?; + + Ok(Some(serde_json::from_value(value)?)) + } + + async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, MyError> { + let conn = self.db.pool().get().await?; + let row = conn + .query_opt( + "UPDATE jobs + SET + job_status = 'Running', + job_updated = 'now' + WHERE + job_id = ( + SELECT job_id + FROM jobs + WHERE + job_queue = $1::TEXT + AND + ( + job_next_run IS NULL + OR + job_next_run < now() + ) + AND + ( + job_status = 'Pending' + OR + ( + job_status = 'Running' + AND + NOW() > (INTERVAL '1 millisecond' * job_timeout + job_updated) + ) + ) + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING job_value;", + &[&queue], + ) + .await?; + + let row = if let Some(row) = row { + row + } else { + return Ok(None); + }; + + let value = row.try_get(0)?; + + let job: JobInfo = serde_json::from_value(value)?; + debug!("Found job {} in queue {}", job.id(), queue); + + Ok(Some(job)) + } + + async fn queue_job(&self, _queue: &str, _id: Uuid) -> Result<(), MyError> { + // Queue Job is a no-op, since jobs are always in their queue + Ok(()) + } + + async fn run_job(&self, _id: Uuid, _runner_id: Uuid) -> Result<(), MyError> { + // Run Job is a no-op, since jobs are marked running at fetch + Ok(()) + } + + async fn delete_job(&self, id: Uuid) -> Result<(), MyError> { + let conn = self.db.pool().get().await?; + debug!("Deleting job {}", id); + conn.execute("DELETE FROM jobs WHERE job_id = $1::UUID;", &[&id]) + .await?; + + Ok(()) + } + + async fn get_stats(&self) -> Result<Stats, MyError> { + // TODO: Stats are unimplemented + Ok(Stats::default()) + } + + async fn update_stats<F>(&self, _f: F) -> Result<(), MyError> + where + F: Fn(Stats) -> Stats + Send, + { + // TODO: Stats are unimplemented + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index bf58e8a..7a32a2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> { let config = Config::build()?; if config.debug() { - std::env::set_var("RUST_LOG", "debug") + 
std::env::set_var("RUST_LOG", "debug,tokio_postgres=info") } else { std::env::set_var("RUST_LOG", "info") } @@ -109,7 +109,7 @@ async fn main() -> Result<(), anyhow::Error> { rehydrate::spawn(db.clone(), state.clone()); - let job_server = create_server(); + let job_server = create_server(db.clone()); let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); diff --git a/src/schema.rs b/src/schema.rs index 4d9d3e0..39c67ae 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -7,6 +7,21 @@ table! { } } +table! { + jobs (id) { + id -> Uuid, + job_id -> Uuid, + job_queue -> Text, + job_timeout -> Int8, + job_updated -> Timestamp, + job_status -> Text, + job_value -> Jsonb, + job_next_run -> Nullable<Timestamp>, + created_at -> Timestamp, + updated_at -> Timestamp, + } +} + table! { listeners (id) { id -> Uuid, @@ -37,6 +52,7 @@ table! { allow_tables_to_appear_in_same_query!( blocks, + jobs, listeners, settings, whitelists,