Build a quick tokio-based jobs processor. more-or-less copy actix

This commit is contained in:
asonix 2024-01-13 16:36:49 -05:00
parent b0beaad10a
commit f85469686f
6 changed files with 655 additions and 0 deletions

17
jobs-tokio/Cargo.toml Normal file
View file

@ -0,0 +1,17 @@
[package]
name = "background-jobs-tokio"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.79"
async-trait = "0.1.77"
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
metrics = "0.22.0"
serde = "1.0.195"
serde_json = "1.0.111"
tokio = { version = "1.35.1", features = ["macros", "rt", "sync", "time", "tracing"] }
tracing = "0.1.40"
uuid = { version = "1.6.1", features = ["v7", "serde"] }

26
jobs-tokio/src/every.rs Normal file
View file

@ -0,0 +1,26 @@
use crate::QueueHandle;
use background_jobs_core::Job;
use std::time::Duration;
use tokio::time::{interval_at, Instant};
/// A type used to schedule recurring jobs.
///
/// ```rust,ignore
/// let server = create_server(storage);
/// server.every(Duration::from_secs(60 * 30), MyJob::new());
/// ```
pub(crate) async fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
where
J: Job + Clone + Send,
{
let mut interval = interval_at(Instant::now(), duration);
loop {
interval.tick().await;
let job = job.clone();
if spawner.queue::<J>(job).await.is_err() {
tracing::error!("Failed to queue job: {}", J::NAME);
}
}
}

287
jobs-tokio/src/lib.rs Normal file
View file

@ -0,0 +1,287 @@
#![deny(missing_docs)]
//! # A Tokio-based Jobs Processor
//!
//! This library will spin up as many actors as requested for each processor to process jobs
//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
//! order to achieve parallel execution, multiple Arbiters must be in use.
//!
//! The thread count is used to spawn Synchronous Actors to handle the storage of job
//! information. For storage backends that cannot be parallelized, a thread-count of 1 should be
//! used. By default, the number of cores of the running system is used.
//!
//! ### Example
//! ```rust
//! use anyhow::Error;
//! use background_jobs_core::{Backoff, Job, MaxRetries};
//! use background_jobs_tokio::{TokioTimer, WorkerConfig};
//! use std::future::{ready, Ready};
//!
//! const DEFAULT_QUEUE: &'static str = "default";
//!
//! #[derive(Clone, Debug)]
//! pub struct MyState {
//! pub app_name: String,
//! }
//!
//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
//! pub struct MyJob {
//! some_usize: usize,
//! other_usize: usize,
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//! // Set up our Storage
//! // For this example, we use the default in-memory storage mechanism
//! use background_jobs_core::memory_storage::Storage;
//! let storage = Storage::new(TokioTimer);
//!
//! // Configure and start our workers
//! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
//! .register::<MyJob>()
//! .set_worker_count(DEFAULT_QUEUE, 16)
//! .start();
//!
//! // Queue our jobs
//! queue_handle.queue(MyJob::new(1, 2)).await?;
//! queue_handle.queue(MyJob::new(3, 4)).await?;
//! queue_handle.queue(MyJob::new(5, 6)).await?;
//!
//! // tokio::signal::ctrl_c().await?;
//!
//! Ok(())
//! }
//!
//! impl MyState {
//! pub fn new(app_name: &str) -> Self {
//! MyState {
//! app_name: app_name.to_owned(),
//! }
//! }
//! }
//!
//! impl MyJob {
//! pub fn new(some_usize: usize, other_usize: usize) -> Self {
//! MyJob {
//! some_usize,
//! other_usize,
//! }
//! }
//! }
//!
//! impl Job for MyJob {
//! type State = MyState;
//! type Future = Ready<Result<(), Error>>;
//!
//! // The name of the job. It is super important that each job has a unique name,
//! // because otherwise one job will overwrite another job when they're being
//! // registered.
//! const NAME: &'static str = "MyJob";
//!
//! // The queue that this processor belongs to
//! //
//! // Workers have the option to subscribe to specific queues, so this is important to
//! // determine which worker will call the processor
//! //
//! // Jobs can optionally override the queue they're spawned on
//! const QUEUE: &'static str = DEFAULT_QUEUE;
//!
//! // The number of times background-jobs should try to retry a job before giving up
//! //
//! // This value defaults to MaxRetries::Count(5)
//! // Jobs can optionally override this value
//! const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
//!
//! // The logic to determine how often to retry this job if it fails
//! //
//! // This value defaults to Backoff::Exponential(2)
//! // Jobs can optionally override this value
//! const BACKOFF: Backoff = Backoff::Exponential(2);
//!
//! // This is important for allowing the job server to reap processes that were started but never
//! // completed.
//! //
//! // Defaults to 5 seconds
//! const HEARTBEAT_INTERVAL: u64 = 5_000;
//!
//! fn run(self, state: MyState) -> Self::Future {
//! println!("{}: args, {:?}", state.app_name, self);
//!
//! ready(Ok(()))
//! }
//! }
//! ```
use anyhow::Error;
use background_jobs_core::{
memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage as StorageTrait,
};
use std::{
collections::BTreeMap,
sync::Arc,
time::{Duration, SystemTime},
};
mod every;
mod spawn;
mod storage;
mod worker;
use self::{every::every, storage::Storage};
/// A timer implementation for the Memory Storage backend
#[derive(Debug, Clone)]
pub struct TokioTimer;
#[async_trait::async_trait]
impl Timer for TokioTimer {
async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
where
F: std::future::Future + Send + Sync,
{
tokio::time::timeout(duration, future).await.map_err(|_| ())
}
}
/// Create a new Server
fn create_server<S>(storage: S) -> QueueHandle
where
S: StorageTrait + Sync + 'static,
{
QueueHandle {
inner: Storage::new(storage),
}
}
/// Worker Configuration
///
/// This type is used for configuring and creating workers to process jobs. Before starting the
/// workers, register `Job` types with this struct. This worker registration allows for
/// different worker processes to handle different sets of workers.
#[derive(Clone)]
pub struct WorkerConfig<State>
where
State: Clone + 'static,
{
processors: ProcessorMap<State>,
queues: BTreeMap<String, u64>,
queue_handle: QueueHandle,
}
impl<State> WorkerConfig<State>
where
State: Send + Clone + 'static,
{
/// Create a new WorkerConfig
///
/// The supplied function should return the State required by the jobs intended to be
/// processed. The function must be sharable between threads, but the state itself does not
/// have this requirement.
pub fn new<S: StorageTrait + Send + Sync + 'static>(
storage: S,
state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
) -> Self {
let queue_handle = create_server(storage);
let q2 = queue_handle.clone();
WorkerConfig {
processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
queues: BTreeMap::new(),
queue_handle,
}
}
/// Register a `Job` with the worker
///
/// This enables the worker to handle jobs associated with this processor. If a processor is
/// not registered, none of it's jobs will be run, even if another processor handling the same
/// job queue is registered.
pub fn register<J>(mut self) -> Self
where
J: Job<State = State>,
{
self.queues.insert(J::QUEUE.to_owned(), 4);
self.processors.register::<J>();
self
}
/// Set the number of workers to run for a given queue
///
/// This does not spin up any additional threads. The `Arbiter` the workers are spawned onto
/// will handle processing all workers, regardless of how many are configured.
///
/// By default, 4 workers are spawned
pub fn set_worker_count(mut self, queue: &str, count: u64) -> Self {
self.queues.insert(queue.to_owned(), count);
self
}
/// Start the workers in the provided arbiter
pub fn start(self) -> QueueHandle {
for (key, count) in self.queues.iter() {
for _ in 0..*count {
let queue = key.clone();
let processors = self.processors.clone();
let server = self.queue_handle.inner.clone();
if let Err(e) = spawn::spawn(
"local-worker",
worker::local_worker(queue, processors.clone(), server),
) {
tracing::error!("Failed to spawn worker {e}");
}
}
}
self.queue_handle
}
}
/// A handle to the job server, used for queuing new jobs
///
/// `QueueHandle` should be stored in your application's state in order to allow all parts of your
/// application to spawn jobs.
#[derive(Clone)]
pub struct QueueHandle {
inner: Storage,
}
impl QueueHandle {
/// Queues a job for execution
///
/// This job will be sent to the server for storage, and will execute whenever a worker for the
/// job's queue is free to do so.
pub async fn queue<J>(&self, job: J) -> Result<(), Error>
where
J: Job,
{
let job = new_job(job)?;
self.inner.push(job).await?;
Ok(())
}
/// Schedule a job for execution later
///
/// This job will be sent to the server for storage, and will execute after the specified time
/// and when a worker for the job's queue is free to do so.
pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
where
J: Job,
{
let job = new_scheduled_job(job, after)?;
self.inner.push(job).await?;
Ok(())
}
/// Queues a job for recurring execution
///
/// This job will be added to it's queue on the server once every `Duration`. It will be
/// processed whenever workers are free to do so.
pub fn every<J>(&self, duration: Duration, job: J) -> std::io::Result<()>
where
J: Job + Clone + Send + 'static,
{
spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ())
}
}

22
jobs-tokio/src/spawn.rs Normal file
View file

@ -0,0 +1,22 @@
use std::future::Future;
use tokio::task::JoinHandle;
#[cfg(tokio_unstable)]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::task::Builder::new().name(name).spawn(future)
}
#[cfg(not(tokio_unstable))]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let _ = name;
Ok(tokio::task::spawn(future))
}

67
jobs-tokio/src/storage.rs Normal file
View file

@ -0,0 +1,67 @@
use std::{ops::Deref, sync::Arc};
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait};
use uuid::Uuid;
#[async_trait::async_trait]
pub trait TokioStorage: Send + Sync {
async fn push(&self, job: NewJobInfo) -> anyhow::Result<Uuid>;
async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result<JobInfo>;
async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> anyhow::Result<()>;
async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()>;
}
#[derive(Clone)]
pub(crate) struct Storage {
inner: Arc<dyn TokioStorage>,
}
struct StorageWrapper<S>(S);
#[async_trait::async_trait]
impl<S> TokioStorage for StorageWrapper<S>
where
S: StorageTrait + Send + Sync + 'static,
{
async fn push(&self, job: NewJobInfo) -> anyhow::Result<Uuid> {
self.0.push(job).await.map_err(From::from)
}
async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result<JobInfo> {
self.0.pop(queue, runner_id).await.map_err(From::from)
}
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> anyhow::Result<()> {
self.0.heartbeat(job_id, runner_id).await.map_err(From::from)
}
async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()> {
self.0
.complete(return_job_info)
.await
.map(|_| ())
.map_err(From::from)
}
}
impl Storage {
pub(crate) fn new<S>(storage: S) -> Self
where
S: StorageTrait + Send + Sync + 'static,
{
Self {
inner: Arc::new(StorageWrapper(storage)),
}
}
}
impl Deref for Storage {
type Target = dyn TokioStorage + 'static;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

236
jobs-tokio/src/worker.rs Normal file
View file

@ -0,0 +1,236 @@
use crate::storage::Storage;
use background_jobs_core::ProcessorMap;
use std::{
future::{poll_fn, Future},
pin::Pin,
};
use tracing::{Instrument, Span};
use uuid::Uuid;
struct LocalWorkerStarter<State: Send + Clone + 'static> {
queue: String,
processors: ProcessorMap<State>,
server: Storage,
}
#[cfg(tokio_unstable)]
fn test_runtime() -> anyhow::Result<()> {
tokio::task::Builder::new()
.name("runtime-test")
.spawn(async move {})
.map(|_| ())
.map_err(From::from)
}
#[cfg(not(tokio_unstable))]
fn test_runtime() -> anyhow::Result<()> {
std::panic::catch_unwind(|| tokio::spawn(async move {})).map(|_| ()).map_err(From::from)
}
impl<State> Drop for LocalWorkerStarter<State> where State: Send + Clone + 'static {
fn drop(&mut self) {
metrics::counter!("background-jobs.tokio.worker.finished", "queue" => self.queue.clone())
.increment(1);
let res = test_runtime();
if res.is_ok() {
if let Err(e) = crate::spawn::spawn(
"local-worker",
local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
),
) {
tracing::error!("Failed to re-spawn local worker: {e}");
} else {
metrics::counter!("background-jobs.tokio.worker.restart").increment(1);
}
} else {
tracing::info!("Shutting down worker");
}
}
}
struct RunOnDrop<F>(F)
where
F: Fn();
impl<F> Drop for RunOnDrop<F>
where
F: Fn(),
{
fn drop(&mut self) {
(self.0)();
}
}
async fn heartbeat_job<F: Future>(
storage: &Storage,
future: F,
job_id: Uuid,
runner_id: Uuid,
heartbeat_interval: u64,
) -> F::Output {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut future = std::pin::pin!(future);
let mut hb_future = Some(storage.heartbeat(job_id, runner_id));
loop {
tokio::select! {
output = &mut future => {
break output;
},
Some(hb_output) = option(hb_future.as_mut()), if hb_future.is_some() => {
hb_future.take();
if let Err(e) = hb_output {
tracing::warn!("Failed to heartbeat: {e}");
}
}
_ = interval.tick() => {
if hb_future.is_none() {
hb_future = Some(storage.heartbeat(job_id, runner_id));
}
}
}
}
}
async fn time_job<F: Future>(future: F, job_id: Uuid) -> F::Output {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
interval.tick().await;
let mut count = 0;
let mut future = std::pin::pin!(future);
loop {
tokio::select! {
output = &mut future => {
break output;
},
_ = interval.tick() => {
count += 5;
if count > (60 * 60) {
if count % (60 * 20) == 0 {
tracing::warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60);
}
} else if count > 60 {
if count % 20 == 0 {
tracing::warn!("Job {} is taking a long time: {} minutes", job_id, count / 60);
}
} else {
tracing::info!("Job {} is taking a long time: {} seconds", job_id, count);
}
}
}
}
}
async fn option<F>(opt: Option<&mut F>) -> Option<F::Output>
where
F: Future + Unpin,
{
match opt {
Some(f) => Some(poll_fn(|cx| Pin::new(&mut *f).poll(cx)).await),
None => None,
}
}
pub(crate) async fn local_worker<State>(
queue: String,
processors: ProcessorMap<State>,
server: Storage,
) where
State: Send + Clone + 'static,
{
metrics::counter!("background-jobs.tokio.worker.started", "queue" => queue.clone()).increment(1);
let starter = LocalWorkerStarter {
queue: queue.clone(),
processors: processors.clone(),
server: server.clone(),
};
let id = Uuid::now_v7();
let log_on_drop = RunOnDrop(|| {
make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing"));
});
loop {
let request_span = make_span(id, &queue, "request");
let job = match request_span
.in_scope(|| server.pop(&queue, id))
.instrument(request_span.clone())
.await
{
Ok(job) => job,
Err(e) => {
metrics::counter!("background-jobs.tokio.worker.failed-request").increment(1);
let display_val = format!("{}", e);
let debug = format!("{:?}", e);
request_span.record("exception.message", &tracing::field::display(&display_val));
request_span.record("exception.details", &tracing::field::display(&debug));
request_span
.in_scope(|| tracing::error!("Failed to notify server of ready worker"));
break;
}
};
drop(request_span);
let process_span = make_span(id, &queue, "process");
let job_id = job.id;
let heartbeat_interval = job.heartbeat_interval;
let return_job = process_span
.in_scope(|| {
heartbeat_job(
&server,
time_job(processors.process(job), job_id),
job_id,
id,
heartbeat_interval,
)
})
.instrument(process_span)
.await;
let return_span = make_span(id, &queue, "return");
if let Err(e) = return_span
.in_scope(|| server.complete(return_job))
.instrument(return_span.clone())
.await
{
metrics::counter!("background-jobs.tokio.worker.failed-return").increment(1);
let display_val = format!("{}", e);
let debug = format!("{:?}", e);
return_span.record("exception.message", &tracing::field::display(&display_val));
return_span.record("exception.details", &tracing::field::display(&debug));
return_span.in_scope(|| tracing::warn!("Failed to return completed job"));
}
drop(return_span);
}
drop(log_on_drop);
drop(starter);
}
fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {
tracing::info_span!(
parent: None,
"Worker",
worker.id = tracing::field::display(id),
worker.queue = tracing::field::display(queue),
worker.operation.id = tracing::field::display(&Uuid::now_v7()),
worker.operation.name = tracing::field::display(operation),
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,
)
}