diff --git a/jobs-tokio/Cargo.toml b/jobs-tokio/Cargo.toml new file mode 100644 index 0000000..857d32b --- /dev/null +++ b/jobs-tokio/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "background-jobs-tokio" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.79" +async-trait = "0.1.77" +background-jobs-core = { version = "0.17.0", path = "../jobs-core" } +metrics = "0.22.0" +serde = "1.0.195" +serde_json = "1.0.111" +tokio = { version = "1.35.1", features = ["macros", "rt", "sync", "time", "tracing"] } +tracing = "0.1.40" +uuid = { version = "1.6.1", features = ["v7", "serde"] } diff --git a/jobs-tokio/src/every.rs b/jobs-tokio/src/every.rs new file mode 100644 index 0000000..d4b741d --- /dev/null +++ b/jobs-tokio/src/every.rs @@ -0,0 +1,26 @@ +use crate::QueueHandle; +use background_jobs_core::Job; +use std::time::Duration; +use tokio::time::{interval_at, Instant}; + +/// A type used to schedule recurring jobs. +/// +/// ```rust,ignore +/// let server = create_server(storage); +/// server.every(Duration::from_secs(60 * 30), MyJob::new()); +/// ``` +pub(crate) async fn every(spawner: QueueHandle, duration: Duration, job: J) +where + J: Job + Clone + Send, +{ + let mut interval = interval_at(Instant::now(), duration); + + loop { + interval.tick().await; + + let job = job.clone(); + if spawner.queue::(job).await.is_err() { + tracing::error!("Failed to queue job: {}", J::NAME); + } + } +} diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs new file mode 100644 index 0000000..03755a6 --- /dev/null +++ b/jobs-tokio/src/lib.rs @@ -0,0 +1,287 @@ +#![deny(missing_docs)] + +//! # A Tokio-based Jobs Processor +//! +//! This library will spin up as many actors as requested for each processor to process jobs +//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in +//! order to achieve parallel execution, multiple Arbiters must be in use. +//! +//! The thread count is used to spawn Synchronous Actors to handle the storage of job +//! information. For storage backends that cannot be parallelized, a thread-count of 1 should be +//! used. By default, the number of cores of the running system is used. +//! +//! ### Example +//! ```rust +//! use anyhow::Error; +//! use background_jobs_core::{Backoff, Job, MaxRetries}; +//! use background_jobs_tokio::{TokioTimer, WorkerConfig}; +//! use std::future::{ready, Ready}; +//! +//! const DEFAULT_QUEUE: &'static str = "default"; +//! +//! #[derive(Clone, Debug)] +//! pub struct MyState { +//! pub app_name: String, +//! } +//! +//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +//! pub struct MyJob { +//! some_usize: usize, +//! other_usize: usize, +//! } +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Error> { +//! // Set up our Storage +//! // For this example, we use the default in-memory storage mechanism +//! use background_jobs_core::memory_storage::Storage; +//! let storage = Storage::new(TokioTimer); +//! +//! // Configure and start our workers +//! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App")) +//! .register::() +//! .set_worker_count(DEFAULT_QUEUE, 16) +//! .start(); +//! +//! // Queue our jobs +//! queue_handle.queue(MyJob::new(1, 2)).await?; +//! queue_handle.queue(MyJob::new(3, 4)).await?; +//! queue_handle.queue(MyJob::new(5, 6)).await?; +//! +//! // tokio::signal::ctrl_c().await?; +//! +//! Ok(()) +//! } +//! +//! impl MyState { +//! pub fn new(app_name: &str) -> Self { +//! MyState { +//! app_name: app_name.to_owned(), +//! } +//! } +//! } +//! +//! impl MyJob { +//! pub fn new(some_usize: usize, other_usize: usize) -> Self { +//! MyJob { +//! some_usize, +//! other_usize, +//! } +//! } +//! } +//! +//! impl Job for MyJob { +//! type State = MyState; +//! type Future = Ready>; +//! +//! // The name of the job. It is super important that each job has a unique name, +//! // because otherwise one job will overwrite another job when they're being +//! // registered. +//! const NAME: &'static str = "MyJob"; +//! +//! // The queue that this processor belongs to +//! // +//! // Workers have the option to subscribe to specific queues, so this is important to +//! // determine which worker will call the processor +//! // +//! // Jobs can optionally override the queue they're spawned on +//! const QUEUE: &'static str = DEFAULT_QUEUE; +//! +//! // The number of times background-jobs should try to retry a job before giving up +//! // +//! // This value defaults to MaxRetries::Count(5) +//! // Jobs can optionally override this value +//! const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); +//! +//! // The logic to determine how often to retry this job if it fails +//! // +//! // This value defaults to Backoff::Exponential(2) +//! // Jobs can optionally override this value +//! const BACKOFF: Backoff = Backoff::Exponential(2); +//! +//! // This is important for allowing the job server to reap processes that were started but never +//! // completed. +//! // +//! // Defaults to 5 seconds +//! const HEARTBEAT_INTERVAL: u64 = 5_000; +//! +//! fn run(self, state: MyState) -> Self::Future { +//! println!("{}: args, {:?}", state.app_name, self); +//! +//! ready(Ok(())) +//! } +//! } +//! ``` + +use anyhow::Error; +use background_jobs_core::{ + memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage as StorageTrait, +}; +use std::{ + collections::BTreeMap, + sync::Arc, + time::{Duration, SystemTime}, +}; + +mod every; +mod spawn; +mod storage; +mod worker; + +use self::{every::every, storage::Storage}; + +/// A timer implementation for the Memory Storage backend +#[derive(Debug, Clone)] +pub struct TokioTimer; + +#[async_trait::async_trait] +impl Timer for TokioTimer { + async fn timeout(&self, duration: Duration, future: F) -> Result + where + F: std::future::Future + Send + Sync, + { + tokio::time::timeout(duration, future).await.map_err(|_| ()) + } +} + +/// Create a new Server +fn create_server(storage: S) -> QueueHandle +where + S: StorageTrait + Sync + 'static, +{ + QueueHandle { + inner: Storage::new(storage), + } +} + +/// Worker Configuration +/// +/// This type is used for configuring and creating workers to process jobs. Before starting the +/// workers, register `Job` types with this struct. This worker registration allows for +/// different worker processes to handle different sets of workers. +#[derive(Clone)] +pub struct WorkerConfig +where + State: Clone + 'static, +{ + processors: ProcessorMap, + queues: BTreeMap, + queue_handle: QueueHandle, +} + +impl WorkerConfig +where + State: Send + Clone + 'static, +{ + /// Create a new WorkerConfig + /// + /// The supplied function should return the State required by the jobs intended to be + /// processed. The function must be sharable between threads, but the state itself does not + /// have this requirement. + pub fn new( + storage: S, + state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static, + ) -> Self { + let queue_handle = create_server(storage); + let q2 = queue_handle.clone(); + + WorkerConfig { + processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))), + queues: BTreeMap::new(), + queue_handle, + } + } + + /// Register a `Job` with the worker + /// + /// This enables the worker to handle jobs associated with this processor. If a processor is + /// not registered, none of it's jobs will be run, even if another processor handling the same + /// job queue is registered. + pub fn register(mut self) -> Self + where + J: Job, + { + self.queues.insert(J::QUEUE.to_owned(), 4); + self.processors.register::(); + self + } + + /// Set the number of workers to run for a given queue + /// + /// This does not spin up any additional threads. The `Arbiter` the workers are spawned onto + /// will handle processing all workers, regardless of how many are configured. + /// + /// By default, 4 workers are spawned + pub fn set_worker_count(mut self, queue: &str, count: u64) -> Self { + self.queues.insert(queue.to_owned(), count); + self + } + + /// Start the workers in the provided arbiter + pub fn start(self) -> QueueHandle { + for (key, count) in self.queues.iter() { + for _ in 0..*count { + let queue = key.clone(); + let processors = self.processors.clone(); + let server = self.queue_handle.inner.clone(); + + if let Err(e) = spawn::spawn( + "local-worker", + worker::local_worker(queue, processors.clone(), server), + ) { + tracing::error!("Failed to spawn worker {e}"); + } + } + } + + self.queue_handle + } +} + +/// A handle to the job server, used for queuing new jobs +/// +/// `QueueHandle` should be stored in your application's state in order to allow all parts of your +/// application to spawn jobs. +#[derive(Clone)] +pub struct QueueHandle { + inner: Storage, +} + +impl QueueHandle { + /// Queues a job for execution + /// + /// This job will be sent to the server for storage, and will execute whenever a worker for the + /// job's queue is free to do so. + pub async fn queue(&self, job: J) -> Result<(), Error> + where + J: Job, + { + let job = new_job(job)?; + self.inner.push(job).await?; + Ok(()) + } + + /// Schedule a job for execution later + /// + /// This job will be sent to the server for storage, and will execute after the specified time + /// and when a worker for the job's queue is free to do so. + pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), Error> + where + J: Job, + { + let job = new_scheduled_job(job, after)?; + self.inner.push(job).await?; + Ok(()) + } + + /// Queues a job for recurring execution + /// + /// This job will be added to it's queue on the server once every `Duration`. It will be + /// processed whenever workers are free to do so. + pub fn every(&self, duration: Duration, job: J) -> std::io::Result<()> + where + J: Job + Clone + Send + 'static, + { + spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ()) + } +} diff --git a/jobs-tokio/src/spawn.rs b/jobs-tokio/src/spawn.rs new file mode 100644 index 0000000..e3b4597 --- /dev/null +++ b/jobs-tokio/src/spawn.rs @@ -0,0 +1,22 @@ +use std::future::Future; + +use tokio::task::JoinHandle; + +#[cfg(tokio_unstable)] +pub(crate) fn spawn(name: &str, future: F) -> std::io::Result> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + tokio::task::Builder::new().name(name).spawn(future) +} + +#[cfg(not(tokio_unstable))] +pub(crate) fn spawn(name: &str, future: F) -> std::io::Result> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + let _ = name; + Ok(tokio::task::spawn(future)) +} diff --git a/jobs-tokio/src/storage.rs b/jobs-tokio/src/storage.rs new file mode 100644 index 0000000..cf87230 --- /dev/null +++ b/jobs-tokio/src/storage.rs @@ -0,0 +1,67 @@ +use std::{ops::Deref, sync::Arc}; + +use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait}; +use uuid::Uuid; + +#[async_trait::async_trait] +pub trait TokioStorage: Send + Sync { + async fn push(&self, job: NewJobInfo) -> anyhow::Result; + + async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result; + + async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> anyhow::Result<()>; + + async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()>; +} + +#[derive(Clone)] +pub(crate) struct Storage { + inner: Arc, +} + +struct StorageWrapper(S); + +#[async_trait::async_trait] +impl TokioStorage for StorageWrapper +where + S: StorageTrait + Send + Sync + 'static, +{ + async fn push(&self, job: NewJobInfo) -> anyhow::Result { + self.0.push(job).await.map_err(From::from) + } + + async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result { + self.0.pop(queue, runner_id).await.map_err(From::from) + } + + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> anyhow::Result<()> { + self.0.heartbeat(job_id, runner_id).await.map_err(From::from) + } + + async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()> { + self.0 + .complete(return_job_info) + .await + .map(|_| ()) + .map_err(From::from) + } +} + +impl Storage { + pub(crate) fn new(storage: S) -> Self + where + S: StorageTrait + Send + Sync + 'static, + { + Self { + inner: Arc::new(StorageWrapper(storage)), + } + } +} + +impl Deref for Storage { + type Target = dyn TokioStorage + 'static; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} diff --git a/jobs-tokio/src/worker.rs b/jobs-tokio/src/worker.rs new file mode 100644 index 0000000..f5e7d1f --- /dev/null +++ b/jobs-tokio/src/worker.rs @@ -0,0 +1,236 @@ +use crate::storage::Storage; +use background_jobs_core::ProcessorMap; +use std::{ + future::{poll_fn, Future}, + pin::Pin, +}; +use tracing::{Instrument, Span}; +use uuid::Uuid; + +struct LocalWorkerStarter { + queue: String, + processors: ProcessorMap, + server: Storage, +} + +#[cfg(tokio_unstable)] +fn test_runtime() -> anyhow::Result<()> { + tokio::task::Builder::new() + .name("runtime-test") + .spawn(async move {}) + .map(|_| ()) + .map_err(From::from) +} + +#[cfg(not(tokio_unstable))] +fn test_runtime() -> anyhow::Result<()> { + std::panic::catch_unwind(|| tokio::spawn(async move {})).map(|_| ()).map_err(From::from) +} + +impl Drop for LocalWorkerStarter where State: Send + Clone + 'static { + fn drop(&mut self) { + metrics::counter!("background-jobs.tokio.worker.finished", "queue" => self.queue.clone()) + .increment(1); + + let res = test_runtime(); + + if res.is_ok() { + if let Err(e) = crate::spawn::spawn( + "local-worker", + local_worker( + self.queue.clone(), + self.processors.clone(), + self.server.clone(), + ), + ) { + tracing::error!("Failed to re-spawn local worker: {e}"); + } else { + metrics::counter!("background-jobs.tokio.worker.restart").increment(1); + } + } else { + tracing::info!("Shutting down worker"); + } + } +} + +struct RunOnDrop(F) +where + F: Fn(); + +impl Drop for RunOnDrop +where + F: Fn(), +{ + fn drop(&mut self) { + (self.0)(); + } +} + +async fn heartbeat_job( + storage: &Storage, + future: F, + job_id: Uuid, + runner_id: Uuid, + heartbeat_interval: u64, +) -> F::Output { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(heartbeat_interval)); + + let mut future = std::pin::pin!(future); + + let mut hb_future = Some(storage.heartbeat(job_id, runner_id)); + + loop { + tokio::select! { + output = &mut future => { + break output; + }, + Some(hb_output) = option(hb_future.as_mut()), if hb_future.is_some() => { + hb_future.take(); + + if let Err(e) = hb_output { + tracing::warn!("Failed to heartbeat: {e}"); + } + } + _ = interval.tick() => { + if hb_future.is_none() { + hb_future = Some(storage.heartbeat(job_id, runner_id)); + } + } + } + } +} + +async fn time_job(future: F, job_id: Uuid) -> F::Output { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + interval.tick().await; + let mut count = 0; + + let mut future = std::pin::pin!(future); + + loop { + tokio::select! { + output = &mut future => { + break output; + }, + _ = interval.tick() => { + count += 5; + + if count > (60 * 60) { + if count % (60 * 20) == 0 { + tracing::warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60); + } + } else if count > 60 { + if count % 20 == 0 { + tracing::warn!("Job {} is taking a long time: {} minutes", job_id, count / 60); + } + } else { + tracing::info!("Job {} is taking a long time: {} seconds", job_id, count); + } + } + } + } +} + +async fn option(opt: Option<&mut F>) -> Option +where + F: Future + Unpin, +{ + match opt { + Some(f) => Some(poll_fn(|cx| Pin::new(&mut *f).poll(cx)).await), + None => None, + } +} + +pub(crate) async fn local_worker( + queue: String, + processors: ProcessorMap, + server: Storage, +) where + State: Send + Clone + 'static, +{ + metrics::counter!("background-jobs.tokio.worker.started", "queue" => queue.clone()).increment(1); + + let starter = LocalWorkerStarter { + queue: queue.clone(), + processors: processors.clone(), + server: server.clone(), + }; + + let id = Uuid::now_v7(); + + let log_on_drop = RunOnDrop(|| { + make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing")); + }); + + loop { + let request_span = make_span(id, &queue, "request"); + + let job = match request_span + .in_scope(|| server.pop(&queue, id)) + .instrument(request_span.clone()) + .await + { + Ok(job) => job, + Err(e) => { + metrics::counter!("background-jobs.tokio.worker.failed-request").increment(1); + + let display_val = format!("{}", e); + let debug = format!("{:?}", e); + request_span.record("exception.message", &tracing::field::display(&display_val)); + request_span.record("exception.details", &tracing::field::display(&debug)); + request_span + .in_scope(|| tracing::error!("Failed to notify server of ready worker")); + break; + } + }; + drop(request_span); + + let process_span = make_span(id, &queue, "process"); + let job_id = job.id; + let heartbeat_interval = job.heartbeat_interval; + let return_job = process_span + .in_scope(|| { + heartbeat_job( + &server, + time_job(processors.process(job), job_id), + job_id, + id, + heartbeat_interval, + ) + }) + .instrument(process_span) + .await; + + let return_span = make_span(id, &queue, "return"); + if let Err(e) = return_span + .in_scope(|| server.complete(return_job)) + .instrument(return_span.clone()) + .await + { + metrics::counter!("background-jobs.tokio.worker.failed-return").increment(1); + + let display_val = format!("{}", e); + let debug = format!("{:?}", e); + return_span.record("exception.message", &tracing::field::display(&display_val)); + return_span.record("exception.details", &tracing::field::display(&debug)); + return_span.in_scope(|| tracing::warn!("Failed to return completed job")); + } + drop(return_span); + } + + drop(log_on_drop); + drop(starter); +} + +fn make_span(id: Uuid, queue: &str, operation: &str) -> Span { + tracing::info_span!( + parent: None, + "Worker", + worker.id = tracing::field::display(id), + worker.queue = tracing::field::display(queue), + worker.operation.id = tracing::field::display(&Uuid::now_v7()), + worker.operation.name = tracing::field::display(operation), + exception.message = tracing::field::Empty, + exception.details = tracing::field::Empty, + ) +}