diff --git a/README.md b/README.md index 5ca8700..40f769f 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,6 @@ might not be the best experience. [dependencies] actix-rt = "2.2.0" background-jobs = "0.15.0" -anyhow = "1.0" serde = { version = "1.0", features = ["derive"] } ``` @@ -24,8 +23,7 @@ Jobs are a combination of the data required to perform an operation, and the log operation. They implement the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. ```rust -use background_jobs::Job; -use anyhow::Error; +use background_jobs::{Job, BoxError}; use std::future::{ready, Ready}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -45,7 +43,8 @@ impl MyJob { impl Job for MyJob { type State = (); - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; const NAME: &'static str = "MyJob"; @@ -80,7 +79,8 @@ impl MyState { impl Job for MyJob { type State = MyState; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being @@ -126,11 +126,10 @@ With that out of the way, back to the examples: ##### Main ```rust -use background_jobs::{create_server, WorkerConfig}; -use anyhow::Error; +use background_jobs::{create_server, actix::WorkerConfig, BoxError}; #[actix_rt::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { // Set up our Storage // For this example, we use the default in-memory storage mechanism use background_jobs::memory_storage::{ActixTimer, Storage}; diff --git a/examples/basic-example/Cargo.toml b/examples/basic-example/Cargo.toml index 787a960..6cfb25e 100644 --- a/examples/basic-example/Cargo.toml +++ b/examples/basic-example/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/examples/basic-example/src/main.rs b/examples/basic-example/src/main.rs index ae33766..c507e67 100644 --- a/examples/basic-example/src/main.rs +++ b/examples/basic-example/src/main.rs @@ -1,9 +1,8 @@ use actix_rt::Arbiter; -use anyhow::Error; use background_jobs::{ actix::{Spawner, WorkerConfig}, memory_storage::{ActixTimer, Storage}, - MaxRetries, UnsendJob as Job, + BoxError, MaxRetries, UnsendJob as Job, }; use std::{ future::{ready, Ready}, @@ -26,7 +25,7 @@ pub struct MyJob { } #[actix_rt::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -84,8 +83,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, diff --git a/examples/error-example/Cargo.toml b/examples/error-example/Cargo.toml index 4e66133..a374a4d 100644 --- a/examples/error-example/Cargo.toml +++ b/examples/error-example/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled", diff --git a/examples/error-example/src/main.rs b/examples/error-example/src/main.rs index 5aed057..52e458d 100644 --- a/examples/error-example/src/main.rs +++ b/examples/error-example/src/main.rs @@ -1,6 +1,5 @@ use actix_rt::Arbiter; -use anyhow::Error; -use background_jobs::{actix::WorkerConfig, sled::Storage, Job, MaxRetries}; +use background_jobs::{actix::WorkerConfig, sled::Storage, BoxError, Job, MaxRetries}; use std::{ future::{ready, Ready}, time::{Duration, SystemTime}, @@ -25,7 +24,7 @@ pub struct MyJob { pub struct ErroringJob; #[actix_rt::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -87,8 +86,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being @@ -115,10 +114,19 @@ impl Job for MyJob { } } +#[derive(Debug)] +pub struct Boom; +impl std::fmt::Display for Boom { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "boom") + } +} +impl std::error::Error for Boom {} + impl Job for ErroringJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = Boom; + type Future = Ready>; const NAME: &'static str = "ErroringJob"; @@ -127,6 +135,6 @@ impl Job for ErroringJob { const MAX_RETRIES: MaxRetries = MaxRetries::Count(0); fn run(self, _: MyState) -> Self::Future { - ready(Err(anyhow::anyhow!("boom"))) + ready(Err(Boom)) } } diff --git a/examples/long-example/Cargo.toml b/examples/long-example/Cargo.toml index fccbcb6..1d86988 100644 --- a/examples/long-example/Cargo.toml +++ b/examples/long-example/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/examples/long-example/src/main.rs b/examples/long-example/src/main.rs index 6e9568b..aa5540d 100644 --- a/examples/long-example/src/main.rs +++ b/examples/long-example/src/main.rs @@ -1,9 +1,8 @@ use actix_rt::Arbiter; -use anyhow::Error; use background_jobs::{ actix::{Spawner, WorkerConfig}, sled::Storage, - MaxRetries, UnsendJob as Job, + BoxError, MaxRetries, UnsendJob as Job, }; use std::{ future::{ready, Future, Ready}, @@ -30,7 +29,7 @@ pub struct MyJob { pub struct LongJob; #[actix_rt::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -90,8 +89,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, @@ -121,8 +120,8 @@ impl Job for MyJob { impl Job for LongJob { type State = MyState; - type Error = Error; - type Future = Pin>>>; + type Error = BoxError; + type Future = Pin>>>; type Spawner = Spawner; const NAME: &'static str = "LongJob"; diff --git a/examples/managed-example/Cargo.toml b/examples/managed-example/Cargo.toml index 85a2a53..de98b1b 100644 --- a/examples/managed-example/Cargo.toml +++ b/examples/managed-example/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs index 1b974c3..b26b9d7 100644 --- a/examples/managed-example/src/main.rs +++ b/examples/managed-example/src/main.rs @@ -1,9 +1,8 @@ use actix_rt::Arbiter; -use anyhow::Error; use background_jobs::{ actix::{Spawner, WorkerConfig}, sled::Storage, - MaxRetries, UnsendJob as Job, + BoxError, MaxRetries, UnsendJob as Job, }; use std::{ future::{ready, Ready}, @@ -29,7 +28,7 @@ pub struct MyJob { pub struct StopJob; #[actix_rt::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -102,8 +101,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, @@ -133,8 +132,8 @@ impl Job for MyJob { impl Job for StopJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; type Spawner = Spawner; const NAME: &'static str = "StopJob"; diff --git a/examples/metrics-example/Cargo.toml b/examples/metrics-example/Cargo.toml index dbe91ff..6dcae36 100644 --- a/examples/metrics-example/Cargo.toml +++ b/examples/metrics-example/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/examples/metrics-example/src/main.rs b/examples/metrics-example/src/main.rs index 327d520..fe95ee6 100644 --- a/examples/metrics-example/src/main.rs +++ b/examples/metrics-example/src/main.rs @@ -1,10 +1,9 @@ use actix_rt::Arbiter; -use anyhow::Error; use background_jobs::{ actix::{Spawner, WorkerConfig}, metrics::MetricsStorage, sled::Storage, - MaxRetries, UnsendJob as Job, + BoxError, MaxRetries, UnsendJob as Job, }; use std::{future::Future, pin::Pin, time::Duration}; use tracing::info; @@ -24,7 +23,7 @@ pub struct MyJob { } #[actix_rt::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn")); // Install the metrics subscriber @@ -89,8 +88,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Pin> + 'static>>; + type Error = BoxError; + type Future = Pin> + 'static>>; type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, diff --git a/examples/panic-example/Cargo.toml b/examples/panic-example/Cargo.toml index b9cf4a4..f54b470 100644 --- a/examples/panic-example/Cargo.toml +++ b/examples/panic-example/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio" ] } time = "0.3" tokio = { version = "1", features = ["full"] } diff --git a/examples/panic-example/src/main.rs b/examples/panic-example/src/main.rs index 523742d..5146f04 100644 --- a/examples/panic-example/src/main.rs +++ b/examples/panic-example/src/main.rs @@ -1,5 +1,4 @@ -use anyhow::Error; -use background_jobs::{sled::Storage, tokio::WorkerConfig, Job, MaxRetries}; +use background_jobs::{sled::Storage, tokio::WorkerConfig, BoxError, Job, MaxRetries}; use std::{ future::{ready, Ready}, time::{Duration, SystemTime}, @@ -24,7 +23,7 @@ pub struct MyJob { pub struct PanickingJob; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -83,8 +82,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being @@ -113,8 +112,8 @@ impl Job for MyJob { impl Job for PanickingJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; const NAME: &'static str = "PanickingJob"; diff --git a/examples/postgres-example/Cargo.toml b/examples/postgres-example/Cargo.toml index 990742b..09ef39a 100644 --- a/examples/postgres-example/Cargo.toml +++ b/examples/postgres-example/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" [dependencies] actix-rt = "2.9.0" -anyhow = "1.0.79" background-jobs = { version = "0.17.0", features = ["postgres"], path = "../.." } serde = { version = "1.0.195", features = ["derive"] } tokio = { version = "1.35.1", features = ["full"] } diff --git a/examples/postgres-example/src/main.rs b/examples/postgres-example/src/main.rs index 8175f13..2c9c174 100644 --- a/examples/postgres-example/src/main.rs +++ b/examples/postgres-example/src/main.rs @@ -2,7 +2,7 @@ use actix_rt::Arbiter; use background_jobs::{ actix::{Spawner, WorkerConfig}, postgres::Storage, - MaxRetries, UnsendJob as Job, + BoxError, MaxRetries, UnsendJob as Job, }; // use background_jobs_sled_storage::Storage; use std::{ @@ -26,7 +26,7 @@ pub struct MyJob { } #[actix_rt::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -90,8 +90,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = anyhow::Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; type Spawner = Spawner; // The name of the job. It is super important that each job has a unique name, diff --git a/examples/tokio-example/Cargo.toml b/examples/tokio-example/Cargo.toml index 7e370c1..9728eb7 100644 --- a/examples/tokio-example/Cargo.toml +++ b/examples/tokio-example/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0" background-jobs = { version = "0.17.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio"] } tokio = { version = "1", features = ["full"] } tracing = "0.1" diff --git a/examples/tokio-example/src/main.rs b/examples/tokio-example/src/main.rs index 9261948..0dfa09b 100644 --- a/examples/tokio-example/src/main.rs +++ b/examples/tokio-example/src/main.rs @@ -1,8 +1,7 @@ -use anyhow::Error; use background_jobs::{ memory_storage::{Storage, TokioTimer}, tokio::WorkerConfig, - Job, MaxRetries, + BoxError, Job, MaxRetries, }; use std::{ future::{ready, Ready}, @@ -25,7 +24,7 @@ pub struct MyJob { } #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), BoxError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::fmt::fmt() @@ -79,8 +78,8 @@ impl MyJob { impl Job for MyJob { type State = MyState; - type Error = Error; - type Future = Ready>; + type Error = BoxError; + type Future = Ready>; // The name of the job. It is super important that each job has a unique name, // because otherwise one job will overwrite another job when they're being diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 3065a0f..d49c338 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -11,7 +11,6 @@ edition = "2021" [dependencies] actix-rt = "2.5.1" -anyhow = "1.0" async-trait = "0.1.24" background-jobs-core = { version = "0.17.0", path = "../jobs-core" } metrics = "0.22.0" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 4c1613c..6c7e2de 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -12,8 +12,7 @@ //! //! ### Example //! ```rust -//! use anyhow::Error; -//! use background_jobs_core::{Backoff, Job, MaxRetries}; +//! use background_jobs_core::{Backoff, Job, MaxRetries, BoxError}; //! use background_jobs_actix::{ActixTimer, WorkerConfig}; //! use std::future::{ready, Ready}; //! @@ -31,7 +30,7 @@ //! } //! //! #[actix_rt::main] -//! async fn main() -> Result<(), Error> { +//! async fn main() -> Result<(), BoxError> { //! // Set up our Storage //! // For this example, we use the default in-memory storage mechanism //! use background_jobs_core::memory_storage::Storage; @@ -72,7 +71,7 @@ //! //! impl Job for MyJob { //! type State = MyState; -//! type Future = Ready>; +//! type Future = Ready>; //! //! // The name of the job. It is super important that each job has a unique name, //! // because otherwise one job will overwrite another job when they're being @@ -114,9 +113,8 @@ //! ``` use actix_rt::{Arbiter, ArbiterHandle}; -use anyhow::Error; use background_jobs_core::{ - memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage, + memory_storage::Timer, new_job, new_scheduled_job, BoxError, Job, ProcessorMap, Storage, }; use std::{ collections::BTreeMap, @@ -469,7 +467,7 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute whenever a worker for the /// job's queue is free to do so. - pub async fn queue(&self, job: J) -> Result<(), Error> + pub async fn queue(&self, job: J) -> Result<(), BoxError> where J: Job, { @@ -482,7 +480,7 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute after the specified time /// and when a worker for the job's queue is free to do so. - pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), Error> + pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), BoxError> where J: Job, { diff --git a/jobs-actix/src/storage.rs b/jobs-actix/src/storage.rs index 1988de2..5599961 100644 --- a/jobs-actix/src/storage.rs +++ b/jobs-actix/src/storage.rs @@ -1,16 +1,15 @@ -use anyhow::Error; -use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage}; +use background_jobs_core::{BoxError, JobInfo, NewJobInfo, ReturnJobInfo, Storage}; use uuid::Uuid; #[async_trait::async_trait] pub(crate) trait ActixStorage { - async fn push(&self, job: NewJobInfo) -> Result; + async fn push(&self, job: NewJobInfo) -> Result; - async fn pop(&self, queue: &str, runner_id: Uuid) -> Result; + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result; - async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error>; + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), BoxError>; - async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error>; + async fn complete(&self, ret: ReturnJobInfo) -> Result<(), BoxError>; } pub(crate) struct StorageWrapper(pub(crate) S) @@ -24,19 +23,19 @@ where S: Storage + Send + Sync, S::Error: Send + Sync + 'static, { - async fn push(&self, job: NewJobInfo) -> Result { + async fn push(&self, job: NewJobInfo) -> Result { Ok(self.0.push(job).await?) } - async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { Ok(self.0.pop(queue, runner_id).await?) } - async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), Error> { + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), BoxError> { Ok(self.0.heartbeat(job_id, runner_id).await?) } - async fn complete(&self, ret: ReturnJobInfo) -> Result<(), Error> { + async fn complete(&self, ret: ReturnJobInfo) -> Result<(), BoxError> { self.0.complete(ret).await?; Ok(()) diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 6c0d57a..b4d1d3b 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -18,7 +18,6 @@ completion-logging = [] error-logging = [] [dependencies] -anyhow = "1.0" async-trait = "0.1.24" event-listener = "4" metrics = "0.22.0" diff --git a/jobs-core/src/box_error.rs b/jobs-core/src/box_error.rs new file mode 100644 index 0000000..83c78b1 --- /dev/null +++ b/jobs-core/src/box_error.rs @@ -0,0 +1,71 @@ +/// A simple error box that provides no additional formatting utilities +pub struct BoxError { + error: Box, +} + +impl std::fmt::Debug for BoxError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.error.fmt(f) + } +} + +impl std::fmt::Display for BoxError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.error.fmt(f) + } +} + +impl From for BoxError +where + E: std::error::Error + Send + Sync + 'static, +{ + fn from(error: E) -> Self { + BoxError { + error: Box::new(error), + } + } +} + +impl From for Box { + fn from(value: BoxError) -> Self { + value.error + } +} + +impl From for Box { + fn from(value: BoxError) -> Self { + value.error + } +} + +impl From for Box { + fn from(value: BoxError) -> Self { + value.error + } +} + +impl AsRef for BoxError { + fn as_ref(&self) -> &(dyn std::error::Error + Send + Sync + 'static) { + self.error.as_ref() + } +} + +impl AsRef for BoxError { + fn as_ref(&self) -> &(dyn std::error::Error + Send + 'static) { + self.error.as_ref() + } +} + +impl AsRef for BoxError { + fn as_ref(&self) -> &(dyn std::error::Error + 'static) { + self.error.as_ref() + } +} + +impl std::ops::Deref for BoxError { + type Target = dyn std::error::Error + Send + Sync; + + fn deref(&self) -> &Self::Target { + self.error.as_ref() + } +} diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 2d0bc71..db0d368 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -1,5 +1,4 @@ -use crate::{Backoff, JobError, MaxRetries, NewJobInfo}; -use anyhow::Error; +use crate::{Backoff, BoxError, JobError, MaxRetries, NewJobInfo}; use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::Value; use std::{future::Future, pin::Pin, time::SystemTime}; @@ -15,8 +14,7 @@ use tracing::{Instrument, Span}; /// ### Example /// /// ```rust -/// use anyhow::Error; -/// use background_jobs_core::{Job, new_job}; +/// use background_jobs_core::{Job, new_job, BoxError}; /// use tracing::info; /// use std::future::{ready, Ready}; /// @@ -27,7 +25,8 @@ use tracing::{Instrument, Span}; /// /// impl Job for MyJob { /// type State = (); -/// type Future = Ready>; +/// type Error = BoxError; +/// type Future = Ready>; /// /// const NAME: &'static str = "MyJob"; /// @@ -38,7 +37,7 @@ use tracing::{Instrument, Span}; /// } /// } /// -/// fn main() -> Result<(), Error> { +/// fn main() -> Result<(), BoxError> { /// let job = new_job(MyJob { count: 1234 })?; /// /// Ok(()) @@ -49,7 +48,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static { type State: Clone + 'static; /// The error type this job returns - type Error: Into>; + type Error: Into; /// The future returned by this job type Future: Future> + Send; @@ -130,7 +129,7 @@ pub trait Job: Serialize + DeserializeOwned + 'static { } /// A provided method to create a new JobInfo from provided arguments -pub fn new_job(job: J) -> Result +pub fn new_job(job: J) -> Result where J: Job, { @@ -147,7 +146,7 @@ where } /// Create a NewJobInfo to schedule a job to be performed after a certain time -pub fn new_scheduled_job(job: J, after: SystemTime) -> Result +pub fn new_scheduled_job(job: J, after: SystemTime) -> Result where J: Job, { @@ -178,13 +177,13 @@ where Box::pin(async move { let (fut, span) = res?; - if let Some(span) = span { - fut.instrument(span).await.map_err(Into::into)?; + let res = if let Some(span) = span { + fut.instrument(span).await } else { - fut.await.map_err(Into::into)?; - } + fut.await + }; - Ok(()) + res.map_err(Into::into).map_err(JobError::Processing) }) } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index d1943c7..eae6b1a 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -6,6 +6,7 @@ //! This crate shouldn't be depended on directly, except in the case of implementing a custom jobs //! processor. For a default solution based on Actix and Sled, look at the `background-jobs` crate. +mod box_error; mod catch_unwind; mod job; mod job_info; @@ -14,6 +15,7 @@ mod storage; mod unsend_job; pub use crate::{ + box_error::BoxError, job::{new_job, new_scheduled_job, process, Job}, job_info::{JobInfo, NewJobInfo, ReturnJobInfo}, processor_map::{CachedProcessorMap, ProcessorMap}, @@ -27,7 +29,7 @@ pub use unsend_job::{JoinError, UnsendJob, UnsendSpawner}; pub enum JobError { /// Some error occurred while processing the job #[error("{0}")] - Processing(#[from] Box), + Processing(#[from] BoxError), /// Creating a `Job` type from the provided `serde_json::Value` failed #[error("Could not make JSON value from arguments")] diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index f6e38cd..a25bec3 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -188,10 +188,11 @@ where ReturnJobInfo::pass(id) } Ok(Err(e)) => { - let display = format!("{}", e); - let debug = format!("{:?}", e); + let display = format!("{e}"); span.record("exception.message", &tracing::field::display(&display)); + let debug = format!("{e:?}"); span.record("exception.details", &tracing::field::display(&debug)); + #[cfg(feature = "error-logging")] tracing::warn!("Job {queue}: {name}-{id} errored"); ReturnJobInfo::fail(id) diff --git a/jobs-core/src/unsend_job.rs b/jobs-core/src/unsend_job.rs index 54564fa..841ea0b 100644 --- a/jobs-core/src/unsend_job.rs +++ b/jobs-core/src/unsend_job.rs @@ -1,4 +1,4 @@ -use crate::{Backoff, Job, MaxRetries}; +use crate::{Backoff, BoxError, Job, MaxRetries}; use serde::{de::DeserializeOwned, ser::Serialize}; use std::{ fmt::Debug, @@ -45,7 +45,7 @@ pub trait UnsendJob: Serialize + DeserializeOwned + 'static { type State: Clone + 'static; /// The error type this job returns - type Error: Into> + Send; + type Error: Into; /// The future returned by this job /// @@ -150,7 +150,7 @@ where T: UnsendJob, { type State = T::State; - type Error = T::Error; + type Error = BoxError; type Future = UnwrapFuture<::Handle>>; const NAME: &'static str = ::NAME; @@ -161,7 +161,8 @@ where fn run(self, state: Self::State) -> Self::Future { UnwrapFuture(T::Spawner::spawn( - UnsendJob::run(self, state).instrument(Span::current()), + async move { UnsendJob::run(self, state).await.map_err(Into::into) } + .instrument(Span::current()), )) } diff --git a/jobs-tokio/Cargo.toml b/jobs-tokio/Cargo.toml index c171718..d182850 100644 --- a/jobs-tokio/Cargo.toml +++ b/jobs-tokio/Cargo.toml @@ -12,7 +12,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.79" async-trait = "0.1.77" background-jobs-core = { version = "0.17.0", path = "../jobs-core" } metrics = "0.22.0" diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs index 7c82e04..44edad6 100644 --- a/jobs-tokio/src/lib.rs +++ b/jobs-tokio/src/lib.rs @@ -12,8 +12,7 @@ //! //! ### Example //! ```rust -//! use anyhow::Error; -//! use background_jobs_core::{Backoff, Job, MaxRetries}; +//! use background_jobs_core::{Backoff, Job, MaxRetries, BoxError}; //! use background_jobs_tokio::{TokioTimer, WorkerConfig}; //! use std::future::{ready, Ready}; //! @@ -31,7 +30,7 @@ //! } //! //! #[tokio::main] -//! async fn main() -> Result<(), Error> { +//! async fn main() -> Result<(), BoxError> { //! // Set up our Storage //! // For this example, we use the default in-memory storage mechanism //! use background_jobs_core::memory_storage::Storage; @@ -74,7 +73,7 @@ //! //! impl Job for MyJob { //! type State = MyState; -//! type Future = Ready>; +//! type Future = Ready>; //! //! // The name of the job. It is super important that each job has a unique name, //! // because otherwise one job will overwrite another job when they're being @@ -115,9 +114,9 @@ //! } //! ``` -use anyhow::Error; use background_jobs_core::{ - memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage as StorageTrait, + memory_storage::Timer, new_job, new_scheduled_job, BoxError, Job, ProcessorMap, + Storage as StorageTrait, }; use std::{ collections::{BTreeMap, HashMap}, @@ -314,7 +313,7 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute whenever a worker for the /// job's queue is free to do so. - pub async fn queue(&self, job: J) -> Result<(), Error> + pub async fn queue(&self, job: J) -> Result<(), BoxError> where J: Job, { @@ -327,7 +326,7 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute after the specified time /// and when a worker for the job's queue is free to do so. - pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), Error> + pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), BoxError> where J: Job, { diff --git a/jobs-tokio/src/storage.rs b/jobs-tokio/src/storage.rs index cf87230..799dd5d 100644 --- a/jobs-tokio/src/storage.rs +++ b/jobs-tokio/src/storage.rs @@ -1,17 +1,17 @@ use std::{ops::Deref, sync::Arc}; -use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait}; +use background_jobs_core::{BoxError, JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait}; use uuid::Uuid; #[async_trait::async_trait] pub trait TokioStorage: Send + Sync { - async fn push(&self, job: NewJobInfo) -> anyhow::Result; + async fn push(&self, job: NewJobInfo) -> Result; - async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result; + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result; - async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> anyhow::Result<()>; + async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> Result<(), BoxError>; - async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()>; + async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), BoxError>; } #[derive(Clone)] @@ -26,19 +26,22 @@ impl TokioStorage for StorageWrapper where S: StorageTrait + Send + Sync + 'static, { - async fn push(&self, job: NewJobInfo) -> anyhow::Result { + async fn push(&self, job: NewJobInfo) -> Result { self.0.push(job).await.map_err(From::from) } - async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result { + async fn pop(&self, queue: &str, runner_id: Uuid) -> Result { self.0.pop(queue, runner_id).await.map_err(From::from) } - async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> anyhow::Result<()> { - self.0.heartbeat(job_id, runner_id).await.map_err(From::from) + async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<(), BoxError> { + self.0 + .heartbeat(job_id, runner_id) + .await + .map_err(From::from) } - async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()> { + async fn complete(&self, return_job_info: ReturnJobInfo) -> Result<(), BoxError> { self.0 .complete(return_job_info) .await diff --git a/src/lib.rs b/src/lib.rs index 59b476d..ff51d2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,6 @@ //! ```toml //! [dependencies] //! actix-rt = "2.6.0" -//! anyhow = "1.0" //! background-jobs = "0.15.0" //! serde = { version = "1.0", features = ["derive"] } //! ``` @@ -39,8 +38,7 @@ //! operation. They implment the `Job`, `serde::Serialize`, and `serde::DeserializeOwned`. //! //! ```rust,ignore -//! use anyhow::Error; -//! use background_jobs::Job; +//! use background_jobs::[Job, BoxError}; //! use std::future::{ready, Ready}; //! //! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -60,7 +58,8 @@ //! //! impl Job for MyJob { //! type State = (); -//! type Future = Ready>; +//! type Error = BoxError; +//! type Future = Ready>; //! //! const NAME: &'static str = "MyJob"; //! @@ -80,8 +79,7 @@ //! Let's re-define the job to care about some application state. //! //! ```rust,ignore -//! use anyhow::Error; -//! use background_jobs::Job; +//! use background_jobs::[Job, BoxError}; //! use std::future::{ready, Ready}; //! //! #[derive(Clone, Debug)] @@ -99,7 +97,8 @@ //! //! impl Job for MyJob { //! type State = MyState; -//! type Future = Ready>; +//! type Error = BoxError; +//! type Future = Ready>; //! //! const NAME: &'static str = "MyJob"; //! @@ -123,11 +122,10 @@ //! //! ##### Main //! ```rust,ignore -//! use anyhow::Error; -//! use background_jobs::{ServerConfig, memory_storage::Storage, WorkerConfig}; +//! use background_jobs::{ServerConfig, memory_storage::Storage, actix::WorkerConfig, BoxError}; //! //! #[actix_rt::main] -//! async fn main() -> Result<(), Error> { +//! async fn main() -> Result<(), BoxError> { //! // Set up our Storage //! let storage = Storage::new(); //! @@ -173,7 +171,7 @@ //! | `completion-logging` | Enables a tracing event that occurs whenever a job completes | //! | `error-logging` | Enables a tracing event that occurs whenever a job fails | -pub use background_jobs_core::{Backoff, Job, MaxRetries, UnsendJob, UnsendSpawner}; +pub use background_jobs_core::{Backoff, BoxError, Job, MaxRetries, UnsendJob, UnsendSpawner}; #[cfg(feature = "metrics")] pub mod metrics {