From 007d53b3c586e16ade4bf4b36c30dacc27767ba2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 21 Mar 2020 14:10:29 -0500 Subject: [PATCH] Lessen log levels, return unexecuted jobs --- jobs-actix/src/server.rs | 14 +++++++------- jobs-core/src/lib.rs | 5 +++++ jobs-core/src/storage.rs | 7 ++++--- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index eee99eb..ed4925d 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -5,7 +5,7 @@ use crate::{ use actix::clock::{interval_at, Duration, Instant}; use anyhow::Error; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; -use log::{debug, error}; +use log::{error, trace}; use std::{ collections::{HashMap, VecDeque}, sync::Arc, @@ -54,14 +54,14 @@ impl Server { } async fn check_db(&self) -> Result<(), Error> { - debug!("Checking db for ready jobs"); + trace!("Checking db for ready jobs"); for queue in self.cache.keys().await { 'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await { if !self.try_turning(queue.clone(), worker).await? { break 'worker_loop; } } - debug!("Finished job lookups for queue {}", queue); + trace!("Finished job lookups for queue {}", queue); } Ok(()) @@ -73,7 +73,7 @@ impl Server { self.storage.new_job(job).await?; if !ready { - debug!("New job is not ready for processing yet, returning"); + trace!("New job is not ready for processing yet, returning"); return Ok(()); } @@ -88,7 +88,7 @@ impl Server { &self, worker: Box, ) -> Result<(), Error> { - debug!("Worker {} requested job", worker.id()); + trace!("Worker {} requested job", worker.id()); self.try_turning(worker.queue().to_owned(), worker).await?; @@ -100,14 +100,14 @@ impl Server { queue: String, worker: Box, ) -> Result { - debug!("Trying to find job for worker {}", worker.id()); + trace!("Trying to find job for worker {}", worker.id()); if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { if let Err(job) = worker.process_job(job).await { error!("Worker has hung up"); self.storage.return_job(job.unexecuted()).await? } } else { - debug!("No job exists, returning worker {}", worker.id()); + trace!("No job exists, returning worker {}", worker.id()); self.cache.push(queue.clone(), worker).await; return Ok(false); } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 85807f9..67fe03d 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -86,6 +86,11 @@ impl JobResult { pub fn is_missing_processor(&self) -> bool { *self == JobResult::MissingProcessor } + + /// Check if the job was returned without an execution attempt + pub fn is_unexecuted(&self) -> bool { + *self == JobResult::Unexecuted + } } #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index d2f0804..383e036 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,5 +1,5 @@ use chrono::offset::Utc; -use log::error; +use log::info; use std::error::Error; use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; @@ -85,7 +85,7 @@ pub trait Storage: Clone + Send { Ok(Some(job)) } else { - error!( + info!( "Not fetching job {}, it is not ready for processing", job.id() ); @@ -109,13 +109,14 @@ pub trait Storage: Clone + Send { self.save_job(job).await?; self.update_stats(Stats::retry_job).await } else { + info!("Job {} failed permanently", id); self.delete_job(id).await?; self.update_stats(Stats::fail_job).await } } else { Ok(()) } - } else if result.is_missing_processor() { + } else if result.is_missing_processor() || result.is_unexecuted() { if let Some(mut job) = self.fetch_job(id).await? { job.pending(); self.queue_job(job.queue(), id).await?;