actix: Move QueueHandle to StateFn argument

Aode (lion) 2021-10-29 19:19:04 -05:00
parent db9af6a3b8
commit 70ab459ae9
4 changed files with 100 additions and 56 deletions

View file

@@ -35,10 +35,11 @@ async fn main() -> Result<(), Error> {
     let arbiter = Arbiter::new();

     // Configure and start our workers
-    let queue_handle = WorkerConfig::new(move || MyState::new("My App"))
-        .register::<MyJob>()
-        .set_worker_count(DEFAULT_QUEUE, 16)
-        .start_in_arbiter(&arbiter.handle(), storage);
+    let queue_handle =
+        WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App"))
+            .register::<MyJob>()
+            .set_worker_count(DEFAULT_QUEUE, 16)
+            .start();

     // Queue our jobs
     queue_handle.queue(MyJob::new(1, 2)).await?;
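
For illustration of why the state function now takes an argument at all: the closure receives the QueueHandle of the server it configures, so state can capture it and jobs can enqueue follow-up jobs from inside their run methods. A minimal sketch slotting into the example above; the queue_handle field on MyState is hypothetical, not part of this commit:

    // Hypothetical state that keeps the handle around for re-queueing
    #[derive(Clone)]
    struct MyState {
        app_name: String,
        queue_handle: QueueHandle,
    }

    let queue_handle =
        WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |queue_handle| MyState {
            app_name: "My App".to_owned(),
            queue_handle,
        })
        .register::<MyJob>()
        .set_worker_count(DEFAULT_QUEUE, 16)
        .start();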

View file

@@ -36,11 +36,11 @@ async fn main() -> Result<(), Error> {
     let storage = Storage::new(db)?;

     // Configure and start our workers
-    let manager = WorkerConfig::new(move || MyState::new("My App"))
+    let manager = WorkerConfig::new_managed(storage, move |_| MyState::new("My App"))
         .register::<StopJob>()
         .register::<MyJob>()
         .set_worker_count(DEFAULT_QUEUE, 16)
-        .managed(storage);
+        .start();

     // Queue our jobs
     manager.queue(MyJob::new(1, 2)).await?;
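
manager.queue(...) works on the returned Manager because it hands out the QueueHandle it supervises. A sketch of that delegation, assuming Manager stores the handle in a queue_handle field (the field name is an assumption; the ops::Deref import in the lib.rs hunk further down is the hint):

    use std::ops::Deref;

    impl Deref for Manager {
        type Target = QueueHandle;

        // Forward method calls such as queue() to the underlying handle.
        fn deref(&self) -> &Self::Target {
            &self.queue_handle
        }
    }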

View file

@@ -38,11 +38,12 @@ async fn main() -> Result<(), Error> {
     let arbiter = Arbiter::new();

     // Configure and start our workers
-    let queue_handle = WorkerConfig::new(move || MyState::new("My App"))
-        .register::<PanickingJob>()
-        .register::<MyJob>()
-        .set_worker_count(DEFAULT_QUEUE, 16)
-        .start_in_arbiter(&arbiter.handle(), storage);
+    let queue_handle =
+        WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App"))
+            .register::<PanickingJob>()
+            .register::<MyJob>()
+            .set_worker_count(DEFAULT_QUEUE, 16)
+            .start();

     // Queue some panicking job
     for _ in 0..32 {
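
This example relies on panicking jobs not taking the whole worker set down. The generic technique for isolating panics in async workers is catch_unwind; a self-contained sketch of that pattern, hedged: this is the common approach, not necessarily this crate's exact recovery code (the managed Manager's arbiter-restart loop in lib.rs below is another layer of it):

    use futures::FutureExt; // for catch_unwind
    use std::panic::AssertUnwindSafe;

    async fn run_job_isolated<F>(job: F)
    where
        F: std::future::Future<Output = ()>,
    {
        // A panicking job resolves to Err(..) instead of unwinding
        // through the worker's polling loop.
        if AssertUnwindSafe(job).catch_unwind().await.is_err() {
            eprintln!("job panicked; worker continues");
        }
    }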

View file

@@ -37,10 +37,10 @@
 //! let storage = Storage::new();
 //!
 //! // Configure and start our workers
-//! let queue_handle = WorkerConfig::new(move || MyState::new("My App"))
+//! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
 //!     .register::<MyJob>()
 //!     .set_worker_count(DEFAULT_QUEUE, 16)
-//!     .start(storage);
+//!     .start();
 //!
 //! // Queue our jobs
 //! queue_handle.queue(MyJob::new(1, 2))?;
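
Per the implementation later in this diff, the doc example's WorkerConfig::new(storage, ...) is shorthand for anchoring the job server in the caller's arbiter. The two spellings below are equivalent; Arbiter::current() must be called from a running actix arbiter:

    // Shorthand: the server lives in the current arbiter
    let config = WorkerConfig::new(storage, move |_| MyState::new("My App"));

    // Equivalent long form, per new() delegating to new_in_arbiter():
    // let config = WorkerConfig::new_in_arbiter(
    //     Arbiter::current(),
    //     storage,
    //     move |_| MyState::new("My App"),
    // );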
@@ -117,7 +117,7 @@ use actix_rt::{Arbiter, ArbiterHandle};
 use anyhow::Error;
 use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage};
 use chrono::{DateTime, Utc};
-use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Duration};
+use std::{collections::BTreeMap, marker::PhantomData, ops::Deref, sync::Arc, time::Duration};

 mod every;
 mod server;
@@ -148,36 +148,29 @@ impl Manager {
     ///
     /// Manager works by starting a new Arbiter to run jobs, and if that arbiter ever dies, it
    /// spins up another one and spawns the workers again
-    fn new<S, State>(storage: S, worker_config: WorkerConfig<State>) -> Self
+    fn new<State>(worker_config: WorkerConfig<State, Managed>) -> Self
     where
-        S: Storage + Sync + 'static,
         State: Clone,
     {
         let arbiter = Arbiter::new();
         let worker_arbiter = Arbiter::new();
         let notifier = Arc::new(tokio::sync::Notify::new());

-        let queue_handle = create_server_managed(storage);
+        let queue_handle = worker_config.queue_handle.clone();

         let drop_notifier = DropNotifier::new(Arc::clone(&notifier));
-        let queue_handle_2 = queue_handle.clone();
         arbiter.spawn(async move {
-            let queue_handle = queue_handle_2;
             let mut drop_notifier = drop_notifier;
             let mut arbiter = ArbiterDropper {
                 arbiter: Some(worker_arbiter),
             };

             loop {
-                queue_handle
+                worker_config
+                    .queue_handle
                     .inner
                     .ticker(arbiter.handle(), drop_notifier.clone());
-                worker_config.start_managed(
-                    &arbiter.handle(),
-                    queue_handle.clone(),
-                    &drop_notifier,
-                );
-
+                worker_config.start_managed(&arbiter.handle(), &drop_notifier);
                 notifier.notified().await;

                 // drop_notifier needs to live at least as long as notifier.notified().await
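
The loop above is a drop-guard supervision pattern: DropNotifier pings the Notify when the worker arbiter (and everything holding the guard) dies, which wakes the loop to spawn replacements. A stripped-down, self-contained sketch of the same pattern; RestartGuard, supervise, and spawn_workers are simplifications, not the crate's exact types:

    use std::sync::Arc;
    use tokio::sync::Notify;

    struct RestartGuard {
        notify: Arc<Notify>,
    }

    impl Drop for RestartGuard {
        // Fires when the workers owning this guard go away for any reason.
        fn drop(&mut self) {
            self.notify.notify_one();
        }
    }

    async fn supervise(notify: Arc<Notify>) {
        loop {
            let guard = RestartGuard { notify: Arc::clone(&notify) };
            spawn_workers(guard); // hand the guard to the supervised workers
            notify.notified().await; // parked until the guard drops
            // loop around: spawn a fresh set of workers
        }
    }

    fn spawn_workers(_guard: RestartGuard) {
        // placeholder: start worker tasks that keep _guard alive
    }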
@@ -284,36 +277,109 @@ where
     }
 }

+/// Marker type for Unmanaged workers
+pub struct Unmanaged;
+
+/// Marker type for Managed workers
+pub struct Managed;
+
 /// Worker Configuration
 ///
 /// This type is used for configuring and creating workers to process jobs. Before starting the
 /// workers, register `Job` types with this struct. This worker registration allows for
 /// different worker processes to handle different sets of workers.
 #[derive(Clone)]
-pub struct WorkerConfig<State>
+pub struct WorkerConfig<State, M>
 where
     State: Clone + 'static,
 {
     processors: ProcessorMap<State>,
     queues: BTreeMap<String, u64>,
+    arbiter: Option<ArbiterHandle>,
+    queue_handle: QueueHandle,
+    managed: PhantomData<M>,
 }

-impl<State> WorkerConfig<State>
+impl<State> WorkerConfig<State, Managed>
 where
     State: Clone + 'static,
 {
-    /// Create a new WorkerConfig
+    /// Create a new managed WorkerConfig
     ///
     /// The supplied function should return the State required by the jobs intended to be
     /// processed. The function must be sharable between threads, but the state itself does not
     /// have this requirement.
-    pub fn new(state_fn: impl Fn() -> State + Send + Sync + 'static) -> Self {
+    pub fn new_managed<S: Storage + Send + Sync + 'static>(
+        storage: S,
+        state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
+    ) -> Self {
+        let queue_handle = create_server_managed(storage);
+
+        let q2 = queue_handle.clone();
         WorkerConfig {
-            processors: ProcessorMap::new(Arc::new(state_fn)),
+            processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
             queues: BTreeMap::new(),
+            arbiter: None,
+            queue_handle,
+            managed: PhantomData,
         }
     }
+
+    /// Start the workers on a managed arbiter, and return the manager struct
+    pub fn start(self) -> Manager {
+        Manager::new(self)
+    }
+}
+
+impl<State> WorkerConfig<State, Unmanaged>
+where
+    State: Clone + 'static,
+{
+    /// Create a new WorkerConfig in the current arbiter
+    ///
+    /// The supplied function should return the State required by the jobs intended to be
+    /// processed. The function must be sharable between threads, but the state itself does not
+    /// have this requirement.
+    pub fn new<S: Storage + Send + Sync + 'static>(
+        storage: S,
+        state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
+    ) -> Self {
+        Self::new_in_arbiter(Arbiter::current(), storage, state_fn)
+    }
+
+    /// Create a new WorkerConfig in the provided arbiter
+    ///
+    /// The supplied function should return the State required by the jobs intended to be
+    /// processed. The function must be sharable between threads, but the state itself does not
+    /// have this requirement.
+    pub fn new_in_arbiter<S: Storage + Send + Sync + 'static>(
+        arbiter: ArbiterHandle,
+        storage: S,
+        state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
+    ) -> Self {
+        let queue_handle = create_server_in_arbiter(arbiter.clone(), storage);
+
+        let q2 = queue_handle.clone();
+        WorkerConfig {
+            processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
+            queues: BTreeMap::new(),
+            arbiter: Some(arbiter),
+            queue_handle,
+            managed: PhantomData,
+        }
+    }
+
+    /// Start the workers in the provided arbiter
+    pub fn start(self) -> QueueHandle {
+        self.start_managed(self.arbiter.as_ref().unwrap(), &());
+
+        self.queue_handle
+    }
+}
+
+impl<State, M> WorkerConfig<State, M>
+where
+    State: Clone + 'static,
+{
     /// Register a `Job` with the worker
     ///
     /// This enables the worker to handle jobs associated with this processor. If a processor is
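
Two things are going on in the hunk above. First, the constructors adapt the new one-argument state_fn to ProcessorMap's zero-argument factory by capturing the freshly created handle (the move || state_fn(q2.clone()) closure). Second, Unmanaged and Managed are zero-sized type-state markers: the same WorkerConfig struct gets two start() methods with different return types, selected at compile time by the PhantomData parameter. The marker trick in miniature, independent of this crate:

    use std::marker::PhantomData;

    struct Managed;
    struct Unmanaged;

    struct Config<M> {
        // PhantomData costs nothing at runtime; it only tags the type.
        mode: PhantomData<M>,
    }

    impl Config<Managed> {
        fn start(self) -> &'static str {
            "a Manager that supervises its own arbiter"
        }
    }

    impl Config<Unmanaged> {
        fn start(self) -> &'static str {
            "a bare QueueHandle; the caller owns the arbiter"
        }
    }

    fn main() {
        let managed: Config<Managed> = Config { mode: PhantomData };
        let unmanaged: Config<Unmanaged> = Config { mode: PhantomData };
        assert_eq!(managed.start(), "a Manager that supervises its own arbiter");
        assert_eq!(unmanaged.start(), "a bare QueueHandle; the caller owns the arbiter");
    }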
@@ -339,41 +405,17 @@ where
         self
     }

-    /// Start the workers in the current arbiter
-    ///
-    /// This method will panic if not called from an actix runtime
-    pub fn start<S: Storage + Send + Sync + 'static>(self, storage: S) -> QueueHandle {
-        self.start_in_arbiter(&Arbiter::current(), storage)
-    }
-
-    /// Start the workers in the provided arbiter
-    pub fn start_in_arbiter<S: Storage + Send + Sync + 'static>(
-        self,
-        arbiter: &ArbiterHandle,
-        storage: S,
-    ) -> QueueHandle {
-        let queue_handle = create_server_in_arbiter(arbiter.clone(), storage);
-        self.start_managed(arbiter, queue_handle.clone(), &());
-
-        queue_handle
-    }
-
-    /// Start the workers on a managed arbiter, and return the manager struct
-    pub fn managed<S: Storage + Send + Sync + 'static>(self, storage: S) -> Manager {
-        Manager::new(storage, self)
-    }
-
     /// Start workers in a managed way
     fn start_managed<Extras: Clone + Send + 'static>(
         &self,
         arbiter: &ArbiterHandle,
-        queue_handle: QueueHandle,
         extras: &Extras,
     ) {
         for (key, count) in self.queues.iter() {
             for _ in 0..*count {
                 let queue = key.clone();
                 let processors = self.processors.clone();
-                let server = queue_handle.inner.clone();
+                let server = self.queue_handle.inner.clone();
                 let extras_2 = extras.clone();
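
The nested loop in start_managed is a plain fan-out: one worker per unit of set_worker_count, each bound to its named queue. Roughly, ignoring the actor plumbing around it:

    use std::collections::BTreeMap;

    // Sketch of the fan-out; spawn stands in for the crate's worker spawning.
    fn spawn_all(queues: &BTreeMap<String, u64>, spawn: impl Fn(String)) {
        for (queue, count) in queues {
            for _ in 0..*count {
                // each worker polls jobs for exactly one named queue
                spawn(queue.clone());
            }
        }
    }

    fn main() {
        let mut queues = BTreeMap::new();
        queues.insert("default".to_owned(), 16u64);
        spawn_all(&queues, |q| println!("spawning worker for {q}"));
    }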