diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index ee8fbf6..6787309 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.14.1" +version = "0.14.2" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 01471f9..b60011e 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -181,7 +181,7 @@ impl Manager { let worker_config = worker_config.clone(); manager_arbiter.spawn(async move { - let mut worker_arbiter = Arbiter::new(); + let mut worker_arbiter = ArbiterDropper::new(); loop { let notifier = DropNotifier::default(); @@ -200,10 +200,9 @@ impl Manager { metrics::counter!("background-jobs.worker-arbiter.restart", 1, "number" => i.to_string()); tracing::warn!("Recovering from dead worker arbiter"); - worker_arbiter.stop(); - let _ = worker_arbiter.join(); + drop(worker_arbiter); - worker_arbiter = Arbiter::new(); + worker_arbiter = ArbiterDropper::new(); } }); } @@ -249,6 +248,34 @@ impl Drop for DropNotifier { } } +struct ArbiterDropper { + arbiter: Option, +} + +impl ArbiterDropper { + fn new() -> Self { + Self { + arbiter: Some(Arbiter::new()), + } + } +} + +impl Deref for ArbiterDropper { + type Target = Arbiter; + + fn deref(&self) -> &Self::Target { + self.arbiter.as_ref().unwrap() + } +} + +impl Drop for ArbiterDropper { + fn drop(&mut self) { + let arbiter = self.arbiter.take().unwrap(); + arbiter.stop(); + let _ = arbiter.join(); + } +} + /// Create a new managed Server /// /// In previous versions of this library, the server itself was run on it's own dedicated threads