diff --git a/Cargo.toml b/Cargo.toml index 1cfa13c..9a18838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,6 @@ completion-logging = [ ] error-logging = ["background-jobs-core/error-logging"] -[dependencies] - [dependencies.background-jobs-core] version = "0.17.0" path = "jobs-core" diff --git a/jobs-postgres/Cargo.toml b/jobs-postgres/Cargo.toml index de2bf74..8a77d5b 100644 --- a/jobs-postgres/Cargo.toml +++ b/jobs-postgres/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + [dependencies] async-trait = "0.1.24" background-jobs-core = { version = "0.17.0-beta.1", path = "../jobs-core" } @@ -17,6 +18,7 @@ diesel-derive-enum = { version = "2.1.0", features = ["postgres"] } flume = "0.11.0" futures-core = "0.3.30" metrics = "0.22.0" +pin-project-lite = "0.2.13" refinery = { version = "0.8.11", features = ["postgres", "tokio-postgres"] } serde_json = "1.0.111" time = "0.3.31" diff --git a/jobs-postgres/src/future.rs b/jobs-postgres/src/future.rs new file mode 100644 index 0000000..94439c1 --- /dev/null +++ b/jobs-postgres/src/future.rs @@ -0,0 +1,74 @@ +use std::{ + future::Future, + time::{Duration, Instant}, +}; + +pub(super) trait Timeout: Future { + fn timeout(self, duration: Duration) -> tokio::time::Timeout + where + Self: Sized, + { + tokio::time::timeout(duration, self) + } +} + +pub(super) trait Metrics: Future { + fn metrics(self, name: &'static str) -> MetricsFuture + where + Self: Sized, + { + MetricsFuture { + future: self, + metrics: MetricsGuard { + name, + start: Instant::now(), + complete: false, + }, + } + } +} + +impl Metrics for F where F: Future {} +impl Timeout for F where F: Future {} + +pin_project_lite::pin_project! { + pub(super) struct MetricsFuture { + #[pin] + future: F, + + metrics: MetricsGuard, + } +} + +struct MetricsGuard { + name: &'static str, + start: Instant, + complete: bool, +} + +impl Future for MetricsFuture +where + F: Future, +{ + type Output = F::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + + let out = std::task::ready!(this.future.poll(cx)); + + this.metrics.complete = true; + + std::task::Poll::Ready(out) + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::histogram!(self.name, "complete" => self.complete.to_string()) + .record(self.start.elapsed().as_secs_f64()); + } +} diff --git a/jobs-postgres/src/lib.rs b/jobs-postgres/src/lib.rs index 36962fd..fe32e99 100644 --- a/jobs-postgres/src/lib.rs +++ b/jobs-postgres/src/lib.rs @@ -1,6 +1,9 @@ mod embedded; +mod future; mod schema; +use future::{Metrics as _, Timeout as _}; + use std::{ collections::{BTreeSet, VecDeque}, error::Error, @@ -227,7 +230,10 @@ impl background_jobs_core::Storage for Storage { .select(PostgresJob::as_select()) .filter(id.eq(job_id)) .get_result(&mut conn) + .metrics("background-jobs.postgres.info") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? }; @@ -258,7 +264,10 @@ impl background_jobs_core::Storage for Storage { diesel::sql_query("LISTEN queue_status_channel;") .execute(&mut conn) + .metrics("background-jobs.postgres.listen") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; let count = { @@ -275,6 +284,7 @@ impl background_jobs_core::Storage for Storage { runner_id.eq(Option::::None), )) .execute(&mut conn) + .metrics("background-jobs.postgres.requeue") .await .map_err(PostgresError::Diesel)? }; @@ -316,7 +326,10 @@ impl background_jobs_core::Storage for Storage { )) .returning(PostgresJob::as_returning()) .get_result(&mut conn) + .metrics("background-jobs.postgres.claim") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? }; @@ -325,33 +338,31 @@ impl background_jobs_core::Storage for Storage { return Ok(postgres_job.into()); } - let next_queue = { + let sleep_duration = { use schema::job_queue::dsl::*; job_queue .filter(queue.eq(in_queue).and(status.eq(JobStatus::New))) .select(diesel::dsl::sql::("NOW() - next_queue")) .get_result::(&mut conn) + .metrics("background-jobs.postgres.next-queue") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? + .map(|interval| { + if interval.microseconds < 0 { + Duration::from_micros(interval.microseconds.abs_diff(0)) + } else { + Duration::from_secs(0) + } + }) + .unwrap_or(Duration::from_secs(5)) }; - let sleep_duration = next_queue - .map(|interval| { - if interval.microseconds < 0 { - Duration::from_micros(interval.microseconds.abs_diff(0)) - } else { - Duration::from_secs(0) - } - }) - .unwrap_or(Duration::from_secs(5)); - drop(conn); - if tokio::time::timeout(sleep_duration, notifier.notified()) - .await - .is_ok() - { + if notifier.notified().timeout(sleep_duration).await.is_ok() { tracing::debug!("Notified"); } else { tracing::debug!("Timed out"); @@ -369,7 +380,10 @@ impl background_jobs_core::Storage for Storage { .filter(id.eq(job_id)) .set((heartbeat.eq(diesel::dsl::now), runner_id.eq(in_runner_id))) .execute(&mut conn) + .metrics("background-jobs.postgres.heartbeat") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; } @@ -386,7 +400,10 @@ impl background_jobs_core::Storage for Storage { .filter(id.eq(return_job_info.id)) .returning(PostgresJob::as_returning()) .get_result(&mut conn) + .metrics("background-jobs.postgres.complete") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .optional() .map_err(PostgresError::Diesel)? }; @@ -492,7 +509,10 @@ impl Storage { postgres_job .insert_into(job_queue) .execute(&mut conn) + .metrics("background-jobs.postgres.insert") + .timeout(Duration::from_secs(5)) .await + .map_err(|_| PostgresError::DbTimeout)? .map_err(PostgresError::Diesel)?; }