From 1d1d2047dbfaecc907e032563026eafa4c1f6e51 Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 22 Mar 2023 21:59:37 -0500 Subject: [PATCH] Initial metrics implementation --- .gitignore | 2 + Cargo.toml | 4 + flake.lock | 43 +++++ flake.nix | 25 +++ src/lib.rs | 38 +++++ src/recorder.rs | 367 +++++++++++++++++++++++++++++++++++++++++ src/recorder/bucket.rs | 96 +++++++++++ 7 files changed, 575 insertions(+) create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 src/recorder.rs create mode 100644 src/recorder/bucket.rs diff --git a/.gitignore b/.gitignore index 22b9211..731ab4a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ **/*/target **/*.rs.bk Cargo.lock +/.envrc +/.direnv diff --git a/Cargo.toml b/Cargo.toml index 892786f..5030abf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,10 @@ completion-logging = [ ] error-logging = ["background-jobs-core/error-logging"] +[dependencies] +metrics = "0.20.1" +metrics-util = "0.14.0" + [dependencies.background-jobs-core] version = "0.14.0" path = "jobs-core" diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..55f41fb --- /dev/null +++ b/flake.lock @@ -0,0 +1,43 @@ +{ + "nodes": { + "flake-utils": { + "locked": { + "lastModified": 1678901627, + "narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1679437018, + "narHash": "sha256-vOuiDPLHSEo/7NkiWtxpHpHgoXoNmrm+wkXZ6a072Fc=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "19cf008bb18e47b6e3b4e16e32a9a4bdd4b45f7e", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + } + }, + 
"root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..83f55b2 --- /dev/null +++ b/flake.nix @@ -0,0 +1,25 @@ +{ + description = "background-jobs"; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = import nixpkgs { + inherit system; + }; + in + { + packages.default = pkgs.hello; + + devShell = with pkgs; mkShell { + nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ]; + + RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}"; + }; + }); +} diff --git a/src/lib.rs b/src/lib.rs index 20df228..4a98797 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -161,8 +161,46 @@ //! `background-jobs-core` crate, which provides the Job trait, as well as some //! other useful types for implementing a jobs processor and job store. +mod recorder; + pub use background_jobs_core::{Backoff, Job, MaxRetries}; +pub mod metrics { + //! Types for collecting stats from background-jobs + pub use metrics::SetRecorderError; + + pub use super::recorder::{JobStat, Stats, StatsHandle, StatsRecorder}; + + /// Install the stats recorder into the process + /// + /// ```rust + /// background_jobs::metrics::install().expect("Failed to install recorder"); + /// ``` + pub fn install() -> Result { + StatsRecorder::install() + } + + /// Build the stats recorder and fetch the handle. 
+ /// + /// This can be used in conjunction with `metrics_util::layers::FanoutBuilder` to add it in + /// addition to another recorder + /// + /// ```rust + /// let (jobs_recorder, handle) = background_jobs::metrics::build(); + /// + /// let recorder = metrics_util::layers::FanoutBuilder::default() + /// .add_recorder(jobs_recorder) + /// .build(); + /// + /// metrics::set_boxed_recorder(Box::new(recorder)).expect("Failed to set recorder"); + /// + /// println!("{:?}", handle.get()); + /// ``` + pub fn build() -> (StatsRecorder, StatsHandle) { + StatsRecorder::build() + } +} + pub mod dev { //! Useful types and methods for developing Storage and Processor implementations. pub use background_jobs_core::{ diff --git a/src/recorder.rs b/src/recorder.rs new file mode 100644 index 0000000..4171c61 --- /dev/null +++ b/src/recorder.rs @@ -0,0 +1,367 @@ +mod bucket; + +use self::bucket::Buckets; +use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError}; +use metrics_util::registry::{Registry, Storage}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, + time::{Duration, Instant}, +}; + +/// Handle into the stats store +pub struct StatsHandle { + storage: Arc, +} + +pub struct StatsRecorder { + registry: Registry, +} + +pub struct StatsStorage { + inner: Arc, +} + +struct StatsStorageInner { + queued: Arc, + started: Arc, + finished: Arc, + complete: Arc>, + dead: Arc>, +} + +pub struct JobStatStorage { + hour: Buckets, + day: Buckets, + month: Buckets, + total: u64, +} + +#[derive(Clone)] +pub enum Counter { + Buckets { buckets: Arc> }, + Atomic { atomic: Arc }, + Empty, +} + +#[derive(Clone)] +pub struct Gauge; + +#[derive(Clone)] +pub struct Histogram; + +/// Stats about a particular field +#[derive(Clone, Debug)] +pub struct JobStat { + /// Occurrences in the last hour + pub hour: u64, + + /// Occurrences in the last day + pub day: u64, + + /// Occurrences in the last month + pub month: u64, + + /// Total occurrences + pub 
total: u64, +} + +/// Stats about the background jobs processor +#[derive(Clone, Debug)] +pub struct Stats { + /// Jobs that have not yet executed + pub pending: u64, + + /// Jobs that are currently executing + pub running: u64, + + /// Jobs that have completed + pub complete: JobStat, + + /// Jobs that are dead + pub dead: JobStat, +} + +impl StatsHandle { + /// Get the current stats about the background jobs processing + /// + /// ```rust + /// # let (recorder, handle) = background_jobs::metrics::StatsRecorder::build(); + /// println!("{:?}", handle.get()); + /// ``` + pub fn get(&self) -> Stats { + self.storage.snapshot() + } +} + +impl StatsRecorder { + /// Install the stats recorder into the process + /// + /// ```rust + /// # use background_jobs::metrics::StatsRecorder; + /// StatsRecorder::install().expect("Failed to install recorder"); + /// ``` + pub fn install() -> Result { + let (recorder, handle) = Self::build(); + + metrics::set_boxed_recorder(Box::new(recorder))?; + + Ok(handle) + } + + /// Build the stats recorder and fetch the handle. 
+ /// + /// This can be used in conjunction with `metrics_util::layers::FanoutBuilder` to add it in + /// addition to another recorder + /// + /// ```rust + /// # use background_jobs::metrics::StatsRecorder; + /// let (jobs_recorder, handle) = StatsRecorder::build(); + /// + /// let recorder = metrics_util::layers::FanoutBuilder::default() + /// .add_recorder(jobs_recorder) + /// .build(); + /// + /// metrics::set_boxed_recorder(Box::new(recorder)).expect("Failed to set recorder"); + /// + /// println!("{:?}", handle.get()); + /// ``` + pub fn build() -> (StatsRecorder, StatsHandle) { + let storage = Arc::new(StatsStorageInner::new()); + + let registry = Registry::new(StatsStorage { + inner: Arc::clone(&storage), + }); + + (StatsRecorder { registry }, StatsHandle { storage }) + } +} + +impl JobStatStorage { + fn new() -> Self { + Self::default() + } + + fn snapshot(&self) -> JobStat { + JobStat { + hour: self.hour.sum(), + day: self.day.sum(), + month: self.month.sum(), + total: self.total, + } + } +} + +impl StatsStorageInner { + fn new() -> Self { + Self::default() + } + + fn snapshot(&self) -> Stats { + let complete = self.complete.lock().unwrap().snapshot(); + let dead = self.dead.lock().unwrap().snapshot(); + let queued = self.queued.load(Ordering::Relaxed); + + let started = self.started.load(Ordering::Relaxed); + let finished = self.finished.load(Ordering::Relaxed); + + let running = started.saturating_sub(finished); + + let pending = queued + .saturating_sub(complete.total) + .saturating_sub(dead.total) + .saturating_sub(running); + + Stats { + pending, + running, + complete, + dead, + } + } +} + +impl Recorder for StatsRecorder { + fn describe_counter( + &self, + _: metrics::KeyName, + _: Option, + _: metrics::SharedString, + ) { + } + fn describe_gauge( + &self, + _: metrics::KeyName, + _: Option, + _: metrics::SharedString, + ) { + } + fn describe_histogram( + &self, + _: metrics::KeyName, + _: Option, + _: metrics::SharedString, + ) { + } + + fn 
register_counter(&self, key: &Key) -> metrics::Counter { + self.registry + .get_or_create_counter(key, |c| c.clone().into()) + } + + fn register_gauge(&self, key: &Key) -> metrics::Gauge { + self.registry.get_or_create_gauge(key, |c| c.clone().into()) + } + + fn register_histogram(&self, key: &Key) -> metrics::Histogram { + self.registry + .get_or_create_histogram(key, |c| c.clone().into()) + } +} + +impl Storage for StatsStorage { + type Counter = Arc; + type Gauge = Arc; + type Histogram = Arc; + + fn counter(&self, key: &Key) -> Self::Counter { + match key.name() { + "background-jobs.job.created" => Arc::new(Counter::Atomic { + atomic: Arc::clone(&self.inner.queued), + }), + "background-jobs.job.started" => Arc::new(Counter::Atomic { + atomic: Arc::clone(&self.inner.started), + }), + "background-jobs.job.finished" => Arc::new(Counter::Atomic { + atomic: Arc::clone(&self.inner.finished), + }), + "background-jobs.job.completed" => Arc::new(Counter::Buckets { + buckets: Arc::clone(&self.inner.complete), + }), + "background-jobs.job.dead" => Arc::new(Counter::Buckets { + buckets: Arc::clone(&self.inner.dead), + }), + _ => Arc::new(Counter::Empty), + } + } + + fn gauge(&self, _: &Key) -> Self::Gauge { + Arc::new(Gauge) + } + + fn histogram(&self, _: &Key) -> Self::Histogram { + Arc::new(Histogram) + } +} + +impl CounterFn for Counter { + fn increment(&self, value: u64) { + match self { + Self::Buckets { ref buckets } => { + let timestamp = Instant::now(); + + let mut guard = buckets.lock().unwrap(); + guard.hour.count(value, timestamp); + guard.day.count(value, timestamp); + guard.month.count(value, timestamp); + guard.total = guard.total.saturating_add(value); + } + Self::Atomic { ref atomic } => { + let mut current = atomic.load(Ordering::Acquire); + loop { + match atomic.compare_exchange_weak( + current, + current + value, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(x) => current = x, + } + } + } + Self::Empty => {} + } + } + + fn 
absolute(&self, value: u64) { + match self { + Self::Buckets { ref buckets } => { + let mut guard = buckets.lock().unwrap(); + if guard.total < value { + guard.total = value; + } + } + Self::Atomic { ref atomic } => { + let mut current = atomic.load(Ordering::Acquire); + loop { + match atomic.compare_exchange_weak( + current, + value, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(x) => current = x, + } + } + } + Self::Empty => {} + } + } +} + +impl GaugeFn for Gauge { + fn increment(&self, _: f64) {} + + fn decrement(&self, _: f64) {} + + fn set(&self, _: f64) {} +} + +impl HistogramFn for Histogram { + fn record(&self, _: f64) {} +} + +const SECONDS: u64 = 1; +const MINUTES: u64 = 60 * SECONDS; +const HOURS: u64 = 60 * MINUTES; +const DAYS: u64 = 24 * HOURS; +const MONTHS: u64 = 30 * DAYS; + +impl Default for JobStatStorage { + fn default() -> Self { + JobStatStorage { + hour: Buckets::new( + Duration::from_secs(1 * HOURS), + Duration::from_secs(3 * MINUTES), + 20, + ), + day: Buckets::new( + Duration::from_secs(1 * DAYS), + Duration::from_secs(1 * HOURS), + 24, + ), + month: Buckets::new( + Duration::from_secs(1 * MONTHS), + Duration::from_secs(1 * DAYS), + 30, + ), + total: 0, + } + } +} + +impl Default for StatsStorageInner { + fn default() -> Self { + Self { + queued: Arc::new(AtomicU64::new(0)), + started: Arc::new(AtomicU64::new(0)), + finished: Arc::new(AtomicU64::new(0)), + complete: Arc::new(Mutex::new(JobStatStorage::new())), + dead: Arc::new(Mutex::new(JobStatStorage::new())), + } + } +} diff --git a/src/recorder/bucket.rs b/src/recorder/bucket.rs new file mode 100644 index 0000000..e5ef714 --- /dev/null +++ b/src/recorder/bucket.rs @@ -0,0 +1,96 @@ +use std::{ + collections::VecDeque, + time::{Duration, Instant}, +}; + +pub struct Bucket { + timestamp: Instant, + count: u64, +} + +pub(crate) struct Buckets { + oldest: Duration, + span: Duration, + max: usize, + buckets: VecDeque, +} + +impl Buckets { + pub(super) fn new(oldest: 
Duration, span: Duration, max: usize) -> Self { + Self { + oldest, + span, + max, + buckets: VecDeque::new(), + } + } + + pub(super) fn sum(&self) -> u64 { + self.buckets.iter().fold(0, |acc, item| acc + item.count) + } + + pub(super) fn count(&mut self, value: u64, timestamp: Instant) { + while let Some(bucket) = self.buckets.front() { + if bucket.timestamp + self.oldest < timestamp { + self.buckets.pop_front(); + continue; + } + + break; + } + + if let Some(bucket) = self.bucket_mut(timestamp) { + bucket.count += value; + return; + } + + self.insert(value, timestamp); + } + + fn bucket_mut(&mut self, timestamp: Instant) -> Option<&mut Bucket> { + self.buckets.iter_mut().find(|bucket| { + if let Some(upper) = bucket.timestamp.checked_add(self.span) { + bucket.timestamp < timestamp && timestamp <= upper + } else { + false + } + }) + } + + fn insert(&mut self, value: u64, timestamp: Instant) { + if self.buckets.len() == self.max { + self.buckets.pop_front(); + } + + let found = self + .buckets + .iter() + .enumerate() + .find(|(_, bucket)| timestamp < bucket.timestamp); + + if let Some((index, bucket)) = found { + let mut timestamp_index = bucket.timestamp; + + while let Some(lower) = timestamp_index.checked_sub(self.span) { + if lower < timestamp { + self.buckets.insert( + index, + Bucket { + timestamp: lower, + count: value, + }, + ); + + return; + } + + timestamp_index = lower; + } + } else { + self.buckets.push_back(Bucket { + timestamp, + count: value, + }); + } + } +}