background-jobs/jobs-core/src/processor_map.rs

215 lines
6.9 KiB
Rust
Raw Permalink Normal View History

2021-09-16 22:50:32 +00:00
use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo};
use serde_json::Value;
2022-12-09 23:38:22 +00:00
use std::{
collections::HashMap, future::Future, panic::AssertUnwindSafe, pin::Pin, sync::Arc,
time::Instant,
};
2024-01-08 22:37:32 +00:00
use tracing::{Instrument, Span};
2021-09-16 22:50:32 +00:00
use uuid::Uuid;
/// A generic function that processes a job
///
2020-04-21 00:30:56 +00:00
/// ProcessorMap stores these `ProcessFn` types that don't expose differences in Job types.
2020-03-21 02:31:03 +00:00
///
/// Each function takes the job's arguments as a JSON [`Value`] together with a state value of
/// type `S`, and returns a boxed, pinned, `Send` future that resolves to `Result<(), JobError>`.
pub type ProcessFn<S> = Arc<
dyn Fn(Value, S) -> Pin<Box<dyn Future<Output = Result<(), JobError>> + Send>> + Send + Sync,
>;
2019-05-24 03:41:34 +00:00
2019-06-01 15:58:05 +00:00
/// A shared, `Send + Sync` function that produces a state value of type `S`
///
/// `ProcessorMap` calls it to build the state passed to a job, and to build the state stored
/// in a `CachedProcessorMap`.
pub type StateFn<S> = Arc<dyn Fn() -> S + Send + Sync>;
2020-04-21 00:30:56 +00:00
/// A type for storing the relationships between job names and the job itself
///
2020-04-21 00:30:56 +00:00
/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an
/// application before workers are spawned in order to handle queued jobs.
2019-06-01 15:58:05 +00:00
#[derive(Clone)]
pub struct ProcessorMap<S> {
2019-05-24 03:41:34 +00:00
inner: HashMap<String, ProcessFn<S>>,
state_fn: StateFn<S>,
}
2020-04-21 00:30:56 +00:00
/// A type for storing the relationships between job names and the job itself, with the
/// state pre-cached instead of being generated from the state function each time
///
2020-04-21 00:30:56 +00:00
/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an
/// application before workers are spawned in order to handle queued jobs.
#[derive(Clone)]
pub struct CachedProcessorMap<S> {
inner: HashMap<String, ProcessFn<S>>,
state: S,
}
2018-11-18 21:05:03 +00:00
impl<S> ProcessorMap<S>
where
2019-05-24 03:41:34 +00:00
S: Clone + 'static,
2018-11-18 21:05:03 +00:00
{
/// Intialize a `ProcessorMap`
2018-11-18 21:05:03 +00:00
///
/// The state passed into this method will be passed to all jobs executed through this
/// ProcessorMap. The state argument could be useful for containing a hook into something like
/// r2d2, or the address of an actor in an actix-based system.
2019-05-24 03:41:34 +00:00
pub fn new(state_fn: StateFn<S>) -> Self {
2018-11-18 21:05:03 +00:00
ProcessorMap {
inner: HashMap::new(),
2019-05-24 03:41:34 +00:00
state_fn,
2018-11-18 21:05:03 +00:00
}
}
2020-04-21 00:30:56 +00:00
/// Register a [`Job`] with this `ProcessorMap`.
///
2020-04-21 00:30:56 +00:00
/// `ProcessorMap`s are useless if no jobs are registerd before workers are spawned, so
/// make sure to register all your processors up-front.
2020-04-21 00:30:56 +00:00
pub fn register<J>(&mut self)
2019-05-28 01:49:46 +00:00
where
J: Job<State = S>,
{
self.inner.insert(
2020-04-21 00:30:56 +00:00
J::NAME.to_owned(),
Arc::new(move |value, state| crate::process::<J>(value, state)),
);
}
/// Initialize the State from the State Function
pub fn cached(&self) -> CachedProcessorMap<S> {
CachedProcessorMap {
inner: self.inner.clone(),
state: (self.state_fn)(),
}
}
/// Process a given job
///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
/// intended for internal use.
2020-04-21 00:30:56 +00:00
pub async fn process(&self, job: JobInfo) -> ReturnJobInfo {
2021-09-16 22:50:32 +00:00
let span = job_span(&job);
2021-09-17 22:09:55 +00:00
let fut = async move {
let opt = self
.inner
2024-01-08 00:52:09 +00:00
.get(&job.name)
2021-09-17 22:09:55 +00:00
.map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone()));
let res = if let Some(fut) = opt {
fut.await
} else {
let span = Span::current();
span.record(
"exception.message",
&tracing::field::display("Not registered"),
);
span.record(
"exception.details",
&tracing::field::display("Not registered"),
);
2022-11-19 20:38:47 +00:00
tracing::error!("Not registered");
2024-01-08 00:52:09 +00:00
ReturnJobInfo::unregistered(job.id)
2021-09-17 22:09:55 +00:00
};
res
2021-09-16 22:50:32 +00:00
};
2021-09-17 22:09:55 +00:00
fut.instrument(span).await
}
}
impl<S> CachedProcessorMap<S>
where
S: Clone + 'static,
{
/// Process a given job
///
/// This should not be called from outside implementations of a backgoround-jobs runtime. It is
/// intended for internal use.
2020-04-21 00:30:56 +00:00
pub async fn process(&self, job: JobInfo) -> ReturnJobInfo {
2021-09-16 22:50:32 +00:00
let span = job_span(&job);
2021-09-17 22:09:55 +00:00
let fut = async move {
2024-01-08 00:52:09 +00:00
let res = if let Some(name) = self.inner.get(&job.name) {
2021-09-17 22:09:55 +00:00
process(Arc::clone(name), self.state.clone(), job).await
} else {
let span = Span::current();
span.record(
"exception.message",
&tracing::field::display("Not registered"),
);
span.record(
"exception.details",
&tracing::field::display("Not registered"),
);
2022-11-19 20:38:47 +00:00
tracing::error!("Not registered");
2024-01-08 00:52:09 +00:00
ReturnJobInfo::unregistered(job.id)
2021-09-17 22:09:55 +00:00
};
res
2021-09-16 22:50:32 +00:00
};
2021-09-13 22:40:49 +00:00
2021-09-17 22:09:55 +00:00
fut.instrument(span).await
2021-09-13 22:40:49 +00:00
}
}
2021-09-16 22:50:32 +00:00
fn job_span(job: &JobInfo) -> Span {
tracing::info_span!(
"Job",
2024-01-08 00:52:09 +00:00
execution_id = tracing::field::display(&Uuid::now_v7()),
job.id = tracing::field::display(&job.id),
job.name = tracing::field::display(&job.name),
2021-09-16 22:50:32 +00:00
job.execution_time = tracing::field::Empty,
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,
)
2021-09-13 22:40:49 +00:00
}
async fn process<S>(process_fn: ProcessFn<S>, state: S, job: JobInfo) -> ReturnJobInfo
where
S: Clone,
{
2024-01-08 00:52:09 +00:00
let args = job.args.clone();
let id = job.id;
let name = job.name.clone();
let queue = job.queue.clone();
let start = Instant::now();
2021-09-13 22:40:49 +00:00
2022-12-09 23:38:22 +00:00
let res = match std::panic::catch_unwind(AssertUnwindSafe(|| (process_fn)(args, state))) {
2021-09-13 22:40:49 +00:00
Ok(fut) => catch_unwind(fut).await,
Err(e) => Err(e),
};
2022-11-19 20:38:47 +00:00
let duration = start.elapsed();
let seconds = duration.as_micros() as f64 / 1_000_000_f64;
2020-04-21 21:08:19 +00:00
2021-09-16 22:50:32 +00:00
let span = Span::current();
span.record("job.execution_time", &tracing::field::display(&seconds));
metrics::histogram!("background-jobs.job.execution_time", "queue" => queue.clone(), "name" => name.clone()).record(seconds);
2021-09-16 22:50:32 +00:00
2020-04-21 21:08:19 +00:00
match res {
2021-09-13 22:40:49 +00:00
Ok(Ok(_)) => {
2021-09-16 22:50:32 +00:00
#[cfg(feature = "completion-logging")]
tracing::info!("Job {queue}: {name}-{id} completed");
2021-09-16 22:50:32 +00:00
2020-03-21 02:31:03 +00:00
ReturnJobInfo::pass(id)
}
2021-09-13 22:40:49 +00:00
Ok(Err(e)) => {
2024-02-05 05:35:47 +00:00
let display = format!("{e}");
2021-09-16 22:50:32 +00:00
span.record("exception.message", &tracing::field::display(&display));
2024-02-05 05:35:47 +00:00
let debug = format!("{e:?}");
2021-09-16 22:50:32 +00:00
span.record("exception.details", &tracing::field::display(&debug));
2024-02-05 05:35:47 +00:00
2021-09-16 22:50:32 +00:00
#[cfg(feature = "error-logging")]
tracing::warn!("Job {queue}: {name}-{id} errored");
2020-03-21 02:31:03 +00:00
ReturnJobInfo::fail(id)
}
2021-09-13 22:40:49 +00:00
Err(_) => {
2021-09-16 22:50:32 +00:00
span.record(
"exception.message",
&tracing::field::display("Job panicked"),
);
span.record(
"exception.details",
&tracing::field::display("Job panicked"),
);
#[cfg(feature = "error-logging")]
tracing::warn!("Job {queue}: {name}-{id} panicked");
2021-09-13 22:40:49 +00:00
ReturnJobInfo::fail(id)
}
2020-03-21 02:31:03 +00:00
}
}