Lessen log levels, return unexecuted jobs

This commit is contained in:
asonix 2020-03-21 14:10:29 -05:00
parent aba91d70b9
commit 007d53b3c5
3 changed files with 16 additions and 10 deletions

View file

@ -5,7 +5,7 @@ use crate::{
use actix::clock::{interval_at, Duration, Instant}; use actix::clock::{interval_at, Duration, Instant};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use log::{debug, error}; use log::{error, trace};
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
sync::Arc, sync::Arc,
@ -54,14 +54,14 @@ impl Server {
} }
async fn check_db(&self) -> Result<(), Error> { async fn check_db(&self) -> Result<(), Error> {
debug!("Checking db for ready jobs"); trace!("Checking db for ready jobs");
for queue in self.cache.keys().await { for queue in self.cache.keys().await {
'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await { 'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await {
if !self.try_turning(queue.clone(), worker).await? { if !self.try_turning(queue.clone(), worker).await? {
break 'worker_loop; break 'worker_loop;
} }
} }
debug!("Finished job lookups for queue {}", queue); trace!("Finished job lookups for queue {}", queue);
} }
Ok(()) Ok(())
@ -73,7 +73,7 @@ impl Server {
self.storage.new_job(job).await?; self.storage.new_job(job).await?;
if !ready { if !ready {
debug!("New job is not ready for processing yet, returning"); trace!("New job is not ready for processing yet, returning");
return Ok(()); return Ok(());
} }
@ -88,7 +88,7 @@ impl Server {
&self, &self,
worker: Box<dyn Worker + Send + 'static>, worker: Box<dyn Worker + Send + 'static>,
) -> Result<(), Error> { ) -> Result<(), Error> {
debug!("Worker {} requested job", worker.id()); trace!("Worker {} requested job", worker.id());
self.try_turning(worker.queue().to_owned(), worker).await?; self.try_turning(worker.queue().to_owned(), worker).await?;
@ -100,14 +100,14 @@ impl Server {
queue: String, queue: String,
worker: Box<dyn Worker + Send + 'static>, worker: Box<dyn Worker + Send + 'static>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
debug!("Trying to find job for worker {}", worker.id()); trace!("Trying to find job for worker {}", worker.id());
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await {
if let Err(job) = worker.process_job(job).await { if let Err(job) = worker.process_job(job).await {
error!("Worker has hung up"); error!("Worker has hung up");
self.storage.return_job(job.unexecuted()).await? self.storage.return_job(job.unexecuted()).await?
} }
} else { } else {
debug!("No job exists, returning worker {}", worker.id()); trace!("No job exists, returning worker {}", worker.id());
self.cache.push(queue.clone(), worker).await; self.cache.push(queue.clone(), worker).await;
return Ok(false); return Ok(false);
} }

View file

@ -86,6 +86,11 @@ impl JobResult {
pub fn is_missing_processor(&self) -> bool { pub fn is_missing_processor(&self) -> bool {
*self == JobResult::MissingProcessor *self == JobResult::MissingProcessor
} }
/// Check if the job was returned without an execution attempt
pub fn is_unexecuted(&self) -> bool {
*self == JobResult::Unexecuted
}
} }
#[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]

View file

@ -1,5 +1,5 @@
use chrono::offset::Utc; use chrono::offset::Utc;
use log::error; use log::info;
use std::error::Error; use std::error::Error;
use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats};
@ -85,7 +85,7 @@ pub trait Storage: Clone + Send {
Ok(Some(job)) Ok(Some(job))
} else { } else {
error!( info!(
"Not fetching job {}, it is not ready for processing", "Not fetching job {}, it is not ready for processing",
job.id() job.id()
); );
@ -109,13 +109,14 @@ pub trait Storage: Clone + Send {
self.save_job(job).await?; self.save_job(job).await?;
self.update_stats(Stats::retry_job).await self.update_stats(Stats::retry_job).await
} else { } else {
info!("Job {} failed permanently", id);
self.delete_job(id).await?; self.delete_job(id).await?;
self.update_stats(Stats::fail_job).await self.update_stats(Stats::fail_job).await
} }
} else { } else {
Ok(()) Ok(())
} }
} else if result.is_missing_processor() { } else if result.is_missing_processor() || result.is_unexecuted() {
if let Some(mut job) = self.fetch_job(id).await? { if let Some(mut job) = self.fetch_job(id).await? {
job.pending(); job.pending();
self.queue_job(job.queue(), id).await?; self.queue_job(job.queue(), id).await?;