diff --git a/Cargo.toml b/Cargo.toml
index 2d50897..b6069ac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ members = [
   "jobs-core",
   "jobs-sled",
   "examples/actix-example",
+  "examples/managed-example",
 ]
 
 [features]
diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs
index 8c6ea0b..2932afc 100644
--- a/examples/actix-example/src/main.rs
+++ b/examples/actix-example/src/main.rs
@@ -38,7 +38,7 @@ async fn main() -> Result<(), Error> {
     let arbiter = Arbiter::new();
 
     // Start the application server. This guards access to to the jobs store
-    let queue_handle = create_server_in_arbiter(storage, arbiter.handle());
+    let queue_handle = create_server_in_arbiter(arbiter.handle(), storage);
 
     // Configure and start our workers
     WorkerConfig::new(move || MyState::new("My App"))
diff --git a/examples/managed-example/.gitignore b/examples/managed-example/.gitignore
new file mode 100644
index 0000000..23f168a
--- /dev/null
+++ b/examples/managed-example/.gitignore
@@ -0,0 +1 @@
+/my-sled-db
diff --git a/examples/managed-example/Cargo.toml b/examples/managed-example/Cargo.toml
new file mode 100644
index 0000000..0306e1c
--- /dev/null
+++ b/examples/managed-example/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "managed-example"
+version = "0.1.0"
+authors = ["asonix <asonix@asonix.dog>"]
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+actix-rt = "2.0.0"
+anyhow = "1.0"
+async-trait = "0.1.24"
+background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] }
+background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
+chrono = "0.4"
+tracing = "0.1"
+tracing-subscriber = { version = "0.2", features = ["fmt"] }
+serde = { version = "1.0", features = ["derive"] }
+sled = "0.34"
diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs
new file mode 100644
index 0000000..e8b72c5
--- /dev/null
+++ b/examples/managed-example/src/main.rs
@@ -0,0 +1,143 @@
+use actix_rt::Arbiter;
+use anyhow::Error;
+use background_jobs::{ActixJob as Job, Manager, MaxRetries, WorkerConfig};
+use background_jobs_sled_storage::Storage;
+use chrono::{Duration, Utc};
+use std::future::{ready, Ready};
+use tracing::info;
+use tracing_subscriber::EnvFilter;
+
+const DEFAULT_QUEUE: &str = "default";
+
+#[derive(Clone, Debug)]
+pub struct MyState {
+    pub app_name: String,
+}
+
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
+pub struct MyJob {
+    some_usize: usize,
+    other_usize: usize,
+}
+
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
+pub struct StopJob;
+
+#[actix_rt::main]
+async fn main() -> Result<(), Error> {
+    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
+
+    tracing_subscriber::fmt::fmt()
+        .with_env_filter(env_filter)
+        .init();
+
+    // Set up our Storage
+    let db = sled::Config::new().temporary(true).open()?;
+    let storage = Storage::new(db)?;
+
+    // Configure and start our workers
+    let worker_config = WorkerConfig::new(move || MyState::new("My App"))
+        .register::<StopJob>()
+        .register::<MyJob>()
+        .set_worker_count(DEFAULT_QUEUE, 16);
+
+    // Start the application server. This guards access to the jobs store
+    let manager = Manager::new(storage, worker_config);
+
+    // Queue our jobs
+    manager.queue_handle().queue(MyJob::new(1, 2)).await?;
+    manager.queue_handle().queue(MyJob::new(3, 4)).await?;
+    manager.queue_handle().queue(MyJob::new(5, 6)).await?;
+    manager
+        .queue_handle()
+        .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))
+        .await?;
+
+    // Block on Actix
+    actix_rt::signal::ctrl_c().await?;
+
+    // kill the current arbiter
+    manager.queue_handle().queue(StopJob).await?;
+
+    // Block on Actix
+    actix_rt::signal::ctrl_c().await?;
+
+    // See that the workers have respawned
+    manager.queue_handle().queue(MyJob::new(1, 2)).await?;
+    manager.queue_handle().queue(MyJob::new(3, 4)).await?;
+    manager.queue_handle().queue(MyJob::new(5, 6)).await?;
+    manager
+        .queue_handle()
+        .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))
+        .await?;
+
+    // Block on Actix
+    actix_rt::signal::ctrl_c().await?;
+
+    drop(manager);
+
+    Ok(())
+}
+
+impl MyState {
+    pub fn new(app_name: &str) -> Self {
+        MyState {
+            app_name: app_name.to_owned(),
+        }
+    }
+}
+
+impl MyJob {
+    pub fn new(some_usize: usize, other_usize: usize) -> Self {
+        MyJob {
+            some_usize,
+            other_usize,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl Job for MyJob {
+    type State = MyState;
+    type Future = Ready<Result<(), Error>>;
+
+    // The name of the job. It is super important that each job has a unique name,
+    // because otherwise one job will overwrite another job when they're being
+    // registered.
+    const NAME: &'static str = "MyJob";
+
+    // The queue that this processor belongs to
+    //
+    // Workers have the option to subscribe to specific queues, so this is important to
+    // determine which worker will call the processor
+    //
+    // Jobs can optionally override the queue they're spawned on
+    const QUEUE: &'static str = DEFAULT_QUEUE;
+
+    // The number of times background-jobs should try to retry a job before giving up
+    //
+    // Jobs can optionally override this value
+    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
+
+    fn run(self, state: MyState) -> Self::Future {
+        info!("{}: args, {:?}", state.app_name, self);
+
+        ready(Ok(()))
+    }
+}
+
+#[async_trait::async_trait]
+impl Job for StopJob {
+    type State = MyState;
+    type Future = Ready<Result<(), Error>>;
+
+    const NAME: &'static str = "StopJob";
+    const QUEUE: &'static str = DEFAULT_QUEUE;
+    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
+
+    fn run(self, _: MyState) -> Self::Future {
+        Arbiter::current().stop();
+
+        ready(Ok(()))
+    }
+}
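Both jobs in this example finish synchronously, which is why `Ready<Result<(), Error>>` works as the `Future` type. A job that actually awaits something would name a boxed future instead. A minimal sketch under the same trait bounds; `MyAsyncJob` is illustrative and not part of this change:

    use std::{future::Future, pin::Pin};

    #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
    pub struct MyAsyncJob;

    impl Job for MyAsyncJob {
        type State = MyState;
        // Boxed so the async block's anonymous future type can be named
        type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;

        const NAME: &'static str = "MyAsyncJob";
        const QUEUE: &'static str = DEFAULT_QUEUE;
        const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

        fn run(self, state: MyState) -> Self::Future {
            Box::pin(async move {
                // real async work (http calls, db queries, ...) goes here
                info!("{}: async job complete", state.app_name);
                Ok(())
            })
        }
    }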
diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs
index efa117e..84576c4 100644
--- a/jobs-actix/src/every.rs
+++ b/jobs-actix/src/every.rs
@@ -9,21 +9,18 @@ use tracing::error;
 /// let server = create_server(storage);
 /// server.every(Duration::from_secs(60 * 30), MyJob::new());
 /// ```
-pub(crate) fn every<J>(spawner: &QueueHandle, duration: Duration, job: J)
+pub(crate) async fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
 where
     J: Job + Clone + Send,
 {
-    let spawner_clone = spawner.clone();
-    spawner.tokio_rt.spawn(async move {
-        let mut interval = interval_at(Instant::now(), duration);
+    let mut interval = interval_at(Instant::now(), duration);
 
-        loop {
-            interval.tick().await;
+    loop {
+        interval.tick().await;
 
-            let job = job.clone();
-            if spawner_clone.queue::<J>(job).await.is_err() {
-                error!("Failed to queue job: {}", J::NAME);
-            }
+        let job = job.clone();
+        if spawner.queue::<J>(job).await.is_err() {
+            error!("Failed to queue job: {}", J::NAME);
         }
-    });
+    }
 }
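This rework turns `every` from a method that spawned itself onto a stored tokio runtime handle into a plain `async fn`, so the caller picks the executor (`QueueHandle::every` below now does `actix_rt::spawn(every(...))`). The same shape in isolation, with illustrative names:

    use actix_rt::time::{interval_at, Instant};
    use std::time::Duration;

    // An async fn that owns its interval and loops until the task is dropped;
    // spawning is left entirely to the caller.
    async fn heartbeat(period: Duration) {
        let mut interval = interval_at(Instant::now(), period);

        loop {
            interval.tick().await;
            println!("tick");
        }
    }

    // caller side:
    // actix_rt::spawn(heartbeat(Duration::from_secs(1)));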
diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs
index 999bba9..8c22055 100644
--- a/jobs-actix/src/lib.rs
+++ b/jobs-actix/src/lib.rs
@@ -120,17 +120,136 @@
 use actix_rt::{Arbiter, ArbiterHandle};
 use anyhow::Error;
 use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage};
 use chrono::{DateTime, Utc};
-use std::{collections::BTreeMap, sync::Arc, time::Duration};
+use std::{collections::BTreeMap, ops::Deref, sync::Arc, time::Duration};
 
 mod every;
 mod server;
 mod storage;
 mod worker;
 
-use self::{every::every, server::Server, worker::local_worker};
+use self::{every::every, server::Server, worker::LocalWorkerStarter};
 
 pub use background_jobs_core::ActixJob;
 
+/// Manager for worker threads
+///
+/// Manager attempts to restart workers as their arbiters die
+pub struct Manager {
+    // the manager arbiter
+    _arbiter: ArbiterDropper,
+
+    // handle for queueing
+    queue_handle: QueueHandle,
+}
+
+struct ArbiterDropper {
+    arbiter: Option<Arbiter>,
+}
+
+impl Manager {
+    /// Create a new manager to keep jobs alive
+    ///
+    /// Manager works by starting a new Arbiter to run jobs, and if that arbiter ever dies, it
+    /// spins up another one and spawns the workers again
+    pub fn new<S, State>(storage: S, worker_config: WorkerConfig<State>) -> Self
+    where
+        S: Storage + Sync + 'static,
+        State: Clone,
+    {
+        let arbiter = Arbiter::new();
+        let worker_arbiter = Arbiter::new();
+        let notifier = Arc::new(tokio::sync::Notify::new());
+
+        let queue_handle = create_server_managed(storage);
+
+        let drop_notifier = DropNotifier::new(Arc::clone(&notifier));
+        let queue_handle_2 = queue_handle.clone();
+        arbiter.spawn(async move {
+            let queue_handle = queue_handle_2;
+
+            let mut drop_notifier = drop_notifier;
+            let mut arbiter = ArbiterDropper {
+                arbiter: Some(worker_arbiter),
+            };
+
+            loop {
+                queue_handle
+                    .inner
+                    .ticker(arbiter.handle(), drop_notifier.clone());
+                worker_config.start_managed(
+                    &arbiter.handle(),
+                    queue_handle.clone(),
+                    &drop_notifier,
+                );
+
+                notifier.notified().await;
+                // drop_notifier needs to live at least as long as notifier.notified().await
+                // in order to ensure we get notified by ticker or a worker, and not ourselves
+                drop(drop_notifier);
+
+                // Assume arbiter is dead if we were notified
+                if arbiter.spawn(async {}) {
+                    panic!("Arbiter should be dead by now");
+                }
+
+                arbiter = ArbiterDropper {
+                    arbiter: Some(Arbiter::new()),
+                };
+
+                drop_notifier = DropNotifier::new(Arc::clone(&notifier));
+            }
+        });
+
+        Manager {
+            _arbiter: ArbiterDropper {
+                arbiter: Some(arbiter),
+            },
+            queue_handle,
+        }
+    }
+
+    /// Retrieve the QueueHandle for the managed workers
+    pub fn queue_handle(&self) -> &QueueHandle {
+        &self.queue_handle
+    }
+}
+
+impl Deref for ArbiterDropper {
+    type Target = Arbiter;
+
+    fn deref(&self) -> &Self::Target {
+        self.arbiter.as_ref().unwrap()
+    }
+}
+
+impl Drop for ArbiterDropper {
+    fn drop(&mut self) {
+        self.stop();
+        let _ = self.arbiter.take().unwrap().join();
+    }
+}
+
+#[derive(Clone)]
+struct DropNotifier {
+    inner: Arc<std::sync::Mutex<Option<Arc<tokio::sync::Notify>>>>,
+}
+
+impl DropNotifier {
+    fn new(notify: Arc<tokio::sync::Notify>) -> Self {
+        DropNotifier {
+            inner: Arc::new(std::sync::Mutex::new(Some(notify))),
+        }
+    }
+}
+
+impl Drop for DropNotifier {
+    fn drop(&mut self) {
+        if let Some(notifier) = self.inner.lock().unwrap().take() {
+            notifier.notify_one();
+        }
+    }
+}
+
 /// Create a new Server
 ///
 /// In previous versions of this library, the server itself was run on it's own dedicated threads
@@ -142,7 +261,7 @@ pub fn create_server<S>(storage: S) -> QueueHandle
 where
     S: Storage + Sync + 'static,
 {
-    create_server_in_arbiter(storage, Arbiter::current())
+    create_server_in_arbiter(Arbiter::current(), storage)
 }
 
 /// Create a new Server
@@ -152,15 +271,26 @@ where
 /// primitives, the Server has become an object that gets shared between client threads.
 ///
 /// This method will panic if not called from an actix runtime
-pub fn create_server_in_arbiter<S>(storage: S, arbiter: ArbiterHandle) -> QueueHandle
+pub fn create_server_in_arbiter<S>(arbiter: ArbiterHandle, storage: S) -> QueueHandle
 where
     S: Storage + Sync + 'static,
 {
-    let tokio_rt = tokio::runtime::Handle::current();
+    let handle = create_server_managed(storage);
+    handle.inner.ticker(arbiter, ());
+    handle
+}
 
+/// Create a new managed Server
+///
+/// In previous versions of this library, the server itself was run on its own dedicated threads
+/// and guarded access to jobs via messages. Since we now have futures-aware synchronization
+/// primitives, the Server has become an object that gets shared between client threads.
+pub fn create_server_managed<S>(storage: S) -> QueueHandle
+where
+    S: Storage + Sync + 'static,
+{
     QueueHandle {
-        inner: Server::new(arbiter, storage),
-        tokio_rt,
+        inner: Server::new(storage),
     }
 }
@@ -228,14 +358,31 @@ where
     /// Start the workers in the provided arbiter
     pub fn start_in_arbiter(self, arbiter: &ArbiterHandle, queue_handle: QueueHandle) {
-        for (key, count) in self.queues.into_iter() {
-            for _ in 0..count {
-                let key = key.clone();
+        self.start_managed(arbiter, queue_handle, &())
+    }
+
+    /// Start workers in a managed way
+    pub fn start_managed<Extras: Clone + Send + 'static>(
+        &self,
+        arbiter: &ArbiterHandle,
+        queue_handle: QueueHandle,
+        extras: &Extras,
+    ) {
+        for (key, count) in self.queues.iter() {
+            for _ in 0..*count {
+                let queue = key.clone();
                 let processors = self.processors.clone();
                 let server = queue_handle.inner.clone();
 
+                let extras_2 = extras.clone();
+
                 arbiter.spawn_fn(move || {
-                    actix_rt::spawn(local_worker(key, processors.cached(), server));
+                    drop(LocalWorkerStarter::new(
+                        queue,
+                        processors.cached(),
+                        server,
+                        extras_2,
+                    ));
                 });
             }
         }
@@ -249,7 +396,6 @@ where
 #[derive(Clone)]
 pub struct QueueHandle {
     inner: Server,
-    tokio_rt: tokio::runtime::Handle,
 }
 
 impl QueueHandle {
@@ -266,17 +412,6 @@ impl QueueHandle {
         Ok(())
     }
 
-    /// Queues a job for execution
-    ///
-    /// This job will be sent to the server for storage, and will execute whenever a worker for the
-    /// job's queue is free to do so.
-    pub async fn blocking_queue<J>(&self, job: J) -> Result<(), Error>
-    where
-        J: Job,
-    {
-        self.tokio_rt.block_on(self.queue(job))
-    }
-
     /// Schedule a job for execution later
     ///
     /// This job will be sent to the server for storage, and will execute after the specified time
@@ -291,17 +426,6 @@ impl QueueHandle {
         Ok(())
     }
 
-    /// Schedule a job for execution later
-    ///
-    /// This job will be sent to the server for storage, and will execute after the specified time
-    /// and when a worker for the job's queue is free to do so.
-    pub fn blocking_schedule<J>(&self, job: J, after: DateTime<Utc>) -> Result<(), Error>
-    where
-        J: Job,
-    {
-        self.tokio_rt.block_on(self.schedule(job, after))
-    }
-
     /// Queues a job for recurring execution
     ///
    /// This job will be added to it's queue on the server once every `Duration`. It will be
@@ -310,7 +434,7 @@ impl QueueHandle {
     where
         J: Job + Clone + Send + 'static,
     {
-        every(self, duration, job);
+        actix_rt::spawn(every(self.clone(), duration, job));
     }
 
     /// Return an overview of the processor's statistics
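`DropNotifier` is what makes the restart loop tick: the ticker and each worker hold clones sharing one `tokio::sync::Notify`, and whichever clone is dropped first (because the worker arbiter died) takes the `Notify` out of the shared `Option` and fires it, so each generation of workers wakes the `Manager` loop exactly once. A standalone sketch of that notify-on-drop pattern, independent of this crate:

    use std::sync::{Arc, Mutex};
    use tokio::sync::Notify;

    #[derive(Clone)]
    struct DropGuard {
        inner: Arc<Mutex<Option<Arc<Notify>>>>,
    }

    impl Drop for DropGuard {
        fn drop(&mut self) {
            // Only the first clone to drop still finds the Notify; later drops
            // see None, so one generation produces exactly one wakeup.
            if let Some(notify) = self.inner.lock().unwrap().take() {
                notify.notify_one();
            }
        }
    }

    #[actix_rt::main]
    async fn main() {
        let notify = Arc::new(Notify::new());
        let guard = DropGuard {
            inner: Arc::new(Mutex::new(Some(Arc::clone(&notify)))),
        };

        actix_rt::spawn(async move {
            let _held = guard; // dropped when this task ends
        });

        // notify_one stores a permit, so this resolves even if the task
        // finished before we started waiting
        notify.notified().await;
        println!("guard dropped");
    }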
diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs
index fd287f6..8f53876 100644
--- a/jobs-actix/src/server.rs
+++ b/jobs-actix/src/server.rs
@@ -23,19 +23,24 @@ pub(crate) struct ServerCache {
     cache: Arc<Mutex<HashMap<String, WorkerQueue>>>,
 }
 
-struct Ticker {
+pub(super) struct Ticker<Extras: Send + 'static> {
     server: Server,
+    extras: Option<Extras>,
+    arbiter: ArbiterHandle,
 }
 
-impl Drop for Ticker {
+impl<Extras: Send + 'static> Drop for Ticker<Extras> {
     fn drop(&mut self) {
-        let online = self.server.arbiter.spawn(async move {});
+        let online = self.arbiter.spawn(async move {});
+
+        let extras = self.extras.take().unwrap();
 
         if online {
             let server = self.server.clone();
-            self.server.arbiter.spawn(async move {
-                let _ticker = server.ticker();
+            let arbiter = self.arbiter.clone();
+            let spawned = self.arbiter.spawn(async move {
+                let _ticker = server.ticker(arbiter, extras);
                 let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
 
                 loop {
@@ -45,9 +50,13 @@ impl Drop for Ticker {
                 }
             }
         });
-        } else {
-            warn!("Not restarting ticker, arbiter is dead");
+
+            if spawned {
+                return;
+            }
         }
+
+        warn!("Not restarting ticker, arbiter is dead");
     }
 }
@@ -59,29 +68,30 @@ impl Drop for Ticker {
 pub(crate) struct Server {
     storage: Arc<dyn ActixStorage + Send + Sync>,
     cache: ServerCache,
-    arbiter: ArbiterHandle,
 }
 
 impl Server {
-    fn ticker(&self) -> Ticker {
+    pub(super) fn ticker<Extras: Send + 'static>(
+        &self,
+        arbiter: ArbiterHandle,
+        extras: Extras,
+    ) -> Ticker<Extras> {
         Ticker {
             server: self.clone(),
+            extras: Some(extras),
+            arbiter,
         }
     }
 
-    pub(crate) fn new(arbiter: ArbiterHandle, storage: S) -> Self
+    /// Create a new Server from a compatible storage implementation
+    pub(crate) fn new<S>(storage: S) -> Self
     where
         S: Storage + Sync + 'static,
     {
-        let server = Server {
+        Server {
             storage: Arc::new(StorageWrapper(storage)),
             cache: ServerCache::new(),
-            arbiter,
-        };
-
-        drop(server.ticker());
-
-        server
+        }
     }
 
     async fn check_db(&self) -> Result<(), Error> {
diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs
index c2767db..3247ceb 100644
--- a/jobs-actix/src/worker.rs
+++ b/jobs-actix/src/worker.rs
@@ -56,24 +56,45 @@ impl Worker for LocalWorkerHandle {
     }
 }
 
-struct LocalWorkerStarter<State: Clone + 'static> {
+pub(crate) struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> {
     queue: String,
     processors: CachedProcessorMap<State>,
     server: Server,
+    extras: Option<Extras>,
 }
 
-impl<State: Clone + 'static> Drop for LocalWorkerStarter<State> {
+impl<State: Clone + 'static, Extras: 'static> LocalWorkerStarter<State, Extras> {
+    pub(super) fn new(
+        queue: String,
+        processors: CachedProcessorMap<State>,
+        server: Server,
+        extras: Extras,
+    ) -> Self {
+        LocalWorkerStarter {
+            queue,
+            processors,
+            server,
+            extras: Some(extras),
+        }
+    }
+}
+
+impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State, Extras> {
     fn drop(&mut self) {
         let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {}));
 
+        let extras = self.extras.take().unwrap();
+
         if let Ok(true) = res {
             actix_rt::spawn(local_worker(
                 self.queue.clone(),
                 self.processors.clone(),
                 self.server.clone(),
+                extras,
             ));
         } else {
             warn!("Not restarting worker, Arbiter is dead");
+            drop(extras);
         }
     }
 }
@@ -91,18 +112,22 @@ where
 {
     }
 }
 
-pub(crate) async fn local_worker<State>(
+async fn local_worker<State, Extras>(
     queue: String,
     processors: CachedProcessorMap<State>,
     server: Server,
+    extras: Extras,
 ) where
     State: Clone + 'static,
+    Extras: 'static,
 {
     let starter = LocalWorkerStarter {
         queue: queue.clone(),
        processors: processors.clone(),
         server: server.clone(),
+        extras: Some(extras),
     };
+
     let id = Uuid::new_v4();
     let (tx, mut rx) = channel(16);
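Both `Drop` impls above use the same liveness probe: spawning a no-op future returns `false` once the target arbiter has stopped, and the `catch_unwind` in the worker path additionally covers `Arbiter::current()` panicking when there is no arbiter at all (e.g. during runtime teardown). Distilled into a helper; this function is illustrative, not part of the diff:

    // true only if a live Arbiter on this thread accepted the no-op future;
    // mirrors the probe used by Ticker and LocalWorkerStarter
    fn current_arbiter_is_alive() -> bool {
        std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async {}))
            .unwrap_or(false)
    }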
diff --git a/src/lib.rs b/src/lib.rs
index b79629a..525c182 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -173,5 +173,5 @@ pub mod dev {
 
 #[cfg(feature = "background-jobs-actix")]
 pub use background_jobs_actix::{
-    create_server, create_server_in_arbiter, ActixJob, QueueHandle, WorkerConfig,
+    create_server, create_server_in_arbiter, ActixJob, Manager, QueueHandle, WorkerConfig,
 };
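Taken together, the new surface is consumed roughly like this; a condensed sketch of examples/managed-example above, reusing its `MyState`/`MyJob` types and assuming the same sled-backed storage:

    use background_jobs::{Manager, WorkerConfig};
    use background_jobs_sled_storage::Storage;

    fn spawn_workers(storage: Storage) -> Manager {
        let worker_config = WorkerConfig::new(|| MyState::new("My App"))
            .register::<MyJob>()
            .set_worker_count("default", 4);

        // The Manager owns the worker arbiter and respawns workers when it
        // dies; dropping the Manager shuts everything down.
        Manager::new(storage, worker_config)
    }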