diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index f95aa00..4f6fa4f 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -2,6 +2,7 @@ use crate::{ storage::{ActixStorage, StorageWrapper}, worker::Worker, }; +use actix::clock::{interval_at, Duration, Instant}; use anyhow::Error; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; use log::{error, trace}; @@ -32,10 +33,42 @@ impl Server { where S: Storage + Sync + 'static, { - Server { + let server = Server { storage: Arc::new(StorageWrapper(storage)), cache: ServerCache::new(), + }; + + let server2 = server.clone(); + actix::spawn(async move { + let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); + + loop { + interval.tick().await; + if let Err(e) = server.check_db().await { + error!("Error while checking database for new jobs, {}", e); + } + } + }); + + server2 + } + + async fn check_db(&self) -> Result<(), Error> { + for queue in self.cache.keys().await { + 'worker_loop: while let Some(worker) = self.cache.pop(queue.clone()).await { + if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { + if let Err(job) = worker.process_job(job).await { + error!("Worker has hung up"); + self.storage.return_job(job.unexecuted()).await? + } + } else { + self.cache.push(queue.clone(), worker).await; + break 'worker_loop; + } + } } + + Ok(()) } pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> { @@ -100,6 +133,12 @@ impl ServerCache { } } + async fn keys(&self) -> Vec { + let cache = self.cache.lock().await; + + cache.keys().cloned().collect() + } + async fn push(&self, queue: String, worker: Box) { let mut cache = self.cache.lock().await;