diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 3a1afd5..8792811 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -28,7 +28,7 @@ impl ServerConfig { pub fn start(self) -> QueueHandle where - S: Clone + Send + 'static, + S: Clone + 'static, { let ServerConfig { server_id, db_path } = self; @@ -44,7 +44,7 @@ impl ServerConfig { pub struct WorkerConfig where - S: Clone + Send + 'static, + S: Clone + 'static, { processors: ProcessorMap, queues: BTreeMap, @@ -52,11 +52,11 @@ where impl WorkerConfig where - S: Clone + Send + 'static, + S: Clone + 'static, { - pub fn new(state: S) -> Self { + pub fn new(state_fn: impl Fn() -> S + Send + Sync + 'static) -> Self { WorkerConfig { - processors: ProcessorMap::new(state), + processors: ProcessorMap::new(Box::new(state_fn)), queues: BTreeMap::new(), } } @@ -95,14 +95,14 @@ where #[derive(Clone)] pub struct QueueHandle where - S: Clone + Send + 'static, + S: Clone + 'static, { inner: Addr>>, } impl QueueHandle where - S: Clone + Send + 'static, + S: Clone + 'static, { pub fn queue

(&self, job: P::Job) -> Result<(), Error> where diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index c00a91e..6a22a80 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -25,7 +25,7 @@ impl Message for ProcessJob { pub struct LocalWorker where - State: Clone + Send + 'static, + State: Clone + 'static, { id: usize, queue: String, @@ -35,7 +35,7 @@ where impl LocalWorker where - State: Clone + Send + 'static, + State: Clone + 'static, { pub fn new( id: usize, @@ -54,7 +54,7 @@ where impl Actor for LocalWorker where - State: Clone + Send + 'static, + State: Clone + 'static, { type Context = Context; @@ -66,7 +66,7 @@ where impl Handler for LocalWorker where - State: Clone + Send + 'static, + State: Clone + 'static, { type Result = (); diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 8c243a2..a4c67dc 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -26,7 +26,7 @@ use crate::{Backoff, MaxRetries}; /// The Job trait defines parameters pertaining to an instance of background job pub trait Job: Serialize + DeserializeOwned where - S: Clone + Send + 'static, + S: Clone + 'static, { /// Users of this library must define what it means to run a job. /// diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs index 2ae98c6..98e9d31 100644 --- a/jobs-core/src/processor.rs +++ b/jobs-core/src/processor.rs @@ -83,7 +83,7 @@ use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; /// ``` pub trait Processor: Clone where - S: Clone + Send + 'static, + S: Clone + 'static, { type Job: Job + 'static; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index e31b4b2..f50a477 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -32,8 +32,11 @@ use crate::{JobError, JobInfo, Processor}; /// directly, the /// [`ProcessorMap`](https://docs.rs/background-jobs-core/0.4.0/background_jobs_core/struct.ProcessorMap.html) /// struct stores these `ProcessFn` types that don't expose differences in Job types. -pub type ProcessFn = - Box Box + Send> + Send>; +pub type ProcessFn = + Box Box + Send> + Send>; + + +pub type StateFn = Box S + Send + Sync>; /// A type for storing the relationships between processor names and the processor itself /// @@ -44,23 +47,23 @@ pub struct ProcessorMap where S: Clone, { - inner: HashMap, - state: S, + inner: HashMap>, + state_fn: StateFn, } impl ProcessorMap where - S: Clone + Send + 'static, + S: Clone + 'static, { /// Intialize a `ProcessorMap` /// /// The state passed into this method will be passed to all jobs executed through this /// ProcessorMap. The state argument could be useful for containing a hook into something like /// r2d2, or the address of an actor in an actix-based system. - pub fn new(state: S) -> Self { + pub fn new(state_fn: StateFn) -> Self { ProcessorMap { inner: HashMap::new(), - state, + state_fn, } } @@ -74,11 +77,9 @@ where where P: Processor + Send + 'static, { - let state = self.state.clone(); - self.inner.insert( P::NAME.to_owned(), - Box::new(move |value| processor.process(value, state.clone())), + Box::new(move |value, state| processor.process(value, state)), ); } @@ -90,7 +91,7 @@ where let opt = self .inner .get(job.processor()) - .map(|processor| process(processor, job.clone())); + .map(|processor| process(processor, (self.state_fn)(), job.clone())); if let Some(fut) = opt { Either::A(fut) @@ -101,12 +102,12 @@ where } } -fn process(process_fn: &ProcessFn, mut job: JobInfo) -> impl Future { +fn process(process_fn: &ProcessFn, state: S, mut job: JobInfo) -> impl Future { let args = job.args(); let processor = job.processor().to_owned(); - process_fn(args).then(move |res| match res { + process_fn(args, state).then(move |res| match res { Ok(_) => { info!("Job {} completed, {}", job.id(), processor); job.pass(); diff --git a/src/lib.rs b/src/lib.rs index 7ac2395..0a80e65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -275,8 +275,5 @@ pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Processor, Stat, Stats}; -#[cfg(feature = "background-jobs-server")] -pub use background_jobs_server::{ServerConfig, SpawnerConfig, SyncJob, WorkerConfig}; - #[cfg(feature = "background-jobs-actix")] pub use background_jobs_actix::{QueueHandle, ServerConfig, WorkerConfig};