diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 53c3795..fa3dc0a 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -12,7 +12,6 @@ edition = "2021" [dependencies] actix-rt = "2.5.1" anyhow = "1.0" -async-mutex = "1.0.1" async-trait = "0.1.24" background-jobs-core = { version = "0.14.0", path = "../jobs-core", features = [ "with-actix", diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 84576c4..5410fef 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,7 +1,6 @@ use crate::{Job, QueueHandle}; use actix_rt::time::{interval_at, Instant}; use std::time::Duration; -use tracing::error; /// A type used to schedule recurring jobs. /// @@ -20,7 +19,7 @@ where let job = job.clone(); if spawner.queue::(job).await.is_err() { - error!("Failed to queue job: {}", J::NAME); + tracing::error!("Failed to queue job: {}", J::NAME); } } } diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index c268387..8f03d72 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -2,7 +2,6 @@ use crate::storage::{ActixStorage, StorageWrapper}; use anyhow::Error; use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage}; use std::sync::Arc; -use tracing::trace; use uuid::Uuid; /// The server Actor @@ -34,7 +33,7 @@ impl Server { worker_id: Uuid, worker_queue: &str, ) -> Result { - trace!("Worker {} requested job", worker_id); + tracing::trace!("Worker {} requested job", worker_id); self.storage.request_job(worker_queue, worker_id).await } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 9581d09..b86e82a 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -34,16 +34,16 @@ impl Drop for LocalWorkerStarter(F) +struct RunOnDrop(F) where - F: Fn() -> Span; + F: Fn(); -impl Drop for LogOnDrop +impl Drop for RunOnDrop where - F: Fn() -> Span, + F: Fn(), { fn drop(&mut self) { - (self.0)().in_scope(|| tracing::info!("Worker closing")); + (self.0)(); } } @@ -94,7 +94,9 @@ pub(crate) async fn local_worker( let id = Uuid::new_v4(); - let log_on_drop = LogOnDrop(|| make_span(id, &queue, "closing")); + let log_on_drop = RunOnDrop(|| { + make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing")); + }); loop { let request_span = make_span(id, &queue, "request"); @@ -119,9 +121,11 @@ pub(crate) async fn local_worker( }; drop(request_span); + let process_span = make_span(id, &queue, "process"); let job_id = job.id(); - let return_job = time_job(Box::pin(processors.process(job)), job_id) - .instrument(make_span(id, &queue, "process")) + let return_job = process_span + .in_scope(|| time_job(Box::pin(processors.process(job)), job_id)) + .instrument(process_span) .await; let return_span = make_span(id, &queue, "return"); @@ -147,6 +151,7 @@ pub(crate) async fn local_worker( fn make_span(id: Uuid, queue: &str, operation: &str) -> Span { tracing::info_span!( + parent: None, "Worker", worker.id = tracing::field::display(id), worker.queue = tracing::field::display(queue),