diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 9990bbd..c268387 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -1,11 +1,9 @@ -use crate::{ - storage::{ActixStorage, StorageWrapper}, - worker::Worker, -}; +use crate::storage::{ActixStorage, StorageWrapper}; use anyhow::Error; -use background_jobs_core::{NewJobInfo, ReturnJobInfo, Storage}; +use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage}; use std::sync::Arc; -use tracing::{error, trace}; +use tracing::trace; +use uuid::Uuid; /// The server Actor /// @@ -28,33 +26,16 @@ impl Server { } pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> { - let ready = job.is_ready(); - self.storage.new_job(job).await?; - - if !ready { - trace!("New job is not ready for processing yet, returning"); - return Ok(()); - } - - Ok(()) + self.storage.new_job(job).await.map(|_| ()) } pub(crate) async fn request_job( &self, - worker: Box, - ) -> Result<(), Error> { - trace!("Worker {} requested job", worker.id()); - let job = self - .storage - .request_job(worker.queue(), worker.id()) - .await?; - - if let Err(job) = worker.process(job).await { - error!("Worker {} has hung up", worker.id()); - self.storage.return_job(job.unexecuted()).await?; - } - - Ok(()) + worker_id: Uuid, + worker_queue: &str, + ) -> Result { + trace!("Worker {} requested job", worker_id); + self.storage.request_job(worker_queue, worker_id).await } pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index c307a6b..9581d09 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,63 +1,10 @@ -use std::future::Future; - use crate::Server; -use background_jobs_core::{CachedProcessorMap, JobInfo}; -use tokio::sync::mpsc::{channel, Sender}; +use background_jobs_core::CachedProcessorMap; +use std::future::Future; use tracing::Span; use tracing_futures::Instrument; use uuid::Uuid; -#[async_trait::async_trait] -pub trait Worker { - async fn process(&self, job: JobInfo) -> Result<(), JobInfo>; - - fn id(&self) -> Uuid; - - fn queue(&self) -> &str; -} - -#[derive(Clone)] -pub(crate) struct LocalWorkerHandle { - tx: Sender, - id: Uuid, - queue: String, -} - -impl LocalWorkerHandle { - fn span(&self, operation: &str) -> Span { - tracing::info_span!( - "Worker", - worker.id = tracing::field::display(&self.id), - worker.queue = tracing::field::display(&self.queue), - worker.operation.id = tracing::field::display(&Uuid::new_v4()), - worker.operation.name = tracing::field::display(operation), - exception.message = tracing::field::Empty, - exception.details = tracing::field::Empty, - ) - } -} - -#[async_trait::async_trait] -impl Worker for LocalWorkerHandle { - async fn process(&self, job: JobInfo) -> Result<(), JobInfo> { - match self.tx.clone().send(job).await { - Err(e) => { - tracing::error!("Unable to send job"); - Err(e.0) - } - _ => Ok(()), - } - } - - fn id(&self) -> Uuid { - self.id - } - - fn queue(&self) -> &str { - &self.queue - } -} - struct LocalWorkerStarter { queue: String, processors: CachedProcessorMap, @@ -146,53 +93,66 @@ pub(crate) async fn local_worker( }; let id = Uuid::new_v4(); - let (tx, mut rx) = channel(16); - let handle = LocalWorkerHandle { tx, id, queue }; - - let log_on_drop = LogOnDrop(|| handle.span("closing")); + let log_on_drop = LogOnDrop(|| make_span(id, &queue, "closing")); loop { - let span = handle.span("request"); - if let Err(e) = server - .request_job(Box::new(handle.clone())) - .instrument(span.clone()) + let request_span = make_span(id, &queue, "request"); + + let job = match request_span + .in_scope(|| server.request_job(id, &queue)) + .instrument(request_span.clone()) .await { - metrics::counter!("background-jobs.worker.failed-request", 1); + Ok(job) => job, + Err(e) => { + metrics::counter!("background-jobs.worker.failed-request", 1); - let display = format!("{}", e); - let debug = format!("{:?}", e); - span.record("exception.message", &tracing::field::display(&display)); - span.record("exception.details", &tracing::field::display(&debug)); - span.in_scope(|| tracing::error!("Failed to notify server of ready worker, {}", e)); - break; - } - drop(span); - - if let Some(job) = rx.recv().await { - let job_id = job.id(); - let return_job = time_job(Box::pin(processors.process(job)), job_id) - .instrument(handle.span("process")) - .await; - - let span = handle.span("return"); - if let Err(e) = server.return_job(return_job).instrument(span.clone()).await { - metrics::counter!("background-jobs.worker.failed-return", 1); - - let display = format!("{}", e); + let display_val = format!("{}", e); let debug = format!("{:?}", e); - span.record("exception.message", &tracing::field::display(&display)); - span.record("exception.details", &tracing::field::display(&debug)); - span.in_scope(|| tracing::warn!("Failed to return completed job, {}", e)); + request_span.record("exception.message", &tracing::field::display(&display_val)); + request_span.record("exception.details", &tracing::field::display(&debug)); + request_span + .in_scope(|| tracing::error!("Failed to notify server of ready worker")); + break; } + }; + drop(request_span); - continue; + let job_id = job.id(); + let return_job = time_job(Box::pin(processors.process(job)), job_id) + .instrument(make_span(id, &queue, "process")) + .await; + + let return_span = make_span(id, &queue, "return"); + if let Err(e) = return_span + .in_scope(|| server.return_job(return_job)) + .instrument(return_span.clone()) + .await + { + metrics::counter!("background-jobs.worker.failed-return", 1); + + let display_val = format!("{}", e); + let debug = format!("{:?}", e); + return_span.record("exception.message", &tracing::field::display(&display_val)); + return_span.record("exception.details", &tracing::field::display(&debug)); + return_span.in_scope(|| tracing::warn!("Failed to return completed job")); } - - break; + drop(return_span); } drop(log_on_drop); drop(starter); } + +fn make_span(id: Uuid, queue: &str, operation: &str) -> Span { + tracing::info_span!( + "Worker", + worker.id = tracing::field::display(id), + worker.queue = tracing::field::display(queue), + worker.operation.id = tracing::field::display(&Uuid::new_v4()), + worker.operation.name = tracing::field::display(operation), + exception.message = tracing::field::Empty, + exception.details = tracing::field::Empty, + ) +}