From 048dc341bc916275c392ddd536b15bdf11e8eef7 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 4 Feb 2024 21:44:49 -0600 Subject: [PATCH] Don't forward Display for process error --- Cargo.toml | 1 + examples/error-example/.gitignore | 1 + examples/error-example/Cargo.toml | 20 +++++ examples/error-example/src/main.rs | 131 +++++++++++++++++++++++++++++ jobs-core/src/lib.rs | 2 +- 5 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 examples/error-example/.gitignore create mode 100644 examples/error-example/Cargo.toml create mode 100644 examples/error-example/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 1d32e22..690cc9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "jobs-sled", "jobs-tokio", "examples/basic-example", + "examples/error-example", "examples/long-example", "examples/managed-example", "examples/metrics-example", diff --git a/examples/error-example/.gitignore b/examples/error-example/.gitignore new file mode 100644 index 0000000..23f168a --- /dev/null +++ b/examples/error-example/.gitignore @@ -0,0 +1 @@ +/my-sled-db diff --git a/examples/error-example/Cargo.toml b/examples/error-example/Cargo.toml new file mode 100644 index 0000000..4f3cc7f --- /dev/null +++ b/examples/error-example/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "error-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-rt = "2.0.0" +anyhow = "1.0" +background-jobs = { version = "0.17.0", path = "../..", features = [ + "error-logging", +] } +background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +time = "0.3" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } +serde = { version = "1.0", features = ["derive"] } +sled = "0.34" diff --git a/examples/error-example/src/main.rs b/examples/error-example/src/main.rs new file mode 100644 index 0000000..a4a95ea --- /dev/null +++ b/examples/error-example/src/main.rs @@ -0,0 +1,131 @@ +use actix_rt::Arbiter; +use anyhow::Error; +use background_jobs::{Job, MaxRetries, WorkerConfig}; +use background_jobs_sled_storage::Storage; +use std::{ + future::{ready, Ready}, + time::{Duration, SystemTime}, +}; +use tracing::info; +use tracing_subscriber::EnvFilter; + +const DEFAULT_QUEUE: &str = "default"; + +#[derive(Clone, Debug)] +pub struct MyState { + pub app_name: String, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct MyJob { + some_usize: usize, + other_usize: usize, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct ErroringJob; + +#[actix_rt::main] +async fn main() -> Result<(), Error> { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::fmt::fmt() + .with_env_filter(env_filter) + .init(); + + // Set up our Storage + let db = sled::Config::new().temporary(true).open()?; + let storage = Storage::new(db)?; + + let arbiter = Arbiter::new(); + + // Configure and start our workers + let queue_handle = + WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App")) + .register::() + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) + .start(); + + // Queue some panicking job + for _ in 0..32 { + queue_handle.queue(ErroringJob).await?; + } + + // Queue our jobs + queue_handle.queue(MyJob::new(1, 2)).await?; + queue_handle.queue(MyJob::new(3, 4)).await?; + queue_handle.queue(MyJob::new(5, 6)).await?; + queue_handle + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2)) + .await?; + + // Block on Actix + actix_rt::signal::ctrl_c().await?; + + arbiter.stop(); + let _ = arbiter.join(); + + Ok(()) +} + +impl MyState { + pub fn new(app_name: &str) -> Self { + MyState { + app_name: app_name.to_owned(), + } + } +} + +impl MyJob { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJob { + some_usize, + other_usize, + } + } +} + +impl Job for MyJob { + type State = MyState; + type Future = Ready>; + + // The name of the job. It is super important that each job has a unique name, + // because otherwise one job will overwrite another job when they're being + // registered. + const NAME: &'static str = "MyJob"; + + // The queue that this processor belongs to + // + // Workers have the option to subscribe to specific queues, so this is important to + // determine which worker will call the processor + // + // Jobs can optionally override the queue they're spawned on + const QUEUE: &'static str = DEFAULT_QUEUE; + + // The number of times background-jobs should try to retry a job before giving up + // + // Jobs can optionally override this value + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + + fn run(self, state: MyState) -> Self::Future { + info!("{}: args, {:?}", state.app_name, self); + + ready(Ok(())) + } +} + +impl Job for ErroringJob { + type State = MyState; + type Future = Ready>; + + const NAME: &'static str = "ErroringJob"; + + const QUEUE: &'static str = DEFAULT_QUEUE; + + const MAX_RETRIES: MaxRetries = MaxRetries::Count(0); + + fn run(self, _: MyState) -> Self::Future { + std::future::ready(Err(anyhow::anyhow!("boom"))) + } +} diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index f4ef644..abc5223 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -28,7 +28,7 @@ pub use unsend_job::{JoinError, UnsendJob, UnsendSpawner}; /// The error type returned by the `process` method pub enum JobError { /// Some error occurred while processing the job - #[error("Error performing job: {0}")] + #[error("Error performing job")] Processing(#[from] Error), /// Creating a `Job` type from the provided `serde_json::Value` failed