From 092f36d4aabcc1b1ba4d8df3a2a7bde9c9bfd369 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 1 Jun 2019 10:58:25 -0500 Subject: [PATCH] Add start_in_arbiter for workers --- jobs-actix/src/lib.rs | 75 +++++++++++++++++++++++++++++++++++++--- jobs-actix/src/worker.rs | 11 ++---- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index a5d3972..fcc9835 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -1,6 +1,6 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use actix::{Actor, Addr, SyncArbiter}; +use actix::{Actor, Addr, Arbiter, SyncArbiter}; use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage}; use failure::Error; use futures::Future; @@ -74,6 +74,12 @@ where } } +/// Worker Configuration +/// +/// This type is used for configuring and creating workers to process jobs. Before starting the +/// workers, register `Processor` types with this struct. This worker registration allows for +/// different worker processes to handle different sets of workers. +#[derive(Clone)] pub struct WorkerConfig where State: Clone + 'static, @@ -86,16 +92,26 @@ impl WorkerConfig where State: Clone + 'static, { + /// Create a new WorkerConfig + /// + /// The supplied function should return the State required by the jobs intended to be + /// processed. The function must be sharable between threads, but the state itself does not + /// have this requirement. pub fn new(state_fn: impl Fn() -> State + Send + Sync + 'static) -> Self { WorkerConfig { - processors: ProcessorMap::new(Box::new(state_fn)), + processors: ProcessorMap::new(Arc::new(state_fn)), queues: BTreeMap::new(), } } + /// Register a `Processor` with the worker + /// + /// This enables the worker to handle jobs associated with this processor. If a processor is + /// not registered, none of it's jobs will be run, even if another processor handling the same + /// job queue is registered. pub fn register(mut self, processor: P) -> Self where - P: Processor + Send + 'static, + P: Processor + Send + Sync + 'static, J: Job, { self.queues.insert(P::QUEUE.to_owned(), 4); @@ -103,13 +119,20 @@ where self } + /// Set the number of workers to run for a given queue + /// + /// This does not spin up any additional threads. The `Arbiter` the workers are spawned onto + /// will handle processing all workers, regardless of how many are configured. + /// + /// By default, 4 workers are spawned pub fn set_processor_count(mut self, queue: &str, count: u64) -> Self { self.queues.insert(queue.to_owned(), count); self } + /// Start the workers in the current arbiter pub fn start(self, queue_handle: QueueHandle) { - let processors = Arc::new(self.processors); + let processors = self.processors.clone(); self.queues.into_iter().fold(0, |acc, (key, count)| { (0..count).for_each(|i| { @@ -125,14 +148,44 @@ where acc + count }); } + + /// Start the workers in the provided arbiter + pub fn start_in_arbiter(self, arbiter: &Arbiter, queue_handle: QueueHandle) { + let processors = self.processors.clone(); + self.queues.into_iter().fold(0, |acc, (key, count)| { + (0..count).for_each(|i| { + let processors = processors.clone(); + let queue_handle = queue_handle.clone(); + let key = key.clone(); + LocalWorker::start_in_arbiter(arbiter, move |_| { + LocalWorker::new( + acc + i + 1000, + key.clone(), + processors.clone(), + queue_handle.inner.clone(), + ) + }); + }); + + acc + count + }); + } } +/// A handle to the job server, used for queuing new jobs +/// +/// `QueueHandle` should be stored in your application's state in order to allow all parts of your +/// application to spawn jobs. #[derive(Clone)] pub struct QueueHandle { inner: Addr, } impl QueueHandle { + /// Queues a job for execution + /// + /// This job will be sent to the server for storage, and will execute whenever a worker for the + /// job's queue is free to do so. pub fn queue(&self, job: J) -> Result<(), Error> where J: Job, @@ -141,6 +194,18 @@ impl QueueHandle { Ok(()) } + /// Queues a job for recurring execution + /// + /// This job will be added to it's queue on the server once every `Duration`. It will be + /// processed whenever workers are free to do so. + pub fn every(&self, duration: Duration, job: J) + where + J: Job + Clone + 'static, + { + Every::new(self.clone(), duration, job).start(); + } + + /// Return an overview of the processor's statistics pub fn get_stats(&self) -> Box + Send> { Box::new(self.inner.send(GetStats).then(coerce)) } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index afcbae3..d106890 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use actix::{ dev::ToEnvelope, fut::{wrap_future, ActorFuture}, @@ -54,7 +52,7 @@ where { id: u64, queue: String, - processors: Arc>, + processors: ProcessorMap, server: Addr, } @@ -64,12 +62,7 @@ where S::Context: ToEnvelope + ToEnvelope, State: Clone + 'static, { - pub fn new( - id: u64, - queue: String, - processors: Arc>, - server: Addr, - ) -> Self { + pub fn new(id: u64, queue: String, processors: ProcessorMap, server: Addr) -> Self { LocalWorker { id, queue,