diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index bc9633b..3e16dd0 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -23,6 +23,7 @@ tokio = { version = "1", default-features = false, features = [ "macros", "rt", "sync", + "time", "tracing", ] } uuid = { version = "1", features = ["v7", "serde"] } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index bd2d70a..4c1613c 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -99,11 +99,11 @@ //! // Jobs can optionally override this value //! const BACKOFF: Backoff = Backoff::Exponential(2); //! -//! // When should the job be considered dead +//! // This is important for allowing the job server to reap processes that were started but never +//! // completed. //! // -//! // The timeout defines when a job is allowed to be considered dead, and so can be retried -//! // by the job processor. The value is in milliseconds and defaults to 15,000 -//! const TIMEOUT: i64 = 15_000; +//! // Defaults to 5 seconds +//! const HEARTBEAT_INTERVAL: u64 = 5_000; //! //! fn run(self, state: MyState) -> Self::Future { //! println!("{}: args, {:?}", state.app_name, self); @@ -197,7 +197,7 @@ impl Manager { notified.await; - metrics::counter!("background-jobs.worker-arbiter.restart", "number" => i.to_string()).increment(1); + metrics::counter!("background-jobs.actix.worker-arbiter.restart", "number" => i.to_string()).increment(1); tracing::warn!("Recovering from dead worker arbiter"); drop(worker_arbiter); diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index b1738ac..2f089df 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -16,7 +16,7 @@ struct LocalWorkerStarter { impl Drop for LocalWorkerStarter { fn drop(&mut self) { - metrics::counter!("background-jobs.worker.finished", "queue" => self.queue.clone()) + metrics::counter!("background-jobs.actix.worker.finished", "queue" => self.queue.clone()) .increment(1); let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {})); @@ -141,7 +141,8 @@ pub(crate) async fn local_worker( State: Clone + 'static, Extras: 'static, { - metrics::counter!("background-jobs.worker.started", "queue" => queue.clone()).increment(1); + metrics::counter!("background-jobs.actix.worker.started", "queue" => queue.clone()) + .increment(1); let starter = LocalWorkerStarter { queue: queue.clone(), @@ -166,7 +167,7 @@ pub(crate) async fn local_worker( { Ok(job) => job, Err(e) => { - metrics::counter!("background-jobs.worker.failed-request").increment(1); + metrics::counter!("background-jobs.actix.worker.failed-request").increment(1); let display_val = format!("{}", e); let debug = format!("{:?}", e); @@ -201,7 +202,7 @@ pub(crate) async fn local_worker( .instrument(return_span.clone()) .await { - metrics::counter!("background-jobs.worker.failed-return").increment(1); + metrics::counter!("background-jobs.actix.worker.failed-return").increment(1); let display_val = format!("{}", e); let debug = format!("{:?}", e);