From 6e79341b38288aea976edf4ea47f8ffa5f2591c4 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 13 Dec 2018 11:08:28 -0600 Subject: [PATCH] Add scheduled jobs, fix spawning non-default jobs --- examples/server-jobs-example/src/bin/spawner.rs | 2 +- examples/server-jobs-example/src/bin/sync_spawner.rs | 2 +- jobs-core/Cargo.toml | 2 +- jobs-core/src/job_info.rs | 4 ++++ jobs-core/src/processor.rs | 9 +++++++++ jobs-server/Cargo.toml | 2 +- jobs-server/src/spawner.rs | 10 ++++++---- 7 files changed, 23 insertions(+), 8 deletions(-) diff --git a/examples/server-jobs-example/src/bin/spawner.rs b/examples/server-jobs-example/src/bin/spawner.rs index 24dbb90..b4126c9 100644 --- a/examples/server-jobs-example/src/bin/spawner.rs +++ b/examples/server-jobs-example/src/bin/spawner.rs @@ -35,7 +35,7 @@ fn main() { tokio::run(lazy(move || { for job in jobs { - tokio::spawn(spawner.queue::(job).map_err(|_| ())); + tokio::spawn(spawner.queue::(job).map_err(|_| ())); } Ok(()) diff --git a/examples/server-jobs-example/src/bin/sync_spawner.rs b/examples/server-jobs-example/src/bin/sync_spawner.rs index 1860735..b5682cb 100644 --- a/examples/server-jobs-example/src/bin/sync_spawner.rs +++ b/examples/server-jobs-example/src/bin/sync_spawner.rs @@ -34,7 +34,7 @@ fn main() -> Result<(), Error> { let spawner = SpawnerConfig::new("localhost", 5555); for job in jobs { - spawner.queue_sync::(job)?; + spawner.queue_sync::(job)?; } Ok(()) diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 36c8ed4..138d115 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor on tokio" -version = "0.3.1" +version = "0.3.2" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 82519eb..a2567dd 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -128,6 +128,10 @@ impl JobInfo { self.next_queue = Some(next_queue); } + pub(crate) fn schedule(&mut self, time: DateTime) { + self.next_queue = Some(time); + } + pub(crate) fn is_stale(&self) -> bool { self.updated_at < Utc::now() - OldDuration::days(1) } diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 6c52a36..3bfa831 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -17,6 +17,7 @@ * along with Background Jobs. If not, see . */ +use chrono::{offset::Utc, DateTime}; use failure::Error; use futures::{ future::{Either, IntoFuture}, @@ -127,6 +128,14 @@ where Ok(job) } + /// Create a JobInfo to schedule a job to be performed after a certain time + fn new_scheduled_job(job: Self::Job, after: DateTime) -> Result { + let mut job = Self::new_job(job)?; + job.schedule(after); + + Ok(job) + } + /// A provided method to coerce arguments into the expected type and run the job /// /// Advanced users may want to override this method in order to provide their own custom diff --git a/jobs-server/Cargo.toml b/jobs-server/Cargo.toml index 51a81d0..6dd8af4 100644 --- a/jobs-server/Cargo.toml +++ b/jobs-server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-server" description = "Jobs processor server based on ZeroMQ" -version = "0.3.0" +version = "0.3.1" license = "GPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" diff --git a/jobs-server/src/spawner.rs b/jobs-server/src/spawner.rs index 0ddbefe..7419b1e 100644 --- a/jobs-server/src/spawner.rs +++ b/jobs-server/src/spawner.rs @@ -74,9 +74,10 @@ impl SpawnerConfig { } /// Queue a job to be executed in the background - pub fn queue

(&self, job: P::Job) -> impl Future + pub fn queue(&self, job: P::Job) -> impl Future where - P: Processor, + P: Processor, + S: Clone + Send + Sync + 'static, { let msg = P::new_job(job) .map_err(Error::from) @@ -105,9 +106,10 @@ impl SpawnerConfig { /// sending the message to the jobs server. /// /// If you have a tokio-based application, you should use `queue` instead. - pub fn queue_sync

(&self, job: P::Job) -> Result<(), Error> + pub fn queue_sync(&self, job: P::Job) -> Result<(), Error> where - P: Processor, + P: Processor, + S: Clone + Send + Sync + 'static, { use zmq::PUSH;