diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs index 03755a6..ac04969 100644 --- a/jobs-tokio/src/lib.rs +++ b/jobs-tokio/src/lib.rs @@ -50,6 +50,8 @@ //! //! // tokio::signal::ctrl_c().await?; //! +//! drop(queue_handle); +//! //! Ok(()) //! } //! @@ -118,10 +120,11 @@ use background_jobs_core::{ memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage as StorageTrait, }; use std::{ - collections::BTreeMap, - sync::Arc, + collections::{BTreeMap, HashMap}, + sync::{Arc, Mutex}, time::{Duration, SystemTime}, }; +use tokio::task::{JoinHandle, JoinSet}; mod every; mod spawn; @@ -151,6 +154,7 @@ where { QueueHandle { inner: Storage::new(storage), + manager_handle: Some(Arc::new(Mutex::new(None))), } } @@ -218,23 +222,80 @@ where } /// Start the workers in the provided arbiter - pub fn start(self) -> QueueHandle { - for (key, count) in self.queues.iter() { + pub fn start(self) -> std::io::Result { + let Self { + processors, + queues, + queue_handle, + } = self; + + let mut sets = HashMap::new(); + + for (key, count) in queues.iter() { + let mut set = JoinSet::new(); + for _ in 0..*count { let queue = key.clone(); - let processors = self.processors.clone(); - let server = self.queue_handle.inner.clone(); + let processors = processors.clone(); + let server = queue_handle.inner.clone(); - if let Err(e) = spawn::spawn( + spawn::spawn_in( + &mut set, "local-worker", worker::local_worker(queue, processors.clone(), server), - ) { - tracing::error!("Failed to spawn worker {e}"); - } + )?; } + + sets.insert(key.clone(), set); } - self.queue_handle + let server = queue_handle.inner.clone(); + + let manager_task = crate::spawn::spawn("set-supervisor", async move { + let mut superset = JoinSet::new(); + + for (queue, mut set) in sets { + let server = server.clone(); + let processors = processors.clone(); + + if let Err(e) = spawn::spawn_in(&mut superset, "worker-supervisor", async move { + while let Some(_) = set.join_next().await { + metrics::counter!("background-jobs.tokio.worker.finished", "queue" => queue.clone()) + .increment(1); + + tracing::warn!("worker closed, spawning another"); + + if let Err(e) = spawn::spawn_in( + &mut set, + "local-worker", + worker::local_worker(queue.clone(), processors.clone(), server.clone()), + ) { + tracing::warn!("Failed to respawn worker: {e}"); + break; + } + metrics::counter!("background-jobs.tokio.worker.restart").increment(1); + } + }) { + tracing::warn!("Failed to spawn worker supervisor: {e}"); + break; + } + } + + let mut count = 0; + while superset.join_next().await.is_some() { + count += 1; + tracing::info!("Joined worker-supervisor {count}"); + } + })?; + + *queue_handle + .manager_handle + .as_ref() + .unwrap() + .lock() + .unwrap() = Some(manager_task); + + Ok(queue_handle) } } @@ -245,6 +306,7 @@ where #[derive(Clone)] pub struct QueueHandle { inner: Storage, + manager_handle: Option>>>>, } impl QueueHandle { @@ -285,3 +347,17 @@ impl QueueHandle { spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ()) } } + +impl Drop for QueueHandle { + fn drop(&mut self) { + if let Some(handle) = self + .manager_handle + .take() + .and_then(Arc::into_inner) + .and_then(|m| m.lock().unwrap().take()) + { + tracing::debug!("Dropping last QueueHandle"); + handle.abort(); + } + } +} diff --git a/jobs-tokio/src/spawn.rs b/jobs-tokio/src/spawn.rs index e3b4597..01c11f6 100644 --- a/jobs-tokio/src/spawn.rs +++ b/jobs-tokio/src/spawn.rs @@ -1,6 +1,6 @@ use std::future::Future; -use tokio::task::JoinHandle; +use tokio::task::{AbortHandle, JoinHandle, JoinSet}; #[cfg(tokio_unstable)] pub(crate) fn spawn(name: &str, future: F) -> std::io::Result> @@ -11,6 +11,19 @@ where tokio::task::Builder::new().name(name).spawn(future) } +#[cfg(tokio_unstable)] +pub(crate) fn spawn_in( + set: &mut JoinSet, + name: &str, + future: F, +) -> std::io::Result +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + set.build_task().name(name).spawn(future) +} + #[cfg(not(tokio_unstable))] pub(crate) fn spawn(name: &str, future: F) -> std::io::Result> where @@ -20,3 +33,17 @@ where let _ = name; Ok(tokio::task::spawn(future)) } + +#[cfg(not(tokio_unstable))] +pub(crate) fn spawn_in( + set: &mut JoinSet, + name: &str, + future: F, +) -> std::io::Result +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + let _ = name; + Ok(set.spawn(future)) +} diff --git a/jobs-tokio/src/worker.rs b/jobs-tokio/src/worker.rs index f5e7d1f..f47b8b4 100644 --- a/jobs-tokio/src/worker.rs +++ b/jobs-tokio/src/worker.rs @@ -7,52 +7,6 @@ use std::{ use tracing::{Instrument, Span}; use uuid::Uuid; -struct LocalWorkerStarter { - queue: String, - processors: ProcessorMap, - server: Storage, -} - -#[cfg(tokio_unstable)] -fn test_runtime() -> anyhow::Result<()> { - tokio::task::Builder::new() - .name("runtime-test") - .spawn(async move {}) - .map(|_| ()) - .map_err(From::from) -} - -#[cfg(not(tokio_unstable))] -fn test_runtime() -> anyhow::Result<()> { - std::panic::catch_unwind(|| tokio::spawn(async move {})).map(|_| ()).map_err(From::from) -} - -impl Drop for LocalWorkerStarter where State: Send + Clone + 'static { - fn drop(&mut self) { - metrics::counter!("background-jobs.tokio.worker.finished", "queue" => self.queue.clone()) - .increment(1); - - let res = test_runtime(); - - if res.is_ok() { - if let Err(e) = crate::spawn::spawn( - "local-worker", - local_worker( - self.queue.clone(), - self.processors.clone(), - self.server.clone(), - ), - ) { - tracing::error!("Failed to re-spawn local worker: {e}"); - } else { - metrics::counter!("background-jobs.tokio.worker.restart").increment(1); - } - } else { - tracing::info!("Shutting down worker"); - } - } -} - struct RunOnDrop(F) where F: Fn(); @@ -148,18 +102,13 @@ pub(crate) async fn local_worker( ) where State: Send + Clone + 'static, { - metrics::counter!("background-jobs.tokio.worker.started", "queue" => queue.clone()).increment(1); - - let starter = LocalWorkerStarter { - queue: queue.clone(), - processors: processors.clone(), - server: server.clone(), - }; + metrics::counter!("background-jobs.tokio.worker.started", "queue" => queue.clone()) + .increment(1); let id = Uuid::now_v7(); let log_on_drop = RunOnDrop(|| { - make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing")); + make_span(id, &queue, "closing").in_scope(|| tracing::debug!("Worker closing")); }); loop { @@ -219,7 +168,6 @@ pub(crate) async fn local_worker( } drop(log_on_drop); - drop(starter); } fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {