From 3ab2bef826ef16e4e1daa90b880f6af69d67dbb1 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 8 Jan 2024 16:29:29 -0600 Subject: [PATCH] jobs-metrics: Add Storage type to provide metrics for any storage backend --- jobs-metrics/Cargo.toml | 4 ++ jobs-metrics/src/lib.rs | 100 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/jobs-metrics/Cargo.toml b/jobs-metrics/Cargo.toml index 973f27e..4533375 100644 --- a/jobs-metrics/Cargo.toml +++ b/jobs-metrics/Cargo.toml @@ -12,5 +12,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.24" +background-jobs-core = { version = "0.16.0", path = "../jobs-core" } metrics = "0.22.0" metrics-util = "0.16.0" +tracing = "0.1" +uuid = { version = "1.6", features = ["serde", "v7"] } diff --git a/jobs-metrics/src/lib.rs b/jobs-metrics/src/lib.rs index a9e3e25..4761ceb 100644 --- a/jobs-metrics/src/lib.rs +++ b/jobs-metrics/src/lib.rs @@ -53,3 +53,103 @@ pub fn install() -> Result> { pub fn build() -> (StatsRecorder, StatsHandle) { StatsRecorder::build() } + +#[derive(Clone, Debug)] +/// A wrapper for any Storage type adding metrics +pub struct MetricsStorage(S); + +impl MetricsStorage { + /// Add metrics to a provided Storage + pub const fn wrap(storage: S) -> MetricsStorage + where + S: background_jobs_core::Storage, + { + Self(storage) + } +} + +#[async_trait::async_trait] +impl background_jobs_core::Storage for MetricsStorage +where + S: background_jobs_core::Storage + Sync, +{ + type Error = S::Error; + + async fn info( + &self, + job_id: uuid::Uuid, + ) -> Result, Self::Error> { + self.0.info(job_id).await + } + + async fn push(&self, job: background_jobs_core::NewJobInfo) -> Result { + let queue = job.queue().to_string(); + let name = job.name().to_string(); + + let uuid = self.0.push(job).await?; + + metrics::counter!("background-jobs.job.created", "queue" => queue, "name" => name) + .increment(1); + + Ok(uuid) + } + + async fn pop( + &self, + queue: &str, + runner_id: uuid::Uuid, + ) -> Result { + let job_info = self.0.pop(queue, runner_id).await?; + + metrics::counter!("background-jobs.job.started", "queue" => job_info.queue.clone(), "name" => job_info.name.clone()).increment(1); + + Ok(job_info) + } + + async fn heartbeat( + &self, + job_id: uuid::Uuid, + runner_id: uuid::Uuid, + ) -> Result<(), Self::Error> { + self.0.heartbeat(job_id, runner_id).await + } + + async fn complete( + &self, + return_job_info: background_jobs_core::ReturnJobInfo, + ) -> Result { + let info = if let Some(info) = self.0.info(return_job_info.id).await? { + Some(info) + } else { + tracing::warn!("Returned non-existant job"); + metrics::counter!("background-jobs.job.missing").increment(1); + None + }; + + let result = return_job_info.result; + + let completed = self.0.complete(return_job_info).await?; + + if let Some(info) = info { + metrics::counter!("background-jobs.job.finished", "queue" => info.queue.clone(), "name" => info.name.clone()).increment(1); + + match result { + background_jobs_core::JobResult::Success => { + metrics::counter!("background-jobs.job.completed", "queue" => info.queue, "name" => info.name).increment(1); + } + background_jobs_core::JobResult::Failure if completed => { + metrics::counter!("background-jobs.job.dead", "queue" => info.queue, "name" => info.name).increment(1); + } + background_jobs_core::JobResult::Failure => { + metrics::counter!("background-jobs.job.failed", "queue" => info.queue, "name" => info.name).increment(1); + } + background_jobs_core::JobResult::Unexecuted + | background_jobs_core::JobResult::Unregistered => { + metrics::counter!("background-jobs.job.returned", "queue" => info.queue, "name" => info.name).increment(1); + } + } + } + + Ok(completed) + } +}