From afff7783098ad56b7b9d0fb99ded96bcaf05bc94 Mon Sep 17 00:00:00 2001
From: asonix
Date: Wed, 7 Nov 2018 20:20:30 -0600
Subject: [PATCH] Split core into multiple files

---
 jobs-actix/src/lib.rs       |   2 +-
 jobs-core/src/job_info.rs   | 114 ++++++++++++++++
 jobs-core/src/lib.rs        | 264 ++----------------------------------
 jobs-core/src/processor.rs  | 109 +++++++++++++++
 jobs-core/src/processors.rs |  70 ++++++++++
 jobs-core/src/storage.rs    |   2 +-
 jobs-tokio/src/lib.rs       |   2 +-
 src/lib.rs                  |   3 +-
 8 files changed, 305 insertions(+), 261 deletions(-)
 create mode 100644 jobs-core/src/job_info.rs
 create mode 100644 jobs-core/src/processor.rs
 create mode 100644 jobs-core/src/processors.rs

diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs
index 7d74213..8823b78 100644
--- a/jobs-actix/src/lib.rs
+++ b/jobs-actix/src/lib.rs
@@ -15,7 +15,7 @@ use actix::{
 };
 use failure::Error;
 use futures::Future;
-use jobs_core::{storage::Storage, JobInfo, Processor, Processors};
+use jobs_core::{JobInfo, Processor, Processors, Storage};
 
 fn coerce<I, E, F>(res: Result<Result<I, E>, F>) -> Result<I, E>
 where
diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs
new file mode 100644
index 0000000..90e9ae4
--- /dev/null
+++ b/jobs-core/src/job_info.rs
@@ -0,0 +1,114 @@
+use chrono::{offset::Utc, DateTime, Duration as OldDuration};
+use serde_json::Value;
+
+use crate::{Backoff, JobStatus, MaxRetries, ShouldStop};
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct JobInfo {
+    /// ID of the job, None means an ID has not been set
+    id: Option<usize>,
+
+    /// Name of the processor that should handle this job
+    processor: String,
+
+    /// Arguments for a given job
+    args: Value,
+
+    /// Status of the job
+    status: JobStatus,
+
+    /// Retries left for this job, None means no limit
+    retry_count: u32,
+
+    /// the initial MaxRetries value, for comparing to the current retry count
+    max_retries: MaxRetries,
+
+    /// How often retries should be scheduled
+    backoff_strategy: Backoff,
+
+    /// The time this job was re-queued
+    next_queue: Option<DateTime<Utc>>,
+}
+
+impl JobInfo {
+    pub(crate) fn new(
+        processor: String,
+        args: Value,
+        max_retries: MaxRetries,
+        backoff_strategy: Backoff,
+    ) -> Self {
+        JobInfo {
+            id: None,
+            processor,
+            status: JobStatus::Pending,
+            args,
+            retry_count: 0,
+            max_retries,
+            next_queue: None,
+            backoff_strategy,
+        }
+    }
+    pub(crate) fn processor(&self) -> &str {
+        &self.processor
+    }
+
+    pub(crate) fn args(&self) -> Value {
+        self.args.clone()
+    }
+
+    pub(crate) fn status(&self) -> JobStatus {
+        self.status.clone()
+    }
+
+    pub(crate) fn id(&self) -> Option<usize> {
+        self.id.clone()
+    }
+
+    pub(crate) fn set_id(&mut self, id: usize) {
+        if self.id.is_none() {
+            self.id = Some(id);
+        }
+    }
+
+    pub(crate) fn increment(&mut self) -> ShouldStop {
+        self.retry_count += 1;
+        self.max_retries.compare(self.retry_count)
+    }
+
+    pub(crate) fn next_queue(&mut self) {
+        let now = Utc::now();
+
+        let next_queue = match self.backoff_strategy {
+            Backoff::Linear(secs) => now + OldDuration::seconds(secs as i64),
+            Backoff::Exponential(base) => {
+                let secs = base.pow(self.retry_count);
+                now + OldDuration::seconds(secs as i64)
+            }
+        };
+
+        self.next_queue = Some(next_queue);
+    }
+
+    pub(crate) fn is_ready(&self, now: DateTime<Utc>) -> bool {
+        match self.next_queue {
+            Some(ref time) => now > *time,
+            None => true,
+        }
+    }
+
+    pub(crate) fn is_failed(&self) -> bool {
+        self.status == JobStatus::Failed
+    }
+
+    pub(crate) fn pending(&mut self) {
+        self.status = JobStatus::Pending;
+    }
+
+    pub(crate) fn fail(&mut self) {
+        self.status = JobStatus::Failed;
+    }
+
+    pub(crate) fn pass(&mut self) {
+        self.status = JobStatus::Finished;
+    }
+}
diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs
index b8c9ad2..961a444 100644
--- a/jobs-core/src/lib.rs
+++ b/jobs-core/src/lib.rs
@@ -5,15 +5,16 @@ extern crate log;
 #[macro_use]
 extern crate serde_derive;
 
-use std::collections::HashMap;
-
-use chrono::{offset::Utc, DateTime, Duration as OldDuration};
 use failure::Error;
-use futures::future::{Either, Future, IntoFuture};
-use serde::{de::DeserializeOwned, ser::Serialize};
-use serde_json::Value;
 
-pub mod storage;
+mod job_info;
+mod processor;
+mod processors;
+mod storage;
+
+pub use crate::{
+    job_info::JobInfo, processor::Processor, processors::Processors, storage::Storage,
+};
 
 #[derive(Debug, Fail)]
 pub enum JobError {
@@ -25,110 +26,6 @@ pub enum JobError {
     MissingProcessor,
 }
 
-/// The Processor trait
-///
-/// Processors define the logic for executing jobs
-pub trait Processor: Clone {
-    type Arguments: Serialize + DeserializeOwned;
-
-    /// The name of the processor
-    ///
-    /// This name must be unique!!! It is used to look up which processor should handle a job
-    fn name() -> &'static str;
-
-    /// Define the default number of retries for a given processor
-    ///
-    /// Jobs can override
-    fn max_retries() -> MaxRetries;
-
-    /// Define the default backoff strategy for a given processor
-    ///
-    /// Jobs can override
-    fn backoff_strategy() -> Backoff;
-
-    /// Defines how jobs for this processor are processed
-    ///
-    /// Please do not perform blocking operations in the process method except if put behind
-    /// tokio's `blocking` abstraction
-    fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send>;
-
-    /// A provided method to create a new Job from provided arguments
-    ///
-    /// ### Example
-    ///
-    /// ```rust
-    /// #[macro_use]
-    /// extern crate log;
-    ///
-    /// use jobs::{Processor, MaxRetries};
-    /// use failure::Error;
-    /// use futures::future::{Future, IntoFuture};
-    ///
-    /// struct MyProcessor;
-    ///
-    /// impl Processor for MyProcessor {
-    ///     type Arguments = i32;
-    ///
-    ///     fn name() -> &'static str {
-    ///         "IncrementProcessor"
-    ///     }
-    ///
-    ///     fn max_retries() -> MaxRetries {
-    ///         MaxRetries::Count(1)
-    ///     }
-    ///
-    ///     fn backoff_strategy() -> Backoff {
-    ///         Backoff::Exponential(2)
-    ///     }
-    ///
-    ///     fn process(
-    ///         &self,
-    ///         args: Self::Arguments,
-    ///     ) -> Box<dyn Future<Item = (), Error = Error> + Send> {
-    ///         info!("Processing {}", args);
-    ///
-    ///         Box::new(Ok(()).into_future())
-    ///     }
-    /// }
-    ///
-    /// fn main() -> Result<(), Error> {
-    ///     let job = MyProcessor::new_job(1234, None)?;
-    ///
-    ///     Ok(())
-    /// }
-    /// ```
-    fn new_job(
-        args: Self::Arguments,
-        max_retries: Option<MaxRetries>,
-        backoff_strategy: Option<Backoff>,
-    ) -> Result<JobInfo, Error> {
-        let job = JobInfo {
-            id: None,
-            processor: Self::name().to_owned(),
-            status: JobStatus::Pending,
-            args: serde_json::to_value(args)?,
-            retry_count: 0,
-            max_retries: max_retries.unwrap_or(Self::max_retries()),
-            next_queue: None,
-            backoff_strategy: backoff_strategy.unwrap_or(Self::backoff_strategy()),
-        };
-
-        Ok(job)
-    }
-
-    /// A provided method to coerce arguments into the expected type
-    fn do_processing(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
-        let res = serde_json::from_value::<Self::Arguments>(args);
-
-        let fut = match res {
-            Ok(item) => Either::A(self.process(item).map_err(JobError::Processing)),
-            Err(_) => Either::B(Err(JobError::Json).into_future()),
-        };
-
-        Box::new(fut)
-    }
-}
-
 /// Set the status of a job when storing it
 #[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
 pub enum JobStatus {
@@ -189,148 +86,3 @@ impl ShouldStop {
         *self == ShouldStop::Requeue
     }
 }
-
-#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
-pub struct JobInfo {
-    /// ID of the job, None means an ID has not been set
-    id: Option<usize>,
-
-    /// Name of the processor that should handle this job
-    processor: String,
-
-    /// Arguments for a given job
-    args: Value,
-
-    /// Status of the job
-    status: JobStatus,
-
-    /// Retries left for this job, None means no limit
-    retry_count: u32,
-
-    /// the initial MaxRetries value, for comparing to the current retry count
-    max_retries: MaxRetries,
-
-    /// How often retries should be scheduled
-    backoff_strategy: Backoff,
-
-    /// The time this job was re-queued
-    next_queue: Option<DateTime<Utc>>,
-}
-
-impl JobInfo {
-    fn id(&self) -> Option<usize> {
-        self.id.clone()
-    }
-
-    fn set_id(&mut self, id: usize) {
-        if self.id.is_none() {
-            self.id = Some(id);
-        }
-    }
-
-    fn increment(&mut self) -> ShouldStop {
-        self.retry_count += 1;
-        self.max_retries.compare(self.retry_count)
-    }
-
-    fn next_queue(&mut self) {
-        let now = Utc::now();
-
-        let next_queue = match self.backoff_strategy {
-            Backoff::Linear(secs) => now + OldDuration::seconds(secs as i64),
-            Backoff::Exponential(base) => {
-                let secs = base.pow(self.retry_count);
-                now + OldDuration::seconds(secs as i64)
-            }
-        };
-
-        self.next_queue = Some(next_queue);
-    }
-
-    fn is_ready(&self, now: DateTime<Utc>) -> bool {
-        match self.next_queue {
-            Some(ref time) => now > *time,
-            None => true,
-        }
-    }
-
-    fn is_failed(&self) -> bool {
-        self.status == JobStatus::Failed
-    }
-
-    fn pending(&mut self) {
-        self.status = JobStatus::Pending;
-    }
-
-    fn fail(&mut self) {
-        self.status = JobStatus::Failed;
-    }
-
-    fn pass(&mut self) {
-        self.status = JobStatus::Finished;
-    }
-}
-
-pub type ProcessFn =
-    Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>;
-
-pub struct Processors {
-    inner: HashMap<String, ProcessFn>,
-}
-
-impl Processors {
-    pub fn new() -> Self {
-        Default::default()
-    }
-
-    pub fn register_processor<P>(&mut self, processor: P)
-    where
-        P: Processor + Send + Sync + 'static,
-    {
-        self.inner.insert(
-            P::name().to_owned(),
-            Box::new(move |value| processor.do_processing(value)),
-        );
-    }
-
-    pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
-        let opt = self
-            .inner
-            .get(&job.processor)
-            .map(|processor| process(processor, job.clone()));
-
-        if let Some(fut) = opt {
-            Either::A(fut)
-        } else {
-            error!("Processor {} not present", job.processor);
-            Either::B(Ok(job).into_future())
-        }
-    }
-}
-
-impl Default for Processors {
-    fn default() -> Self {
-        Processors {
-            inner: Default::default(),
-        }
-    }
-}
-
-fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
-    let args = job.args.clone();
-
-    let processor = job.processor.clone();
-
-    process_fn(args).then(move |res| match res {
-        Ok(_) => {
-            info!("Job completed, {}", processor);
-            job.pass();
-            Ok(job)
-        }
-        Err(e) => {
-            error!("Job errored, {}, {}", processor, e);
-            job.fail();
-            Ok(job)
-        }
-    })
-}
diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs
new file mode 100644
index 0000000..b92045d
--- /dev/null
+++ b/jobs-core/src/processor.rs
@@ -0,0 +1,109 @@
+use failure::Error;
+use futures::{
+    future::{Either, IntoFuture},
+    Future,
+};
+use serde::{de::DeserializeOwned, ser::Serialize};
+use serde_json::Value;
+
+use crate::{Backoff, JobError, JobInfo, MaxRetries};
+
+/// The Processor trait
+///
+/// Processors define the logic for executing jobs
+pub trait Processor: Clone {
+    type Arguments: Serialize + DeserializeOwned;
+
+    /// The name of the processor
+    ///
+    /// This name must be unique!!! It is used to look up which processor should handle a job
+    fn name() -> &'static str;
+
+    /// Define the default number of retries for a given processor
+    ///
+    /// Jobs can override
+    fn max_retries() -> MaxRetries;
+
+    /// Define the default backoff strategy for a given processor
+    ///
+    /// Jobs can override
+    fn backoff_strategy() -> Backoff;
+
+    /// Defines how jobs for this processor are processed
+    ///
+    /// Please do not perform blocking operations in the process method except if put behind
+    /// tokio's `blocking` abstraction
+    fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send>;
+
+    /// A provided method to create a new Job from provided arguments
+    ///
+    /// ### Example
+    ///
+    /// ```rust
+    /// #[macro_use]
+    /// extern crate log;
+    ///
+    /// use jobs::{Processor, MaxRetries};
+    /// use failure::Error;
+    /// use futures::future::{Future, IntoFuture};
+    ///
+    /// struct MyProcessor;
+    ///
+    /// impl Processor for MyProcessor {
+    ///     type Arguments = i32;
+    ///
+    ///     fn name() -> &'static str {
+    ///         "IncrementProcessor"
+    ///     }
+    ///
+    ///     fn max_retries() -> MaxRetries {
+    ///         MaxRetries::Count(1)
+    ///     }
+    ///
+    ///     fn backoff_strategy() -> Backoff {
+    ///         Backoff::Exponential(2)
+    ///     }
+    ///
+    ///     fn process(
+    ///         &self,
+    ///         args: Self::Arguments,
+    ///     ) -> Box<dyn Future<Item = (), Error = Error> + Send> {
+    ///         info!("Processing {}", args);
+    ///
+    ///         Box::new(Ok(()).into_future())
+    ///     }
+    /// }
+    ///
+    /// fn main() -> Result<(), Error> {
+    ///     let job = MyProcessor::new_job(1234, None)?;
+    ///
+    ///     Ok(())
+    /// }
+    /// ```
+    fn new_job(
+        args: Self::Arguments,
+        max_retries: Option<MaxRetries>,
+        backoff_strategy: Option<Backoff>,
+    ) -> Result<JobInfo, Error> {
+        let job = JobInfo::new(
+            Self::name().to_owned(),
+            serde_json::to_value(args)?,
+            max_retries.unwrap_or(Self::max_retries()),
+            backoff_strategy.unwrap_or(Self::backoff_strategy()),
+        );
+
+        Ok(job)
+    }
+
+    /// A provided method to coerce arguments into the expected type
+    fn do_processing(&self, args: Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> {
+        let res = serde_json::from_value::<Self::Arguments>(args);
+
+        let fut = match res {
+            Ok(item) => Either::A(self.process(item).map_err(JobError::Processing)),
+            Err(_) => Either::B(Err(JobError::Json).into_future()),
+        };
+
+        Box::new(fut)
+    }
+}
diff --git a/jobs-core/src/processors.rs b/jobs-core/src/processors.rs
new file mode 100644
index 0000000..0c3f1ef
--- /dev/null
+++ b/jobs-core/src/processors.rs
@@ -0,0 +1,70 @@
+use std::collections::HashMap;
+
+use futures::future::{Either, Future, IntoFuture};
+use serde_json::Value;
+
+use crate::{JobError, JobInfo, Processor};
+
+pub type ProcessFn =
+    Box<dyn Fn(Value) -> Box<dyn Future<Item = (), Error = JobError> + Send> + Send>;
+
+pub struct Processors {
+    inner: HashMap<String, ProcessFn>,
+}
+
+impl Processors {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    pub fn register_processor<P>(&mut self, processor: P)
+    where
+        P: Processor + Send + Sync + 'static,
+    {
+        self.inner.insert(
+            P::name().to_owned(),
+            Box::new(move |value| processor.do_processing(value)),
+        );
+    }
+
+    pub fn process_job(&self, job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
+        let opt = self
+            .inner
+            .get(job.processor())
+            .map(|processor| process(processor, job.clone()));
+
+        if let Some(fut) = opt {
+            Either::A(fut)
+        } else {
+            error!("Processor {} not present", job.processor());
+            Either::B(Ok(job).into_future())
+        }
+    }
+}
+
+impl Default for Processors {
+    fn default() -> Self {
+        Processors {
+            inner: Default::default(),
+        }
+    }
+}
+
+fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future<Item = JobInfo, Error = ()> {
+    let args = job.args();
+
+    let processor = job.processor().to_owned();
+
+    process_fn(args).then(move |res| match res {
+        Ok(_) => {
+            info!("Job completed, {}", processor);
+            job.pass();
+            Ok(job)
+        }
+        Err(e) => {
+            error!("Job errored, {}, {}", processor, e);
+            job.fail();
+            Ok(job)
+        }
+    })
+}
diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs
index 7b0d970..e7861b1 100644
--- a/jobs-core/src/storage.rs
+++ b/jobs-core/src/storage.rs
@@ -176,7 +176,7 @@ impl Storage {
             }
         }
 
-        let status = job.status.clone();
+        let status = job.status();
         let job_value = Json::to_value_buf(job)?;
 
         trace!("Storing job");
diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs
index e00e4ed..154e607 100644
--- a/jobs-tokio/src/lib.rs
+++ b/jobs-tokio/src/lib.rs
@@ -11,7 +11,7 @@ use futures::{
     sync::mpsc::{channel, Receiver, SendError, Sender},
     Future, Sink, Stream,
 };
-use jobs_core::{storage::Storage, JobInfo, Processor, Processors};
+use jobs_core::{JobInfo, Processor, Processors, Storage};
 use tokio::timer::Interval;
 use tokio_threadpool::blocking;
 
diff --git a/src/lib.rs b/src/lib.rs
index 977ffdb..87aba3c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,5 @@
 pub use jobs_core::{
-    storage::Storage, Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors,
-    ShouldStop,
+    Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors, ShouldStop, Storage,
 };
 
 #[cfg(feature = "jobs-tokio")]
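
Usage note (illustrative sketch, not part of the commit): for callers, the visible change in this patch is the import path, since Storage is now re-exported from the jobs-core crate root and from the jobs facade alongside JobInfo, Processor, and Processors, replacing jobs_core::storage::Storage. The sketch below shows a minimal downstream binary written against those re-exports in the same futures 0.1 / failure style as the crate; PrintProcessor, its String argument type, and the "hello" job are assumptions made for the example. It derives Clone because the Processor trait requires it, and it passes both optional overrides (max retries and backoff strategy) to new_job, matching the trait signature above.

    #[macro_use]
    extern crate log;

    use failure::Error;
    use futures::future::{Future, IntoFuture};
    use jobs::{Backoff, MaxRetries, Processor, Processors};

    // Hypothetical processor, defined only for this sketch.
    #[derive(Clone)]
    struct PrintProcessor;

    impl Processor for PrintProcessor {
        type Arguments = String;

        fn name() -> &'static str {
            "PrintProcessor"
        }

        fn max_retries() -> MaxRetries {
            MaxRetries::Count(1)
        }

        fn backoff_strategy() -> Backoff {
            Backoff::Exponential(2)
        }

        fn process(&self, args: Self::Arguments) -> Box<dyn Future<Item = (), Error = Error> + Send> {
            info!("Printing {}", args);

            Box::new(Ok(()).into_future())
        }
    }

    fn main() -> Result<(), Error> {
        let mut processors = Processors::new();
        processors.register_processor(PrintProcessor);

        // Arguments plus the optional MaxRetries / Backoff overrides.
        let job = PrintProcessor::new_job("hello".to_owned(), None, None)?;

        // Drive the futures 0.1 future inline; a real deployment would persist
        // the job via Storage and let the jobs-actix or jobs-tokio runner poll it.
        let _finished = processors.process_job(job).wait();

        Ok(())
    }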