diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index c51c875..8c6ea0b 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,5 +1,6 @@ +use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{create_server, ActixJob as Job, MaxRetries, WorkerConfig}; +use background_jobs::{create_server_in_arbiter, ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; use chrono::{Duration, Utc}; use std::future::{ready, Ready}; @@ -34,15 +35,17 @@ async fn main() -> Result<(), Error> { let db = sled::Config::new().temporary(true).open()?; let storage = Storage::new(db)?; + let arbiter = Arbiter::new(); + // Start the application server. This guards access to to the jobs store - let queue_handle = create_server(storage); + let queue_handle = create_server_in_arbiter(storage, arbiter.handle()); // Configure and start our workers WorkerConfig::new(move || MyState::new("My App")) .register::() .register::() .set_worker_count(DEFAULT_QUEUE, 16) - .start(queue_handle.clone()); + .start_in_arbiter(&arbiter.handle(), queue_handle.clone()); // Queue some panicking job for _ in 0..32 { @@ -59,6 +62,10 @@ async fn main() -> Result<(), Error> { // Block on Actix actix_rt::signal::ctrl_c().await?; + + arbiter.stop(); + let _ = arbiter.join(); + Ok(()) } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 13421b0..999bba9 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -142,7 +142,7 @@ pub fn create_server(storage: S) -> QueueHandle where S: Storage + Sync + 'static, { - create_server_in_arbiter(storage, &Arbiter::current()) + create_server_in_arbiter(storage, Arbiter::current()) } /// Create a new Server @@ -152,14 +152,14 @@ where /// primitives, the Server has become an object that gets shared between client threads. /// /// This method will panic if not called from an actix runtime -pub fn create_server_in_arbiter(storage: S, arbiter: &ArbiterHandle) -> QueueHandle +pub fn create_server_in_arbiter(storage: S, arbiter: ArbiterHandle) -> QueueHandle where S: Storage + Sync + 'static, { let tokio_rt = tokio::runtime::Handle::current(); QueueHandle { - inner: Server::new(&arbiter, storage), + inner: Server::new(arbiter, storage), tokio_rt, } } diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index fe9c687..fd287f6 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -14,7 +14,7 @@ use std::{ sync::Arc, time::Duration, }; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; type WorkerQueue = VecDeque>; @@ -23,6 +23,34 @@ pub(crate) struct ServerCache { cache: Arc>>, } +struct Ticker { + server: Server, +} + +impl Drop for Ticker { + fn drop(&mut self) { + let online = self.server.arbiter.spawn(async move {}); + + if online { + let server = self.server.clone(); + + self.server.arbiter.spawn(async move { + let _ticker = server.ticker(); + let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); + + loop { + interval.tick().await; + if let Err(e) = server.check_db().await { + error!("Error while checking database for new jobs, {}", e); + } + } + }); + } else { + warn!("Not restarting ticker, arbiter is dead"); + } + } +} + /// The server Actor /// /// This server guards access to Thee storage, and keeps a list of workers that are waiting for @@ -31,32 +59,29 @@ pub(crate) struct ServerCache { pub(crate) struct Server { storage: Arc, cache: ServerCache, + arbiter: ArbiterHandle, } impl Server { + fn ticker(&self) -> Ticker { + Ticker { + server: self.clone(), + } + } /// Create a new Server from a compatible storage implementation - pub(crate) fn new(arbiter: &ArbiterHandle, storage: S) -> Self + pub(crate) fn new(arbiter: ArbiterHandle, storage: S) -> Self where S: Storage + Sync + 'static, { let server = Server { storage: Arc::new(StorageWrapper(storage)), cache: ServerCache::new(), + arbiter, }; - let server2 = server.clone(); - arbiter.spawn(async move { - let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); + drop(server.ticker()); - loop { - interval.tick().await; - if let Err(e) = server.check_db().await { - error!("Error while checking database for new jobs, {}", e); - } - } - }); - - server2 + server } async fn check_db(&self) -> Result<(), Error> { diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 039a2eb..c2767db 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -56,6 +56,28 @@ impl Worker for LocalWorkerHandle { } } +struct LocalWorkerStarter { + queue: String, + processors: CachedProcessorMap, + server: Server, +} + +impl Drop for LocalWorkerStarter { + fn drop(&mut self) { + let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {})); + + if let Ok(true) = res { + actix_rt::spawn(local_worker( + self.queue.clone(), + self.processors.clone(), + self.server.clone(), + )); + } else { + warn!("Not restarting worker, Arbiter is dead"); + } + } +} + struct LogOnDrop(F) where F: Fn() -> Span; @@ -76,6 +98,11 @@ pub(crate) async fn local_worker( ) where State: Clone + 'static, { + let starter = LocalWorkerStarter { + queue: queue.clone(), + processors: processors.clone(), + server: server.clone(), + }; let id = Uuid::new_v4(); let (tx, mut rx) = channel(16); @@ -121,4 +148,5 @@ pub(crate) async fn local_worker( } drop(log_on_drop); + drop(starter); } diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 841effd..45a31f6 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -30,6 +30,7 @@ pub struct ProcessorMap { /// /// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an /// application before workers are spawned in order to handle queued jobs. +#[derive(Clone)] pub struct CachedProcessorMap { inner: HashMap>, state: S, diff --git a/src/lib.rs b/src/lib.rs index be302c1..b79629a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,4 +172,6 @@ pub mod dev { } #[cfg(feature = "background-jobs-actix")] -pub use background_jobs_actix::{create_server, ActixJob, QueueHandle, WorkerConfig}; +pub use background_jobs_actix::{ + create_server, create_server_in_arbiter, ActixJob, QueueHandle, WorkerConfig, +};