From 759ccf018ba0df78852387c44d1d0d01754b259e Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 20 Apr 2020 19:30:56 -0500 Subject: [PATCH] Eliminate Processor --- Cargo.toml | 8 +- README.md | 67 ++++------- examples/actix-example/Cargo.toml | 2 +- examples/actix-example/src/main.rs | 33 ++---- jobs-actix/Cargo.toml | 4 +- jobs-actix/src/lib.rs | 50 ++++---- jobs-actix/src/server.rs | 2 +- jobs-actix/src/worker.rs | 6 +- jobs-core/Cargo.toml | 4 +- jobs-core/src/actix_job.rs | 78 +++++++++---- jobs-core/src/job.rs | 156 +++++++++++++++++++++---- jobs-core/src/job_info.rs | 28 +++-- jobs-core/src/lib.rs | 26 ++--- jobs-core/src/processor.rs | 180 ----------------------------- jobs-core/src/processor_map.rs | 50 ++++---- jobs-core/src/storage.rs | 2 +- jobs-sled/Cargo.toml | 4 +- src/lib.rs | 61 ++-------- 18 files changed, 319 insertions(+), 442 deletions(-) delete mode 100644 jobs-core/src/processor.rs diff --git a/Cargo.toml b/Cargo.toml index d819b96..774d6e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with sled, actix, and futures" -version = "0.8.0-alpha.0" +version = "0.8.0-alpha.1" license-file = "LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -21,15 +21,15 @@ members = [ default = ["background-jobs-actix", "background-jobs-sled-storage"] [dependencies.background-jobs-core] -version = "0.7.0" +version = "0.8.0-alpha.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.7.0-alpha.0" +version = "0.8.0-alpha.0" path = "jobs-actix" optional = true [dependencies.background-jobs-sled-storage] -version = "0.4.0-alpha.0" +version = "0.8.0-alpha.0" path = "jobs-sled" optional = true diff --git a/README.md b/README.md index 4f8d379..9f804b1 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,14 @@ might not be the best experience. - [Read the documentation on docs.rs](https://docs.rs/background-jobs) - [Find the crate on crates.io](https://crates.io/crates/background-jobs) -- [Join the discussion on Matrix](https://matrix.to/#/!vZKoAKLpHaFIWjRxpT:asonix.dog?via=asonix.dog) +- [Hit me up on Mastodon](https://asonix.dog/@asonix) ### Usage #### Add Background Jobs to your project ```toml [dependencies] actix = "0.8" -background-jobs = "0.7.0" +background-jobs = "0.8.0-alpha.1" failure = "0.1" futures = "0.1" serde = "1.0" @@ -47,10 +47,11 @@ impl MyJob { } impl Job for MyJob { - type Processor = MyProcessor; // We will define this later type State = (); type Future = Result<(), Error>; + const NAME: &'static str = "MyJob"; + fn run(self, _: Self::State) -> Self::Future { info!("args: {:?}", self); @@ -81,38 +82,13 @@ impl MyState { } impl Job for MyJob { - type Processor = MyProcessor; // We will define this later type State = MyState; type Future = Result<(), Error>; - fn run(self, state: Self::State) -> Self::Future { - info!("{}: args, {:?}", state.app_name, self); - - Ok(()) - } -} -``` - -#### Next, define a Processor. -Processors are types that define default attributes for jobs, as well as containing some logic -used internally to perform the job. Processors must implement `Proccessor` and `Clone`. - -```rust -use background_jobs::{Backoff, MaxRetries, Processor}; - -const DEFAULT_QUEUE: &'static str = "default"; - -#[derive(Clone, Debug)] -pub struct MyProcessor; - -impl Processor for MyProcessor { - // The kind of job this processor should execute - type Job = MyJob; - - // The name of the processor. 
It is super important that each processor has a unique name, - // because otherwise one processor will overwrite another processor when they're being + // The name of the job. It is super important that each job has a unique name, + // because otherwise one job will overwrite another job when they're being // registered. - const NAME: &'static str = "MyProcessor"; + const NAME: &'static str = "MyJob"; // The queue that this processor belongs to // @@ -130,7 +106,13 @@ impl Processor for MyProcessor { // The logic to determine how often to retry this job if it fails // // Jobs can optionally override this value - const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); + const BACKOFF: Backoff = Backoff::Exponential(2); + + fn run(self, state: Self::State) -> Self::Future { + info!("{}: args, {:?}", state.app_name, self); + + Ok(()) + } } ``` @@ -153,10 +135,9 @@ use actix::System; use background_jobs::{ServerConfig, WorkerConfig}; use failure::Error; -fn main() -> Result<(), Error> { - // First set up the Actix System to ensure we have a runtime to spawn jobs on. - let sys = System::new("my-actix-system"); - +#[actix_rt::main] +async fn main() -> Result<(), Error> { + env_logger::init(); // Set up our Storage // For this example, we use the default in-memory storage mechanism use background_jobs::memory_storage::Storage; @@ -164,19 +145,19 @@ fn main() -> Result<(), Error> { /* // Optionally, a storage backend using the Sled database is provided - use sled::{ConfigBuilder, Db}; use background_jobs::sled_storage::Storage; - let db = Db::start(ConfigBuilder::default().temporary(true).build())?; + use sled_extensions::Db; + let db = Db::open("my-sled-db")?; let storage = Storage::new(db)?; */ // Start the application server. This guards access to to the jobs store - let queue_handle = ServerConfig::new(storage).thread_count(8).start(); + let queue_handle = create_server(storage); // Configure and start our workers WorkerConfig::new(move || MyState::new("My App")) - .register(MyProcessor) - .set_processor_count(DEFAULT_QUEUE, 16) + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) .start(queue_handle.clone()); // Queue our jobs @@ -185,7 +166,7 @@ fn main() -> Result<(), Error> { queue_handle.queue(MyJob::new(5, 6))?; // Block on Actix - sys.run()?; + actix_rt::signal::ctrl_c().await?; Ok(()) } ``` @@ -195,7 +176,7 @@ For the complete example project, see [the examples folder](https://git.asonix.d #### Bringing your own server/worker implementation If you want to create your own jobs processor based on this idea, you can depend on the -`background-jobs-core` crate, which provides the Processor and Job traits, as well as some +`background-jobs-core` crate, which provides the Job trait, as well as some other useful types for implementing a jobs processor and job store. ### Contributing diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index 2f9a2c2..6ca8e0e 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -11,7 +11,7 @@ actix = "0.10.0-alpha.2" actix-rt = "1.0.0" anyhow = "1.0" async-trait = "0.1.24" -background-jobs = { version = "0.8.0-alpha.0", path = "../.." } +background-jobs = { version = "0.8.0-alpha.1", path = "../.." 
} env_logger = "0.7" futures = "0.3" sled-extensions = { version = "0.3.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/sled-extensions" } diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 3103600..31d585d 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,5 +1,5 @@ use anyhow::Error; -use background_jobs::{create_server, Job, MaxRetries, Processor, WorkerConfig}; +use background_jobs::{create_server, Job, MaxRetries, WorkerConfig}; use futures::future::{ok, Ready}; const DEFAULT_QUEUE: &'static str = "default"; @@ -15,9 +15,6 @@ pub struct MyJob { other_usize: usize, } -#[derive(Clone, Debug)] -pub struct MyProcessor; - #[actix_rt::main] async fn main() -> Result<(), Error> { env_logger::init(); @@ -39,8 +36,8 @@ async fn main() -> Result<(), Error> { // Configure and start our workers WorkerConfig::new(move || MyState::new("My App")) - .register(MyProcessor) - .set_processor_count(DEFAULT_QUEUE, 16) + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) .start(queue_handle.clone()); // Queue our jobs @@ -72,25 +69,13 @@ impl MyJob { #[async_trait::async_trait] impl Job for MyJob { - type Processor = MyProcessor; type State = MyState; type Future = Ready>; - fn run(self, state: MyState) -> Self::Future { - println!("{}: args, {:?}", state.app_name, self); - - ok(()) - } -} - -impl Processor for MyProcessor { - // The kind of job this processor should execute - type Job = MyJob; - - // The name of the processor. It is super important that each processor has a unique name, - // because otherwise one processor will overwrite another processor when they're being + // The name of the job. It is super important that each job has a unique name, + // because otherwise one job will overwrite another job when they're being // registered. - const NAME: &'static str = "MyProcessor"; + const NAME: &'static str = "MyJob"; // The queue that this processor belongs to // @@ -104,4 +89,10 @@ impl Processor for MyProcessor { // // Jobs can optionally override this value const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + + fn run(self, state: MyState) -> Self::Future { + println!("{}: args, {:?}", state.app_name, self); + + ok(()) + } } diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 9539ac8..fbac10c 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.7.0-alpha.0" +version = "0.8.0-alpha.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -14,7 +14,7 @@ actix = "0.10.0-alpha.2" actix-rt = "1.0.0" anyhow = "1.0" async-trait = "0.1.24" -background-jobs-core = { version = "0.7", path = "../jobs-core", features = ["with-actix"] } +background-jobs-core = { version = "0.8.0-alpha.0", path = "../jobs-core", features = ["with-actix"] } chrono = "0.4" log = "0.4" num_cpus = "1.10.0" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 79189a6..4aa54b4 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -14,7 +14,7 @@ //! ```rust,ignore //! use actix::System; //! use anyhow::Error; -//! use background_jobs::{create_server, Backoff, Job, MaxRetries, Processor, WorkerConfig}; +//! use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig}; //! use futures::future::{ok, Ready}; //! //! const DEFAULT_QUEUE: &'static str = "default"; @@ -30,9 +30,6 @@ //! 
other_usize: usize, //! } //! -//! #[derive(Clone, Debug)] -//! pub struct MyProcessor; -//! //! #[actix_rt::main] //! async fn main() -> Result<(), Error> { //! // Set up our Storage @@ -45,8 +42,8 @@ //! //! // Configure and start our workers //! WorkerConfig::new(move || MyState::new("My App")) -//! .register(MyProcessor) -//! .set_processor_count(DEFAULT_QUEUE, 16) +//! .register::() +//! .set_worker_count(DEFAULT_QUEUE, 16) //! .start(queue_handle.clone()); //! //! // Queue our jobs @@ -78,25 +75,13 @@ //! //! #[async_trait::async_trait] //! impl Job for MyJob { -//! type Processor = MyProcessor; //! type State = MyState; //! type Future = Ready>; //! -//! async fn run(self, state: MyState) -> Self::Future { -//! println!("{}: args, {:?}", state.app_name, self); -//! -//! ok(()) -//! } -//! } -//! -//! impl Processor for MyProcessor { -//! // The kind of job this processor should execute -//! type Job = MyJob; -//! -//! // The name of the processor. It is super important that each processor has a unique name, -//! // because otherwise one processor will overwrite another processor when they're being +//! // The name of the job. It is super important that each job has a unique name, +//! // because otherwise one job will overwrite another job when they're being //! // registered. -//! const NAME: &'static str = "MyProcessor"; +//! const NAME: &'static str = "MyJob"; //! //! // The queue that this processor belongs to //! // @@ -123,12 +108,18 @@ //! // The timeout defines when a job is allowed to be considered dead, and so can be retried //! // by the job processor. The value is in milliseconds and defaults to 15,000 //! const TIMEOUT: i64 = 15_000 +//! +//! async fn run(self, state: MyState) -> Self::Future { +//! println!("{}: args, {:?}", state.app_name, self); +//! +//! ok(()) +//! } //! } //! ``` use actix::Arbiter; use anyhow::Error; -use background_jobs_core::{Job, Processor, ProcessorMap, Stats, Storage}; +use background_jobs_core::{new_job, Job, ProcessorMap, Stats, Storage}; use log::error; use std::{collections::BTreeMap, sync::Arc, time::Duration}; @@ -160,7 +151,7 @@ where /// Worker Configuration /// /// This type is used for configuring and creating workers to process jobs. Before starting the -/// workers, register `Processor` types with this struct. This worker registration allows for +/// workers, register `Job` types with this struct. This worker registration allows for /// different worker processes to handle different sets of workers. #[derive(Clone)] pub struct WorkerConfig @@ -187,18 +178,17 @@ where } } - /// Register a `Processor` with the worker + /// Register a `Job` with the worker /// /// This enables the worker to handle jobs associated with this processor. If a processor is /// not registered, none of it's jobs will be run, even if another processor handling the same /// job queue is registered. - pub fn register(mut self, processor: P) -> Self + pub fn register(mut self) -> Self where - P: Processor + Send + Sync + 'static, J: Job, { - self.queues.insert(P::QUEUE.to_owned(), 4); - self.processors.register_processor(processor); + self.queues.insert(J::QUEUE.to_owned(), 4); + self.processors.register::(); self } @@ -208,7 +198,7 @@ where /// will handle processing all workers, regardless of how many are configured. 
/// /// By default, 4 workers are spawned - pub fn set_processor_count(mut self, queue: &str, count: u64) -> Self { + pub fn set_worker_count(mut self, queue: &str, count: u64) -> Self { self.queues.insert(queue.to_owned(), count); self } @@ -260,7 +250,7 @@ impl QueueHandle { where J: Job, { - let job = J::Processor::new_job(job)?; + let job = new_job(job)?; let server = self.inner.clone(); actix::spawn(async move { if let Err(e) = server.new_job(job).await { diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index ed4925d..0bbcf76 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -102,7 +102,7 @@ impl Server { ) -> Result { trace!("Trying to find job for worker {}", worker.id()); if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { - if let Err(job) = worker.process_job(job).await { + if let Err(job) = worker.process(job).await { error!("Worker has hung up"); self.storage.return_job(job.unexecuted()).await? } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index c0e5254..8585e58 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -6,7 +6,7 @@ use uuid::Uuid; #[async_trait::async_trait] pub trait Worker { - async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo>; + async fn process(&self, job: JobInfo) -> Result<(), JobInfo>; fn id(&self) -> Uuid; @@ -22,7 +22,7 @@ pub(crate) struct LocalWorkerHandle { #[async_trait::async_trait] impl Worker for LocalWorkerHandle { - async fn process_job(&self, job: JobInfo) -> Result<(), JobInfo> { + async fn process(&self, job: JobInfo) -> Result<(), JobInfo> { match self.tx.clone().send(job).await { Err(e) => { error!("Unable to send job"); @@ -65,7 +65,7 @@ pub(crate) fn local_worker( return; } while let Some(job) = rx.recv().await { - let return_job = processors.process_job(job).await; + let return_job = processors.process(job).await; if let Err(e) = server.return_job(return_job).await { error!("Error returning job, {}", e); diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index ab2750e..c4ed378 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.7.0" +version = "0.8.0-alpha.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -14,7 +14,7 @@ default = [] with-actix = ["actix", "tokio"] [dependencies] -actix = { version = "0.10.0-alpha.1", optional = true } +actix = { version = "0.10.0-alpha.2", optional = true } anyhow = "1.0" async-trait = "0.1.24" chrono = { version = "0.4", features = ["serde"] } diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs index fef73ce..ed1dc89 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/actix_job.rs @@ -1,4 +1,4 @@ -use crate::{Backoff, Job, MaxRetries, Processor}; +use crate::{Backoff, Job, MaxRetries}; use anyhow::Error; use log::error; use serde::{de::DeserializeOwned, ser::Serialize}; @@ -10,16 +10,47 @@ use tokio::sync::oneshot; /// This trait is specific to Actix, and will automatically implement the Job trait with the /// proper translation from ?Send futures to Send futures pub trait ActixJob: Serialize + DeserializeOwned + 'static { - /// The processor this job is associated with. The job's processor can be used to create a - /// JobInfo from a job, which is used to serialize the job into a storage mechanism. 
- type Processor: Processor; - /// The application state provided to this job at runtime. type State: Clone + 'static; /// The future returned by this job + /// + /// Importantly, this Future does not require Send type Future: Future>; + /// The name of the job + /// + /// This name must be unique!!! + const NAME: &'static str; + + /// The name of the default queue for this job + /// + /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, + /// the job will never be processed. + const QUEUE: &'static str = "default"; + + /// Define the default number of retries for this job + /// + /// Defaults to Count(5) + /// Jobs can override + const MAX_RETRIES: MaxRetries = MaxRetries::Count(5); + + /// Define the default backoff strategy for this job + /// + /// Defaults to Exponential(2) + /// Jobs can override + const BACKOFF: Backoff = Backoff::Exponential(2); + + /// Define the maximum number of milliseconds a job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + /// + /// Defaults to 15 seconds + /// Jobs can override + const TIMEOUT: i64 = 15_000; + /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more @@ -31,26 +62,22 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static { /// an actor in an actix-based system. fn run(self, state: Self::State) -> Self::Future; - /// If this job should not use the default queue for its processor, this can be overridden in + /// If this job should not use it's default queue, this can be overridden in /// user-code. - /// - /// Jobs will only be processed by processors that are registered, and if a queue is supplied - /// here that is not associated with a valid processor for this job, it will never be - /// processed. - fn queue(&self) -> Option<&str> { - None + fn queue(&self) -> &str { + Self::QUEUE } - /// If this job should not use the default maximum retry count for its processor, this can be + /// If this job should not use it's default maximum retry count, this can be /// overridden in user-code. - fn max_retries(&self) -> Option { - None + fn max_retries(&self) -> MaxRetries { + Self::MAX_RETRIES } - /// If this job should not use the default backoff strategy for its processor, this can be + /// If this job should not use it's default backoff strategy, this can be /// overridden in user-code. - fn backoff_strategy(&self) -> Option { - None + fn backoff_strategy(&self) -> Backoff { + Self::BACKOFF } /// Define the maximum number of milliseconds this job should be allowed to run before being @@ -58,8 +85,8 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static { /// /// This is important for allowing the job server to reap processes that were started but never /// completed. - fn timeout(&self) -> Option { - None + fn timeout(&self) -> i64 { + Self::TIMEOUT } } @@ -67,10 +94,11 @@ impl Job for T where T: ActixJob, { - type Processor = T::Processor; type State = T::State; type Future = Pin> + Send>>; + const NAME: &'static str = ::NAME; + fn run(self, state: Self::State) -> Self::Future { let (tx, rx) = oneshot::channel(); @@ -83,19 +111,19 @@ where Box::pin(async move { rx.await? 
}) } - fn queue(&self) -> Option<&str> { + fn queue(&self) -> &str { ActixJob::queue(self) } - fn max_retries(&self) -> Option { + fn max_retries(&self) -> MaxRetries { ActixJob::max_retries(self) } - fn backoff_strategy(&self) -> Option { + fn backoff_strategy(&self) -> Backoff { ActixJob::backoff_strategy(self) } - fn timeout(&self) -> Option { + fn timeout(&self) -> i64 { ActixJob::timeout(self) } } diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index f55d414..e0c5b1b 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -1,20 +1,89 @@ -use crate::{Backoff, MaxRetries, Processor}; +use crate::{Backoff, JobError, MaxRetries, NewJobInfo}; use anyhow::Error; +use chrono::{offset::Utc, DateTime}; use serde::{de::DeserializeOwned, ser::Serialize}; -use std::future::Future; +use serde_json::Value; +use std::{future::Future, pin::Pin}; /// The Job trait defines parameters pertaining to an instance of background job +/// +/// Jobs are defnitions of work to be executed. +/// +/// The simplest implementation defines the job's State and Future types, NAME contant, and +/// run method. +/// +/// ### Example +/// +/// ```rust +/// use anyhow::Error; +/// use background_jobs_core::{Job, new_job}; +/// use futures::future::{ok, Ready}; +/// use log::info; +/// +/// #[derive(serde::Deserialize, serde::Serialize)] +/// struct MyJob { +/// count: i64, +/// } +/// +/// impl Job for MyJob { +/// type State = (); +/// type Future = Ready>; +/// +/// const NAME: &'static str = "MyJob"; +/// +/// fn run(self, _: Self::State) -> Self::Future { +/// info!("Processing {}", self.count); +/// +/// ok(()) +/// } +/// } +/// +/// fn main() -> Result<(), Error> { +/// let job = new_job(MyJob { count: 1234 })?; +/// +/// Ok(()) +/// } +/// ``` pub trait Job: Serialize + DeserializeOwned + 'static { - /// The processor this job is associated with. The job's processor can be used to create a - /// JobInfo from a job, which is used to serialize the job into a storage mechanism. - type Processor: Processor; - /// The application state provided to this job at runtime. type State: Clone + 'static; /// The future returned by this job type Future: Future> + Send; + /// The name of the job + /// + /// This name must be unique!!! + const NAME: &'static str; + + /// The name of the default queue for this job + /// + /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, + /// the job will never be processed. + const QUEUE: &'static str = "default"; + + /// Define the default number of retries for this job + /// + /// Defaults to Count(5) + /// Jobs can override + const MAX_RETRIES: MaxRetries = MaxRetries::Count(5); + + /// Define the default backoff strategy for this job + /// + /// Defaults to Exponential(2) + /// Jobs can override + const BACKOFF: Backoff = Backoff::Exponential(2); + + /// Define the maximum number of milliseconds a job should be allowed to run before being + /// considered dead. + /// + /// This is important for allowing the job server to reap processes that were started but never + /// completed. + /// + /// Defaults to 15 seconds + /// Jobs can override + const TIMEOUT: i64 = 15_000; + /// Users of this library must define what it means to run a job. /// /// This should contain all the logic needed to complete a job. If that means queuing more @@ -26,26 +95,22 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// an actor in an actix-based system. 
fn run(self, state: Self::State) -> Self::Future; - /// If this job should not use the default queue for its processor, this can be overridden in + /// If this job should not use it's default queue, this can be overridden in /// user-code. - /// - /// Jobs will only be processed by processors that are registered, and if a queue is supplied - /// here that is not associated with a valid processor for this job, it will never be - /// processed. - fn queue(&self) -> Option<&str> { - None + fn queue(&self) -> &str { + Self::QUEUE } - /// If this job should not use the default maximum retry count for its processor, this can be + /// If this job should not use it's default maximum retry count, this can be /// overridden in user-code. - fn max_retries(&self) -> Option { - None + fn max_retries(&self) -> MaxRetries { + Self::MAX_RETRIES } - /// If this job should not use the default backoff strategy for its processor, this can be + /// If this job should not use it's default backoff strategy, this can be /// overridden in user-code. - fn backoff_strategy(&self) -> Option { - None + fn backoff_strategy(&self) -> Backoff { + Self::BACKOFF } /// Define the maximum number of milliseconds this job should be allowed to run before being @@ -53,7 +118,56 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// /// This is important for allowing the job server to reap processes that were started but never /// completed. - fn timeout(&self) -> Option { - None + fn timeout(&self) -> i64 { + Self::TIMEOUT } } + +/// A provided method to create a new JobInfo from provided arguments +pub fn new_job(job: J) -> Result +where + J: Job, +{ + let job = NewJobInfo::new( + J::NAME.to_owned(), + job.queue().to_owned(), + job.max_retries(), + job.backoff_strategy(), + job.timeout(), + serde_json::to_value(job).map_err(|_| ToJson)?, + ); + + Ok(job) +} + +/// Create a NewJobInfo to schedule a job to be performed after a certain time +pub fn new_scheduled_job(job: J, after: DateTime) -> Result +where + J: Job, +{ + let mut job = new_job(job)?; + job.schedule(after); + + Ok(job) +} + +/// A provided method to coerce arguments into the expected type and run the job +pub fn process( + args: Value, + state: J::State, +) -> Pin> + Send>> +where + J: Job, +{ + let res = serde_json::from_value::(args).map(move |job| job.run(state)); + + Box::pin(async move { + res?.await?; + + Ok(()) + }) +} + +#[derive(Clone, Debug, thiserror::Error)] +#[error("Failed to to turn job into value")] +pub struct ToJson; diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index 5617043..a8b4f34 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -26,10 +26,10 @@ impl ReturnJobInfo { } } - pub(crate) fn missing_processor(id: Uuid) -> Self { + pub(crate) fn unregistered(id: Uuid) -> Self { ReturnJobInfo { id, - result: JobResult::MissingProcessor, + result: JobResult::Unregistered, } } } @@ -37,8 +37,8 @@ impl ReturnJobInfo { #[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] /// Information about a newly created job pub struct NewJobInfo { - /// Name of the processor that should handle this job - processor: String, + /// Name of the job + name: String, /// Name of the queue that this job is a part of queue: String, @@ -67,15 +67,15 @@ impl NewJobInfo { } pub(crate) fn new( - processor: String, + name: String, queue: String, - args: Value, max_retries: MaxRetries, backoff_strategy: Backoff, timeout: i64, + args: Value, ) -> Self { NewJobInfo { - processor, + name, queue, args, 
max_retries, @@ -98,7 +98,7 @@ impl NewJobInfo { pub(crate) fn with_id(self, id: Uuid) -> JobInfo { JobInfo { id, - processor: self.processor, + name: self.name, queue: self.queue, status: JobStatus::Pending, args: self.args, @@ -116,15 +116,13 @@ impl NewJobInfo { /// Metadata pertaining to a job that exists within the background_jobs system /// /// Although exposed publically, this type should only really be handled by the library itself, and -/// is impossible to create outside of a -/// [Processor](https://docs.rs/background-jobs/0.4.0/background_jobs/trait.Processor.html)'s -/// new_job method. +/// is impossible to create outside of the new_job method. pub struct JobInfo { /// ID of the job id: Uuid, - /// Name of the processor that should handle this job - processor: String, + /// Name of the job + name: String, /// Name of the queue that this job is a part of queue: String, @@ -166,8 +164,8 @@ impl JobInfo { self.updated_at = Utc::now(); } - pub(crate) fn processor(&self) -> &str { - &self.processor + pub(crate) fn name(&self) -> &str { + &self.name } pub(crate) fn args(&self) -> Value { diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 7bffecc..40309d9 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -12,15 +12,13 @@ use anyhow::Error; mod actix_job; mod job; mod job_info; -mod processor; mod processor_map; mod stats; mod storage; pub use crate::{ - job::Job, + job::{new_job, new_scheduled_job, process, Job}, job_info::{JobInfo, NewJobInfo, ReturnJobInfo}, - processor::Processor, processor_map::{CachedProcessorMap, ProcessorMap}, stats::{JobStat, Stats}, storage::{memory_storage, Storage}, @@ -30,7 +28,7 @@ pub use crate::{ pub use actix_job::ActixJob; #[derive(Debug, thiserror::Error)] -/// The error type returned by a `Processor`'s `process` method +/// The error type returned by the `process` method pub enum JobError { /// Some error occurred while processing the job #[error("Error performing job: {0}")] @@ -40,9 +38,9 @@ pub enum JobError { #[error("Could not make JSON value from arguments")] Json, - /// No processor was present to handle a given job - #[error("No processor available for job")] - MissingProcessor, + /// This job type was not registered for this client + #[error("This job type was not registered for the client")] + Unregistered, } #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] @@ -54,8 +52,8 @@ pub enum JobResult { /// The job failed Failure, - /// There was no processor to run the job - MissingProcessor, + /// The worker had no concept of this job + Unregistered, /// The worker requesting this job closed Unexecuted, @@ -72,9 +70,9 @@ impl JobResult { JobResult::Failure } - /// Indicate that the job's processor is not present - pub fn missing_processor() -> Self { - JobResult::MissingProcessor + /// Indicate that the job was not registered for this worker + pub fn unregistered() -> Self { + JobResult::Unregistered } /// Check if the job failed @@ -88,8 +86,8 @@ impl JobResult { } /// Check if the job is missing it's processor - pub fn is_missing_processor(&self) -> bool { - *self == JobResult::MissingProcessor + pub fn is_unregistered(&self) -> bool { + *self == JobResult::Unregistered } /// Check if the job was returned without an execution attempt diff --git a/jobs-core/src/processor.rs b/jobs-core/src/processor.rs deleted file mode 100644 index fb9b225..0000000 --- a/jobs-core/src/processor.rs +++ /dev/null @@ -1,180 +0,0 @@ -use crate::{Backoff, Job, JobError, MaxRetries, NewJobInfo}; -use 
anyhow::Error; -use chrono::{offset::Utc, DateTime}; -use serde_json::Value; -use std::{future::Future, pin::Pin}; - -/// ## The Processor trait -/// -/// Processors define the logic spawning jobs such as -/// - The job's name -/// - The job's default queue -/// - The job's default maximum number of retries -/// - The job's [backoff -/// strategy](https://docs.rs/background-jobs/0.4.0/background_jobs/enum.Backoff.html) -/// -/// Processors also provide the default mechanism for running a job, and the only mechanism for -/// creating a -/// [JobInfo](https://docs.rs/background-jobs-core/0.4.0/background_jobs_core/struct.JobInfo.html), -/// which is the type required for queuing jobs to be executed. -/// -/// ### Example -/// -/// ```rust -/// use anyhow::Error; -/// use background_jobs_core::{Job, Processor}; -/// use futures::future::{ok, Ready}; -/// use log::info; -/// -/// #[derive(serde::Deserialize, serde::Serialize)] -/// struct MyJob { -/// count: i32, -/// } -/// -/// impl Job for MyJob { -/// type Processor = MyProcessor; -/// type State = (); -/// type Future = Ready>; -/// -/// fn run(self, _state: Self::State) -> Self::Future { -/// info!("Processing {}", self.count); -/// -/// ok(()) -/// } -/// } -/// -/// #[derive(Clone)] -/// struct MyProcessor; -/// -/// impl Processor for MyProcessor { -/// type Job = MyJob; -/// -/// const NAME: &'static str = "IncrementProcessor"; -/// const QUEUE: &'static str = "default"; -/// } -/// -/// fn main() -> Result<(), Error> { -/// let job = MyProcessor::new_job(MyJob { count: 1234 })?; -/// -/// Ok(()) -/// } -/// ``` -pub trait Processor: Clone { - /// The job this processor will process - type Job: Job + 'static; - - /// The name of the processor - /// - /// This name must be unique!!! It is used to look up which processor should handle a job - const NAME: &'static str; - - /// The name of the default queue for jobs created with this processor - /// - /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, - /// the job will never be processed. - const QUEUE: &'static str; - - /// Define the default number of retries for a given processor - /// - /// Defaults to Count(5) - /// Jobs can override - const MAX_RETRIES: MaxRetries = MaxRetries::Count(5); - - /// Define the default backoff strategy for a given processor - /// - /// Defaults to Exponential(2) - /// Jobs can override - const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); - - /// Define the maximum number of milliseconds a job should be allowed to run before being - /// considered dead. - /// - /// This is important for allowing the job server to reap processes that were started but never - /// completed. - /// - /// Defaults to 15 seconds - /// Jobs can override - const TIMEOUT: i64 = 15_000; - - /// A provided method to create a new JobInfo from provided arguments - /// - /// This is required for spawning jobs, since it enforces the relationship between the job and - /// the Processor that should handle it. 
- fn new_job(job: Self::Job) -> Result { - let queue = job.queue().unwrap_or(Self::QUEUE).to_owned(); - let max_retries = job.max_retries().unwrap_or(Self::MAX_RETRIES); - let backoff_strategy = job.backoff_strategy().unwrap_or(Self::BACKOFF_STRATEGY); - let timeout = job.timeout().unwrap_or(Self::TIMEOUT); - - let job = NewJobInfo::new( - Self::NAME.to_owned(), - queue, - serde_json::to_value(job).map_err(|_| ToJson)?, - max_retries, - backoff_strategy, - timeout, - ); - - Ok(job) - } - - /// Create a JobInfo to schedule a job to be performed after a certain time - fn new_scheduled_job(job: Self::Job, after: DateTime) -> Result { - let mut job = Self::new_job(job)?; - job.schedule(after); - - Ok(job) - } - - /// A provided method to coerce arguments into the expected type and run the job - /// - /// Advanced users may want to override this method in order to provide their own custom - /// before/after logic for certain job processors - /// - /// The state passed into this method is initialized at the start of the application. The state - /// argument could be useful for containing a hook into something like r2d2, or the address of - /// an actor in an actix-based system. - /// - /// ```rust,ignore - /// fn process( - /// &self, - /// args: Value, - /// state: S - /// ) -> Pin> + Send>> { - /// let res = serde_json::from_value::(args); - /// - /// Box::pin(async move { - /// let job = res.map_err(|_| JobError::Json)?; - /// // Perform some custom pre-job locic - /// - /// job.run(state).await.map_err(JobError::Processing)?; - /// - /// // Perform some custom post-job logic - /// Ok(()) - /// }) - /// } - /// ``` - /// - /// Patterns like this could be useful if you want to use the same job type for multiple - /// scenarios. Defining the `process` method for multiple `Processor`s with different - /// before/after logic for the same [`Job`] supported. - fn process( - &self, - args: Value, - state: ::State, - ) -> Pin> + Send>> { - // Call run on the job here because State isn't Send, but the future produced by job IS - // Send - let res = serde_json::from_value::(args).map(move |job| job.run(state)); - - Box::pin(async move { - res?.await?; - - Ok(()) - }) - } -} - -#[derive(Clone, Debug, thiserror::Error)] -#[error("Failed to to turn job into value")] -pub struct ToJson; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 9df5cd3..52ad483 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -1,21 +1,20 @@ -use crate::{Job, JobError, JobInfo, Processor, ReturnJobInfo}; +use crate::{Job, JobError, JobInfo, ReturnJobInfo}; use log::{error, info}; use serde_json::Value; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; /// A generic function that processes a job /// -/// Instead of storing [`Processor`] type directly, the [`ProcessorMap`] -/// struct stores these `ProcessFn` types that don't expose differences in Job types. +/// ProcessorMap stores these `ProcessFn` types that don't expose differences in Job types. 
pub type ProcessFn = Arc< dyn Fn(Value, S) -> Pin> + Send>> + Send + Sync, >; pub type StateFn = Arc S + Send + Sync>; -/// A type for storing the relationships between processor names and the processor itself +/// A type for storing the relationships between job names and the job itself /// -/// [`Processor`s] must be registered with the `ProcessorMap` in the initialization phase of an +/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an /// application before workers are spawned in order to handle queued jobs. #[derive(Clone)] pub struct ProcessorMap { @@ -23,10 +22,10 @@ pub struct ProcessorMap { state_fn: StateFn, } -/// A type for storing the relationships between processor names and the processor itself, with the +/// A type for storing the relationships between job names and the job itself, with the /// state pre-cached instead of being generated from the state function each time /// -/// [`Processor`s] must be registered with the `ProcessorMap` in the initialization phase of an +/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an /// application before workers are spawned in order to handle queued jobs. pub struct CachedProcessorMap { inner: HashMap>, @@ -49,18 +48,17 @@ where } } - /// Register a [`Processor`] with this `ProcessorMap`. + /// Register a [`Job`] with this `ProcessorMap`. /// - /// `ProcessorMap`s are useless if no processors are registerd before workers are spawned, so + /// `ProcessorMap`s are useless if no jobs are registerd before workers are spawned, so /// make sure to register all your processors up-front. - pub fn register_processor(&mut self, processor: P) + pub fn register(&mut self) where - P: Processor + Sync + Send + 'static, J: Job, { self.inner.insert( - P::NAME.to_owned(), - Arc::new(move |value, state| processor.process(value, state)), + J::NAME.to_owned(), + Arc::new(move |value, state| crate::process::(value, state)), ); } @@ -76,17 +74,17 @@ where /// /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. - pub async fn process_job(&self, job: JobInfo) -> ReturnJobInfo { + pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { let opt = self .inner - .get(job.processor()) - .map(|processor| process(processor, (self.state_fn)(), job.clone())); + .get(job.name()) + .map(|name| process(name, (self.state_fn)(), job.clone())); if let Some(fut) = opt { fut.await } else { - error!("Processor {} not present", job.processor()); - ReturnJobInfo::missing_processor(job.id()) + error!("Job {} not registered", job.name()); + ReturnJobInfo::unregistered(job.id()) } } } @@ -99,12 +97,12 @@ where /// /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. 
- pub async fn process_job(&self, job: JobInfo) -> ReturnJobInfo { - if let Some(processor) = self.inner.get(job.processor()) { - process(processor, self.state.clone(), job).await + pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { + if let Some(name) = self.inner.get(job.name()) { + process(name, self.state.clone(), job).await } else { - error!("Processor {} not present", job.processor()); - ReturnJobInfo::missing_processor(job.id()) + error!("Job {} not registered", job.name()); + ReturnJobInfo::unregistered(job.id()) } } } @@ -112,15 +110,15 @@ where async fn process(process_fn: &ProcessFn, state: S, job: JobInfo) -> ReturnJobInfo { let args = job.args(); let id = job.id(); - let processor = job.processor().to_owned(); + let name = job.name().to_owned(); match process_fn(args, state).await { Ok(_) => { - info!("Job {} completed, {}", id, processor); + info!("Job {} completed, {}", id, name); ReturnJobInfo::pass(id) } Err(e) => { - info!("Job {} errored, {}, {}", id, processor, e); + info!("Job {} errored, {}, {}", id, name, e); ReturnJobInfo::fail(id) } } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 36cb480..62eebdc 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -116,7 +116,7 @@ pub trait Storage: Clone + Send { } else { Ok(()) } - } else if result.is_missing_processor() || result.is_unexecuted() { + } else if result.is_unregistered() || result.is_unexecuted() { if let Some(mut job) = self.fetch_job(id).await? { job.pending(); self.queue_job(job.queue(), id).await?; diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 25eaac9..c913e3c 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-sled-storage" description = "Sled storage backend for background-jobs" -version = "0.4.0-alpha.0" +version = "0.8.0-alpha.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -13,7 +13,7 @@ edition = "2018" [dependencies] actix-threadpool = "0.3.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.7", path = "../jobs-core" } +background-jobs-core = { version = "0.8.0-alpha.0", path = "../jobs-core" } chrono = "0.4" sled-extensions = { version = "0.3.0-alpha.0", features = ["bincode", "cbor"], git = "https://git.asonix.dog/Aardwolf/sled-extensions" } thiserror = "1.0" diff --git a/src/lib.rs b/src/lib.rs index dea4633..0675b41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,10 +61,11 @@ //! } //! //! impl Job for MyJob { -//! type Processor = MyProcessor; // We'll define this later //! type State = (); //! type Future = Ready>; //! +//! const NAME: &'static str = "MyJob"; +//! //! fn run(self, _: Self::State) -> Self::Future { //! println!("args: {:?}", self); //! @@ -97,10 +98,11 @@ //! } //! //! impl Job for MyJob { -//! type Processor = MyProcessor; //! type State = MyState; //! type Future = Ready>; //! +//! const NAME: &'static str = "MyJob"; +//! //! fn run(self, state: Self::State) -> Self::Future { //! info!("{}: args, {:?}", state.app_name, self); //! @@ -109,47 +111,6 @@ //! } //! ``` //! -//! #### Next, define a Processor. -//! Processors are types that define default attributes for jobs, as well as containing some logic -//! used internally to perform the job. Processors must implement `Proccessor` and `Clone`. -//! -//! ```rust,ignore -//! use background_jobs::{Backoff, MaxRetries, Processor}; -//! -//! const DEFAULT_QUEUE: &'static str = "default"; -//! -//! #[derive(Clone, Debug)] -//! 
pub struct MyProcessor; -//! -//! impl Processor for MyProcessor { -//! // The kind of job this processor should execute -//! type Job = MyJob; -//! -//! // The name of the processor. It is super important that each processor has a unique name, -//! // because otherwise one processor will overwrite another processor when they're being -//! // registered. -//! const NAME: &'static str = "MyProcessor"; -//! -//! // The queue that this processor belongs to -//! // -//! // Workers have the option to subscribe to specific queues, so this is important to -//! // determine which worker will call the processor -//! // -//! // Jobs can optionally override the queue they're spawned on -//! const QUEUE: &'static str = DEFAULT_QUEUE; -//! -//! // The number of times background-jobs should try to retry a job before giving up -//! // -//! // Jobs can optionally override this value -//! const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); -//! -//! // The logic to determine how often to retry this job if it fails -//! // -//! // Jobs can optionally override this value -//! const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); -//! } -//! ``` -//! //! #### Running jobs //! By default, this crate ships with the `background-jobs-actix` feature enabled. This uses the //! `background-jobs-actix` crate to spin up a Server and Workers, and provides a mechanism for @@ -179,14 +140,14 @@ //! //! // Configure and start our workers //! WorkerConfig::new(move || MyState::new("My App")) -//! .register(MyProcessor) +//! .register::() //! .set_processor_count(DEFAULT_QUEUE, 16) //! .start(queue_handle.clone()); //! //! // Queue our jobs -//! queue_handle.queue::(MyJob::new(1, 2))?; -//! queue_handle.queue::(MyJob::new(3, 4))?; -//! queue_handle.queue::(MyJob::new(5, 6))?; +//! queue_handle.queue(MyJob::new(1, 2))?; +//! queue_handle.queue(MyJob::new(3, 4))?; +//! queue_handle.queue(MyJob::new(5, 6))?; //! //! // Block on Actix //! actix_rt::signal::ctrl_c().await?; @@ -200,12 +161,10 @@ //! //! #### Bringing your own server/worker implementation //! If you want to create your own jobs processor based on this idea, you can depend on the -//! `background-jobs-core` crate, which provides the Processor and Job traits, as well as some +//! `background-jobs-core` crate, which provides the Job trait, as well as some //! other useful types for implementing a jobs processor and job store. -pub use background_jobs_core::{ - memory_storage, Backoff, Job, JobStat, MaxRetries, Processor, Stats, -}; +pub use background_jobs_core::{memory_storage, Backoff, Job, JobStat, MaxRetries, Stats}; #[cfg(feature = "background-jobs-actix")] pub use background_jobs_actix::{create_server, ActixJob, QueueHandle, WorkerConfig};
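
Taken as a whole, the patch collapses the old Processor/Job pair into a single Job trait: the NAME, QUEUE, MAX_RETRIES, BACKOFF, and TIMEOUT defaults move onto Job, worker registration becomes `register::<J>()`, and the free functions `new_job()`/`new_scheduled_job()` replace `Processor::new_job`. Below is a minimal end-to-end sketch of that post-patch API, with the generic parameters that the diff text above elides (for example `register::<MyJob>()` and `Ready<Result<(), Error>>`) written out; it mirrors the names used in the example crate and assumes the in-memory `Storage::new()` constructor takes no arguments, as in the crate's example project.

```rust
use anyhow::Error;
use background_jobs::{create_server, memory_storage::Storage, Job, MaxRetries, WorkerConfig};
use futures::future::{ok, Ready};

const DEFAULT_QUEUE: &'static str = "default";

#[derive(Clone, Debug)]
pub struct MyState {
    pub app_name: String,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
    some_usize: usize,
    other_usize: usize,
}

impl Job for MyJob {
    type State = MyState;
    type Future = Ready<Result<(), Error>>;

    // NAME replaces the old Processor::NAME; it must be unique per job type.
    const NAME: &'static str = "MyJob";

    // QUEUE, MAX_RETRIES, BACKOFF, and TIMEOUT now live on Job with defaults,
    // so overriding them is optional.
    const QUEUE: &'static str = DEFAULT_QUEUE;
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

    fn run(self, state: MyState) -> Self::Future {
        println!("{}: args, {:?}", state.app_name, self);

        ok(())
    }
}

#[actix_rt::main]
async fn main() -> Result<(), Error> {
    env_logger::init();

    // In-memory storage; assumed here to take no constructor arguments.
    let storage = Storage::new();

    // Guards access to the job store.
    let queue_handle = create_server(storage);

    // Workers now register Job types directly instead of Processor instances.
    WorkerConfig::new(move || MyState { app_name: "My App".to_owned() })
        .register::<MyJob>()
        .set_worker_count(DEFAULT_QUEUE, 16)
        .start(queue_handle.clone());

    // Queueing no longer needs a Processor type parameter.
    queue_handle.queue(MyJob { some_usize: 1, other_usize: 2 })?;

    actix_rt::signal::ctrl_c().await?;
    Ok(())
}
```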