From 77cdeab57e91faae5540864aa648863064ce8731 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 22 Jul 2023 21:11:28 -0500 Subject: [PATCH] Improve metrics, add job metrics --- src/backgrounded.rs | 8 +++---- src/generate.rs | 6 +---- src/ingest.rs | 8 +++---- src/process.rs | 8 ++----- src/queue.rs | 41 +++++++++++++++++++++++++++++++- src/repo/sled.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 108 insertions(+), 20 deletions(-) diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 69c79721..7a8b9ecb 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -74,9 +74,11 @@ where S: Store, { fn drop(&mut self) { - if self.identifier.is_some() || self.upload_id.is_some() { - metrics::increment_counter!("pict-rs.background.upload.failure"); + let any_items = self.identifier.is_some() || self.upload_id.is_some(); + metrics::increment_counter!("pict-rs.background.upload", "completed" => (!any_items).to_string()); + + if any_items { let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped backgrounded cleanup"); cleanup_parent_span.follows_from(Span::current()); @@ -110,8 +112,6 @@ where ) }); } - } else { - metrics::increment_counter!("pict-rs.background.upload.success"); } } } diff --git a/src/generate.rs b/src/generate.rs index 3f12855f..8c0ad00b 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -34,11 +34,7 @@ impl MetricsGuard { impl Drop for MetricsGuard { fn drop(&mut self) { metrics::histogram!("pict-rs.generate.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string()); - if self.armed { - metrics::increment_counter!("pict-rs.generate.failure"); - } else { - metrics::increment_counter!("pict-rs.generate.success"); - } + metrics::increment_counter!("pict-rs.generate.end", "completed" => (!self.armed).to_string()); } } diff --git a/src/ingest.rs b/src/ingest.rs index 037a147c..7c77ef43 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -224,9 +224,11 @@ where S: Store, { fn drop(&mut self) { - if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() { - metrics::increment_counter!("pict-rs.ingest.failure"); + let any_items = self.hash.is_some() || self.alias.is_some() || self.identifier.is_some(); + metrics::increment_counter!("pict-rs.ingest.end", "completed" => (!any_items).to_string()); + + if self.hash.is_some() || self.alias.is_some() | self.identifier.is_some() { let cleanup_parent_span = tracing::info_span!(parent: None, "Dropped session cleanup"); cleanup_parent_span.follows_from(Span::current()); @@ -281,8 +283,6 @@ where ) }); } - } else { - metrics::increment_counter!("pict-rs.ingest.success"); } } } diff --git a/src/process.rs b/src/process.rs index 63ca8f46..abd7fc35 100644 --- a/src/process.rs +++ b/src/process.rs @@ -22,7 +22,7 @@ struct MetricsGuard { impl MetricsGuard { fn guard(command: String) -> Self { - metrics::increment_counter!("pict-rs.process.spawn", "command" => command.clone()); + metrics::increment_counter!("pict-rs.process.start", "command" => command.clone()); Self { start: Instant::now(), @@ -45,11 +45,7 @@ impl Drop for MetricsGuard { "completed" => (!self.armed).to_string(), ); - if self.armed { - metrics::increment_counter!("pict-rs.process.failure", "command" => self.command.clone()); - } else { - metrics::increment_counter!("pict-rs.process.success", "command" => self.command.clone()); - } + metrics::increment_counter!("pict-rs.process.end", "completed" => (!self.armed).to_string() , "command" => self.command.clone()); } } diff --git a/src/queue.rs b/src/queue.rs index b0e5633e..d36c758a 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -10,7 +10,7 @@ use crate::{ store::{Identifier, Store}, }; use base64::{prelude::BASE64_STANDARD, Engine}; -use std::{future::Future, path::PathBuf, pin::Pin}; +use std::{future::Future, path::PathBuf, pin::Pin, time::Instant}; use tracing::Instrument; mod cleanup; @@ -235,6 +235,37 @@ async fn process_jobs( } } +struct MetricsGuard { + worker_id: String, + queue: &'static str, + start: Instant, + armed: bool, +} + +impl MetricsGuard { + fn guard(worker_id: String, queue: &'static str) -> Self { + metrics::increment_counter!("pict-rs.job.start", "queue" => queue, "worker-id" => worker_id.clone()); + + Self { + worker_id, + queue, + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for MetricsGuard { + fn drop(&mut self) { + metrics::histogram!("pict-rs.job.duration", self.start.elapsed().as_secs_f64(), "queue" => self.queue, "worker-id" => self.worker_id.clone(), "completed" => (!self.armed).to_string()); + metrics::increment_counter!("pict-rs.job.end", "queue" => self.queue, "worker-id" => self.worker_id.clone(), "completed" => (!self.armed).to_string()); + } +} + async fn job_loop( repo: &R, store: &S, @@ -255,9 +286,13 @@ where let span = tracing::info_span!("Running Job", worker_id = ?worker_id); + let guard = MetricsGuard::guard(worker_id.clone(), queue); + span.in_scope(|| (callback)(repo, store, config, bytes.as_ref())) .instrument(span) .await?; + + guard.disarm(); } } @@ -331,8 +366,12 @@ where let span = tracing::info_span!("Running Job", worker_id = ?worker_id); + let guard = MetricsGuard::guard(worker_id.clone(), queue); + span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref())) .instrument(span) .await?; + + guard.disarm(); } } diff --git a/src/repo/sled.rs b/src/repo/sled.rs index 302f3236..0d0f28a1 100644 --- a/src/repo/sled.rs +++ b/src/repo/sled.rs @@ -19,6 +19,7 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, }, + time::Instant, }; use tokio::{sync::Notify, task::JoinHandle}; @@ -468,6 +469,54 @@ impl From for UploadResult { } } +struct PushMetricsGuard { + queue: &'static str, + armed: bool, +} + +struct PopMetricsGuard { + queue: &'static str, + start: Instant, + armed: bool, +} + +impl PushMetricsGuard { + fn guard(queue: &'static str) -> Self { + Self { queue, armed: true } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl PopMetricsGuard { + fn guard(queue: &'static str) -> Self { + Self { + queue, + start: Instant::now(), + armed: true, + } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +impl Drop for PushMetricsGuard { + fn drop(&mut self) { + metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue); + } +} + +impl Drop for PopMetricsGuard { + fn drop(&mut self) { + metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue); + metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue); + } +} + #[async_trait::async_trait(?Send)] impl UploadRepo for SledRepo { #[tracing::instrument(level = "trace", skip(self))] @@ -577,6 +626,8 @@ impl QueueRepo for SledRepo { #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))] async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> { + let metrics_guard = PushMetricsGuard::guard(queue_name); + let id = self.db.generate_id().map_err(SledError::from)?; let mut key = queue_name.as_bytes().to_vec(); key.extend(id.to_be_bytes()); @@ -585,6 +636,7 @@ impl QueueRepo for SledRepo { if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) { notifier.notify_one(); + metrics_guard.disarm(); return Ok(()); } @@ -595,6 +647,8 @@ impl QueueRepo for SledRepo { .or_insert_with(|| Arc::new(Notify::new())) .notify_one(); + metrics_guard.disarm(); + Ok(()) } @@ -604,6 +658,8 @@ impl QueueRepo for SledRepo { queue_name: &'static str, worker_id: Vec, ) -> Result { + let metrics_guard = PopMetricsGuard::guard(queue_name); + loop { let in_progress_queue = self.in_progress_queue.clone(); @@ -633,6 +689,7 @@ impl QueueRepo for SledRepo { }); if let Some(job) = job { + metrics_guard.disarm(); return Ok(job); }