diff --git a/Cargo.toml b/Cargo.toml index 15322af..2ff124a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with actix and futures" -version = "0.9.0" +version = "0.10.0" license-file = "LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -19,12 +19,14 @@ members = [ [features] default = ["background-jobs-actix"] +completion-logging = ["background-jobs-core/completion-logging", "error-logging"] +error-logging = ["background-jobs-core/error-logging"] [dependencies.background-jobs-core] -version = "0.9.0" +version = "0.10.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.9.0" +version = "0.10.0" path = "jobs-actix" optional = true diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index 9e86e3a..51b7573 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -10,9 +10,10 @@ edition = "2018" actix-rt = "2.0.0" anyhow = "1.0" async-trait = "0.1.24" -background-jobs = { version = "0.9.0", path = "../.." } -background-jobs-sled-storage = { version = "0.9.0", path = "../../jobs-sled" } +background-jobs = { version = "0.10.0", path = "../..", features = ["error-logging"] } +background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } chrono = "0.4" -env_logger = "0.8" +tracing = "0.1" +tracing-subscriber = { version = "0.2", features = ["fmt"] } serde = { version = "1.0", features = ["derive"] } sled = "0.34" diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index 6d3d7cb..fe283ad 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -1,8 +1,10 @@ use anyhow::Error; -use background_jobs::{create_server, Job, MaxRetries, WorkerConfig}; +use background_jobs::{create_server, ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; use chrono::{Duration, Utc}; use std::future::{ready, Ready}; +use tracing::info; +use tracing_subscriber::EnvFilter; const DEFAULT_QUEUE: &str = "default"; @@ -22,10 +24,12 @@ pub struct PanickingJob; #[actix_rt::main] async fn main() -> Result<(), Error> { - if std::env::var_os("RUST_LOG").is_none() { - std::env::set_var("RUST_LOG", "info"); - } - env_logger::init(); + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::fmt::fmt() + .with_env_filter(env_filter) + .init(); + // Set up our Storage let db = sled::Config::new().temporary(true).open()?; let storage = Storage::new(db)?; @@ -97,7 +101,7 @@ impl Job for MyJob { const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); fn run(self, state: MyState) -> Self::Future { - println!("{}: args, {:?}", state.app_name, self); + info!("{}: args, {:?}", state.app_name, self); ready(Ok(())) } diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 529caa8..30a3372 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.9.1" +version = "0.10.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -10,13 +10,14 @@ readme = "../README.md" edition = "2018" [dependencies] -actix-rt = "2.0.0" +actix-rt = "2.2.0" anyhow = "1.0" async-mutex = "1.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.9.0", path = "../jobs-core", features = ["with-actix"] } +background-jobs-core = { version = "0.10.0", path = "../jobs-core", features = ["with-actix"] } chrono = "0.4" -log = "0.4" +tracing = "0.1" +tracing-futures = "0.2" num_cpus = "1.10.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 6fe6d4d..4aae3a7 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,7 +1,7 @@ use crate::{Job, QueueHandle}; use actix_rt::time::{interval_at, Instant}; -use log::error; use std::time::Duration; +use tracing::error; /// A type used to schedule recurring jobs. /// diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 4e0addc..3788705 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -120,8 +120,8 @@ use actix_rt::{Arbiter, ArbiterHandle}; use anyhow::Error; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use chrono::{DateTime, Utc}; -use log::error; use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use tracing::error; mod every; mod server; diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index d51806b..229a8f0 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -9,12 +9,12 @@ use actix_rt::{ use anyhow::Error; use async_mutex::Mutex; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; -use log::{error, trace}; use std::{ collections::{HashMap, VecDeque}, sync::Arc, time::Duration, }; +use tracing::{error, trace}; type WorkerQueue = VecDeque>; diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 2ef7ca2..755edbe 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,8 +1,9 @@ use crate::Server; use actix_rt::spawn; use background_jobs_core::{CachedProcessorMap, JobInfo}; -use log::{debug, error, warn}; use tokio::sync::mpsc::{channel, Sender}; +use tracing::{debug, error, info, warn, Span}; +use tracing_futures::Instrument; use uuid::Uuid; #[async_trait::async_trait] @@ -51,27 +52,49 @@ pub(crate) fn local_worker( { let id = Uuid::new_v4(); - let (tx, mut rx) = channel(16); + let span = tracing::info_span!( + parent: None, + "Worker", + worker.id = tracing::field::display(&id), + worker.queue = tracing::field::display(&queue), + exception.message = tracing::field::Empty, + exception.details = tracing::field::Empty, + ); - let handle = LocalWorkerHandle { tx, id, queue }; + spawn( + async move { + let (tx, mut rx) = channel(16); - spawn(async move { - debug!("Beginning worker loop for {}", id); - if let Err(e) = server.request_job(Box::new(handle.clone())).await { - error!("Couldn't request first job, bailing, {}", e); - return; - } - while let Some(job) = rx.recv().await { - let return_job = processors.process(job).await; + let handle = LocalWorkerHandle { tx, id, queue }; - if let Err(e) = server.return_job(return_job).await { - error!("Error returning job, {}", e); - } + let span = Span::current(); + + debug!("Beginning worker loop for {}", id); if let Err(e) = server.request_job(Box::new(handle.clone())).await { - error!("Error requesting job, {}", e); - break; + let display = format!("{}", e); + let debug = format!("{:?}", e); + span.record("exception.message", &tracing::field::display(&display)); + span.record("exception.details", &tracing::field::display(&debug)); + error!("Failed to notify server of new worker, {}", e); + return; } + while let Some(job) = rx.recv().await { + let return_job = processors.process(job).await; + + if let Err(e) = server.return_job(return_job).await { + warn!("Failed to return completed job, {}", e); + } + if let Err(e) = server.request_job(Box::new(handle.clone())).await { + let display = format!("{}", e); + let debug = format!("{:?}", e); + span.record("exception.message", &tracing::field::display(&display)); + span.record("exception.details", &tracing::field::display(&debug)); + error!("Failed to notify server of ready worker, {}", e); + break; + } + } + info!("Worker closing"); } - warn!("Worker {} closing", id); - }); + .instrument(span), + ); } diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 10a954e..0b3346b 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.9.3" +version = "0.10.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -12,14 +12,17 @@ edition = "2018" [features] default = [] with-actix = ["actix-rt", "tokio"] +completion-logging = [] +error-logging = [] [dependencies] -actix-rt = { version = "2.0.0", optional = true } +actix-rt = { version = "2.2.0", optional = true } anyhow = "1.0" async-mutex = "1.0.1" async-trait = "0.1.24" chrono = { version = "0.4", features = ["serde"] } -log = "0.4" +tracing = "0.1" +tracing-futures = "0.2.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs index 044b119..e0101b3 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/actix_job.rs @@ -1,9 +1,10 @@ use crate::{Backoff, Job, MaxRetries}; use anyhow::Error; -use log::error; use serde::{de::DeserializeOwned, ser::Serialize}; use std::{future::Future, pin::Pin}; use tokio::sync::oneshot; +use tracing::{error, Span}; +use tracing_futures::Instrument; /// The ActixJob trait defines parameters pertaining to an instance of background job /// @@ -92,7 +93,7 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static { impl Job for T where - T: ActixJob, + T: ActixJob + std::panic::UnwindSafe, { type State = T::State; type Future = Pin> + Send>>; @@ -106,13 +107,25 @@ where fn run(self, state: Self::State) -> Self::Future { let (tx, rx) = oneshot::channel(); - actix_rt::spawn(async move { - if tx.send(ActixJob::run(self, state).await).is_err() { + let span = Span::current(); + let handle = actix_rt::spawn(async move { + let entered = span.enter(); + let fut = ActixJob::run(self, state); + drop(entered); + + let result = fut.instrument(span.clone()).await; + + if tx.send(result).is_err() { + let entered = span.enter(); error!("Job dropped"); + drop(entered); } }); - Box::pin(async move { rx.await? }) + Box::pin(async move { + handle.await.unwrap(); + rx.await? + }) } fn queue(&self) -> &str { diff --git a/jobs-core/src/catch_unwind.rs b/jobs-core/src/catch_unwind.rs new file mode 100644 index 0000000..af4f428 --- /dev/null +++ b/jobs-core/src/catch_unwind.rs @@ -0,0 +1,41 @@ +use std::{ + future::Future, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; + +pub(crate) struct CatchUnwindFuture { + future: Mutex, +} + +pub(crate) fn catch_unwind(future: F) -> CatchUnwindFuture +where + F: Future + Unpin, +{ + CatchUnwindFuture { + future: Mutex::new(future), + } +} + +impl Future for CatchUnwindFuture +where + F: Future + Unpin, +{ + type Output = std::thread::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let future = &self.future; + let waker = cx.waker().clone(); + let res = std::panic::catch_unwind(|| { + let mut context = Context::from_waker(&waker); + let mut guard = future.lock().unwrap(); + Pin::new(&mut *guard).poll(&mut context) + }); + + match res { + Ok(poll) => poll.map(Ok), + Err(e) => Poll::Ready(Err(e)), + } + } +} diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 18de27a..1021f5f 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -17,7 +17,7 @@ use std::{future::Future, pin::Pin}; /// ```rust /// use anyhow::Error; /// use background_jobs_core::{Job, new_job}; -/// use log::info; +/// use tracing::info; /// use std::future::{ready, Ready}; /// /// #[derive(serde::Deserialize, serde::Serialize)] diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index a8b4f34..dea3c1d 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,7 +1,7 @@ use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; use chrono::{offset::Utc, DateTime, Duration}; -use log::trace; use serde_json::Value; +use tracing::trace; use uuid::Uuid; #[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 40309d9..a1a0e76 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -10,6 +10,7 @@ use anyhow::Error; #[cfg(feature = "with-actix")] mod actix_job; +mod catch_unwind; mod job; mod job_info; mod processor_map; diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 6f28942..db6674e 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -1,8 +1,10 @@ -use crate::{Job, JobError, JobInfo, ReturnJobInfo}; +use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo}; use chrono::Utc; -use log::{error, info}; use serde_json::Value; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; +use tracing::{error, Span}; +use tracing_futures::Instrument; +use uuid::Uuid; /// A generic function that processes a job /// @@ -76,17 +78,32 @@ where /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { - let opt = self - .inner - .get(job.name()) - .map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone())); + let span = job_span(&job); + let opt = self.inner.get(job.name()).map(|name| { + let entered = span.enter(); + let fut = process(Arc::clone(name), (self.state_fn)(), job.clone()); + drop(entered); + fut + }); - if let Some(fut) = opt { - fut.await + let res = if let Some(fut) = opt { + fut.instrument(span.clone()).await } else { - error!("Job {} not registered", job.name()); + span.record( + "exception.message", + &tracing::field::display("Not registered"), + ); + span.record( + "exception.details", + &tracing::field::display("Not registered"), + ); + let entered = span.enter(); + error!("Not registered"); + drop(entered); ReturnJobInfo::unregistered(job.id()) - } + }; + + res } } @@ -99,51 +116,44 @@ where /// This should not be called from outside implementations of a backgoround-jobs runtime. It is /// intended for internal use. pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { - if let Some(name) = self.inner.get(job.name()) { - process(Arc::clone(name), self.state.clone(), job).await + let span = job_span(&job); + + let res = if let Some(name) = self.inner.get(job.name()) { + let entered = span.enter(); + let fut = process(Arc::clone(name), self.state.clone(), job); + drop(entered); + + fut.instrument(span.clone()).await } else { - error!("Job {} not registered", job.name()); + let entered = span.enter(); + span.record( + "exception.message", + &tracing::field::display("Not registered"), + ); + span.record( + "exception.details", + &tracing::field::display("Not registered"), + ); + error!("Not registered"); + drop(entered); ReturnJobInfo::unregistered(job.id()) - } + }; + + res } } -struct CatchUnwindFuture { - future: std::sync::Mutex, -} - -fn catch_unwind(future: F) -> CatchUnwindFuture -where - F: Future + Unpin, -{ - CatchUnwindFuture { - future: std::sync::Mutex::new(future), - } -} - -impl std::future::Future for CatchUnwindFuture -where - F: Future + Unpin, -{ - type Output = std::thread::Result; - - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - let future = &self.future; - let waker = cx.waker().clone(); - let res = std::panic::catch_unwind(|| { - let mut context = std::task::Context::from_waker(&waker); - let mut guard = future.lock().unwrap(); - Pin::new(&mut *guard).poll(&mut context) - }); - - match res { - Ok(poll) => poll.map(Ok), - Err(e) => std::task::Poll::Ready(Err(e)), - } - } +fn job_span(job: &JobInfo) -> Span { + tracing::info_span!( + "Job", + execution_id = tracing::field::display(&Uuid::new_v4()), + job.id = tracing::field::display(&job.id()), + job.name = tracing::field::display(&job.name()), + job.args = tracing::field::debug(&job.args()), + job.execution_time = tracing::field::Empty, + exception.message = tracing::field::Empty, + exception.details = tracing::field::Empty, + ) } async fn process(process_fn: ProcessFn, state: S, job: JobInfo) -> ReturnJobInfo @@ -152,7 +162,6 @@ where { let args = job.args(); let id = job.id(); - let name = job.name().to_owned(); let start = Utc::now(); @@ -176,17 +185,36 @@ where 0_f64 }; + let span = Span::current(); + span.record("job.execution_time", &tracing::field::display(&seconds)); + match res { Ok(Ok(_)) => { - info!("Job {} {} completed {:.6}", id, name, seconds); + #[cfg(feature = "completion-logging")] + tracing::info!("Job completed"); + ReturnJobInfo::pass(id) } Ok(Err(e)) => { - info!("Job {} {} errored {} {:.6}", id, name, e, seconds); + let display = format!("{}", e); + let debug = format!("{:?}", e); + span.record("exception.message", &tracing::field::display(&display)); + span.record("exception.details", &tracing::field::display(&debug)); + #[cfg(feature = "error-logging")] + tracing::warn!("Job errored: {:?}", e); ReturnJobInfo::fail(id) } Err(_) => { - info!("Job {} {} panicked {:.6}", id, name, seconds); + span.record( + "exception.message", + &tracing::field::display("Job panicked"), + ); + span.record( + "exception.details", + &tracing::field::display("Job panicked"), + ); + #[cfg(feature = "error-logging")] + tracing::warn!("Job panicked"); ReturnJobInfo::fail(id) } } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 6697faa..96b937b 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,7 +1,7 @@ use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; use chrono::offset::Utc; -use log::info; use std::error::Error; +use tracing::info; use uuid::Uuid; /// Define a storage backend for jobs @@ -109,7 +109,9 @@ pub trait Storage: Clone + Send { self.save_job(job).await?; self.update_stats(Stats::retry_job).await } else { - info!("Job {} failed permanently", id); + #[cfg(feature = "error-logging")] + tracing::warn!("Job {} failed permanently", id); + self.delete_job(id).await?; self.update_stats(Stats::fail_job).await } diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 9cebfe4..b645cc1 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-sled-storage" description = "Sled storage backend for background-jobs" -version = "0.9.1" +version = "0.10.0" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -13,7 +13,7 @@ edition = "2018" [dependencies] actix-rt = "2.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.9.0", path = "../jobs-core" } +background-jobs-core = { version = "0.10.0", path = "../jobs-core" } bincode = "1.2" chrono = "0.4" sled = "0.34" diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 3bcedce..62858d9 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -125,7 +125,7 @@ impl background_jobs_core::Storage for Storage { } }) .filter_map(|id| this.jobinfo.get(id).ok()) - .filter_map(|opt| opt) + .flatten() .filter_map(|ivec| serde_cbor::from_slice(&ivec).ok()) .find(|job: &JobInfo| job.is_ready(now) && job.is_pending(now)); @@ -216,7 +216,7 @@ impl background_jobs_core::Storage for Storage { Ok(spawn_blocking(move || { this.stats.fetch_and_update("stats", move |opt| { let stats = if let Some(stats_ivec) = opt { - bincode::deserialize(&stats_ivec).unwrap_or_default() + bincode::deserialize(stats_ivec).unwrap_or_default() } else { Stats::default() };