diff --git a/examples/basic-example/src/main.rs b/examples/basic-example/src/main.rs index 7d1fa89..f5bbdfe 100644 --- a/examples/basic-example/src/main.rs +++ b/examples/basic-example/src/main.rs @@ -35,10 +35,11 @@ async fn main() -> Result<(), Error> { let arbiter = Arbiter::new(); // Configure and start our workers - let queue_handle = WorkerConfig::new(move || MyState::new("My App")) - .register::() - .set_worker_count(DEFAULT_QUEUE, 16) - .start_in_arbiter(&arbiter.handle(), storage); + let queue_handle = + WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App")) + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) + .start(); // Queue our jobs queue_handle.queue(MyJob::new(1, 2)).await?; diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs index 5c90707..aae5588 100644 --- a/examples/managed-example/src/main.rs +++ b/examples/managed-example/src/main.rs @@ -36,11 +36,11 @@ async fn main() -> Result<(), Error> { let storage = Storage::new(db)?; // Configure and start our workers - let manager = WorkerConfig::new(move || MyState::new("My App")) + let manager = WorkerConfig::new_managed(storage, move |_| MyState::new("My App")) .register::() .register::() .set_worker_count(DEFAULT_QUEUE, 16) - .managed(storage); + .start(); // Queue our jobs manager.queue(MyJob::new(1, 2)).await?; diff --git a/examples/panic-example/src/main.rs b/examples/panic-example/src/main.rs index 9c62572..ef70a28 100644 --- a/examples/panic-example/src/main.rs +++ b/examples/panic-example/src/main.rs @@ -38,11 +38,12 @@ async fn main() -> Result<(), Error> { let arbiter = Arbiter::new(); // Configure and start our workers - let queue_handle = WorkerConfig::new(move || MyState::new("My App")) - .register::() - .register::() - .set_worker_count(DEFAULT_QUEUE, 16) - .start_in_arbiter(&arbiter.handle(), storage); + let queue_handle = + WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App")) + .register::() + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) + .start(); // Queue some panicking job for _ in 0..32 { diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index e44716b..5b45d3b 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -37,10 +37,10 @@ //! let storage = Storage::new(); //! //! // Configure and start our workers -//! let queue_handle = WorkerConfig::new(move || MyState::new("My App")) +//! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App")) //! .register::() //! .set_worker_count(DEFAULT_QUEUE, 16) -//! .start(storage); +//! .start(); //! //! // Queue our jobs //! queue_handle.queue(MyJob::new(1, 2))?; @@ -117,7 +117,7 @@ use actix_rt::{Arbiter, ArbiterHandle}; use anyhow::Error; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use chrono::{DateTime, Utc}; -use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Duration}; +use std::{collections::BTreeMap, marker::PhantomData, ops::Deref, sync::Arc, time::Duration}; mod every; mod server; @@ -148,36 +148,29 @@ impl Manager { /// /// Manager works by startinng a new Arbiter to run jobs, and if that arbiter ever dies, it /// spins up another one and spawns the workers again - fn new(storage: S, worker_config: WorkerConfig) -> Self + fn new(worker_config: WorkerConfig) -> Self where - S: Storage + Sync + 'static, State: Clone, { let arbiter = Arbiter::new(); let worker_arbiter = Arbiter::new(); let notifier = Arc::new(tokio::sync::Notify::new()); - let queue_handle = create_server_managed(storage); + let queue_handle = worker_config.queue_handle.clone(); let drop_notifier = DropNotifier::new(Arc::clone(¬ifier)); - let queue_handle_2 = queue_handle.clone(); arbiter.spawn(async move { - let queue_handle = queue_handle_2; - let mut drop_notifier = drop_notifier; let mut arbiter = ArbiterDropper { arbiter: Some(worker_arbiter), }; loop { - queue_handle + worker_config + .queue_handle .inner .ticker(arbiter.handle(), drop_notifier.clone()); - worker_config.start_managed( - &arbiter.handle(), - queue_handle.clone(), - &drop_notifier, - ); + worker_config.start_managed(&arbiter.handle(), &drop_notifier); notifier.notified().await; // drop_notifier needs to live at least as long as notifier.notified().await @@ -284,36 +277,109 @@ where } } +/// Marker type for Unmanaged workers +pub struct Unmanaged; +/// Marker type for Managed workers +pub struct Managed; + /// Worker Configuration /// /// This type is used for configuring and creating workers to process jobs. Before starting the /// workers, register `Job` types with this struct. This worker registration allows for /// different worker processes to handle different sets of workers. #[derive(Clone)] -pub struct WorkerConfig +pub struct WorkerConfig where State: Clone + 'static, { processors: ProcessorMap, queues: BTreeMap, + arbiter: Option, + queue_handle: QueueHandle, + managed: PhantomData, } -impl WorkerConfig +impl WorkerConfig where State: Clone + 'static, { - /// Create a new WorkerConfig + /// Create a new managed WorkerConfig /// /// The supplied function should return the State required by the jobs intended to be /// processed. The function must be sharable between threads, but the state itself does not /// have this requirement. - pub fn new(state_fn: impl Fn() -> State + Send + Sync + 'static) -> Self { + pub fn new_managed( + storage: S, + state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static, + ) -> Self { + let queue_handle = create_server_managed(storage); + let q2 = queue_handle.clone(); + WorkerConfig { - processors: ProcessorMap::new(Arc::new(state_fn)), + processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))), queues: BTreeMap::new(), + arbiter: None, + queue_handle, + managed: PhantomData, } } + /// Start the workers on a managed arbiter, and return the manager struct + pub fn start(self) -> Manager { + Manager::new(self) + } +} + +impl WorkerConfig +where + State: Clone + 'static, +{ + /// Create a new WorkerConfig in the current arbiter + /// + /// The supplied function should return the State required by the jobs intended to be + /// processed. The function must be sharable between threads, but the state itself does not + /// have this requirement. + pub fn new( + storage: S, + state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static, + ) -> Self { + Self::new_in_arbiter(Arbiter::current(), storage, state_fn) + } + + /// Create a new WorkerConfig in the provided arbiter + /// + /// The supplied function should return the State required by the jobs intended to be + /// processed. The function must be sharable between threads, but the state itself does not + /// have this requirement. + pub fn new_in_arbiter( + arbiter: ArbiterHandle, + storage: S, + state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static, + ) -> Self { + let queue_handle = create_server_in_arbiter(arbiter.clone(), storage); + let q2 = queue_handle.clone(); + + WorkerConfig { + processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))), + queues: BTreeMap::new(), + arbiter: Some(arbiter), + queue_handle, + managed: PhantomData, + } + } + + /// Start the workers in the provided arbiter + pub fn start(self) -> QueueHandle { + self.start_managed(self.arbiter.as_ref().unwrap(), &()); + + self.queue_handle + } +} + +impl WorkerConfig +where + State: Clone + 'static, +{ /// Register a `Job` with the worker /// /// This enables the worker to handle jobs associated with this processor. If a processor is @@ -339,41 +405,17 @@ where self } - /// Start the workers in the current arbiter - /// - /// This method will panic if not called from an actix runtime - pub fn start(self, storage: S) -> QueueHandle { - self.start_in_arbiter(&Arbiter::current(), storage) - } - - /// Start the workers in the provided arbiter - pub fn start_in_arbiter( - self, - arbiter: &ArbiterHandle, - storage: S, - ) -> QueueHandle { - let queue_handle = create_server_in_arbiter(arbiter.clone(), storage); - self.start_managed(arbiter, queue_handle.clone(), &()); - queue_handle - } - - /// Start the workers on a managed arbiter, and return the manager struct - pub fn managed(self, storage: S) -> Manager { - Manager::new(storage, self) - } - /// Start a workers in a managed way fn start_managed( &self, arbiter: &ArbiterHandle, - queue_handle: QueueHandle, extras: &Extras, ) { for (key, count) in self.queues.iter() { for _ in 0..*count { let queue = key.clone(); let processors = self.processors.clone(); - let server = queue_handle.inner.clone(); + let server = self.queue_handle.inner.clone(); let extras_2 = extras.clone();