Compare commits: b0beaad10a ... 6665ced671 (7 commits)

Author | SHA1
---|---
asonix | 6665ced671
asonix | 56291a91f8
asonix | cff4da40b8
asonix | 61456090fc
asonix | ab6ab53b9a
asonix | 8797127077
asonix | f85469686f

Cargo.toml (19 lines changed)
@@ -16,16 +16,23 @@ members = [
   "jobs-metrics",
   "jobs-postgres",
   "jobs-sled",
+  "jobs-tokio",
   "examples/basic-example",
   "examples/long-example",
   "examples/managed-example",
   "examples/metrics-example",
   "examples/panic-example",
   "examples/postgres-example",
+  "examples/tokio-example",
 ]
 
 [features]
-default = ["background-jobs-actix", "background-jobs-metrics"]
+default = ["actix-rt", "metrics"]
+actix-rt = ["dep:background-jobs-actix"]
+metrics = ["dep:background-jobs-metrics"]
+postgres = ["dep:background-jobs-postgres"]
+sled = ["dep:background-jobs-sled"]
+tokio = ["dep:background-jobs-tokio"]
 completion-logging = [
     "background-jobs-core/completion-logging",
     "error-logging",

@@ -50,3 +57,13 @@ optional = true
 version = "0.17.0"
 path = "jobs-postgres"
 optional = true
+
+[dependencies.background-jobs-sled]
+version = "0.17.0"
+path = "jobs-sled"
+optional = true
+
+[dependencies.background-jobs-tokio]
+version = "0.17.0"
+path = "jobs-tokio"
+optional = true
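The features rework above replaces crate-named features ("background-jobs-actix", "background-jobs-metrics") with short, explicit ones built on Cargo's `dep:` syntax, so the optional backend crates no longer leak implicit features. The facade then gates its modules on the new names; a condensed view of that gating (the full module bodies appear in the src/lib.rs hunks at the end of this compare):

#[cfg(feature = "actix-rt")]
pub mod actix {
    pub use background_jobs_actix::{ActixSpawner as Spawner, Manager, QueueHandle, WorkerConfig};
}

#[cfg(feature = "tokio")]
pub mod tokio {
    pub use background_jobs_tokio::{QueueHandle, WorkerConfig};
}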
@@ -9,10 +9,7 @@ edition = "2021"
 [dependencies]
 actix-rt = "2.0.0"
 anyhow = "1.0"
-background-jobs = { version = "0.17.0", path = "../..", features = [
-    "error-logging",
-] }
-background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
+background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 serde = { version = "1.0", features = ["derive"] }

@@ -1,10 +1,10 @@
 use actix_rt::Arbiter;
 use anyhow::Error;
 use background_jobs::{
+    actix::{Spawner, WorkerConfig},
     memory_storage::{ActixTimer, Storage},
-    ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig,
+    MaxRetries, UnsendJob as Job,
 };
 // use background_jobs_sled_storage::Storage;
 use std::{
     future::{ready, Ready},
     time::{Duration, SystemTime},

@@ -85,7 +85,7 @@ impl MyJob {
 impl Job for MyJob {
     type State = MyState;
     type Future = Ready<Result<(), Error>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     // The name of the job. It is super important that each job has a unique name,
     // because otherwise one job will overwrite another job when they're being
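The same migration repeats across the examples below: the direct `background-jobs-sled-storage` dependency disappears in favor of the facade's `sled` feature, and the actix spawner moves from the crate root into the `actix` module. Condensed before/after, taken from the hunks themselves:

// Before:
use background_jobs::{ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig};
use background_jobs_sled_storage::Storage;

// After:
use background_jobs::{
    actix::{Spawner, WorkerConfig},
    sled::Storage,
    MaxRetries, UnsendJob as Job,
};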
@@ -9,10 +9,7 @@ edition = "2021"
 [dependencies]
 actix-rt = "2.0.0"
 anyhow = "1.0"
-background-jobs = { version = "0.17.0", path = "../..", features = [
-    "error-logging",
-] }
-background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
+background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 serde = { version = "1.0", features = ["derive"] }

@@ -1,7 +1,10 @@
 use actix_rt::Arbiter;
 use anyhow::Error;
-use background_jobs::{ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig};
-use background_jobs_sled_storage::Storage;
+use background_jobs::{
+    actix::{Spawner, WorkerConfig},
+    sled::Storage,
+    MaxRetries, UnsendJob as Job,
+};
 use std::{
     future::{ready, Future, Ready},
     pin::Pin,

@@ -88,7 +91,7 @@ impl MyJob {
 impl Job for MyJob {
     type State = MyState;
     type Future = Ready<Result<(), Error>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     // The name of the job. It is super important that each job has a unique name,
     // because otherwise one job will overwrite another job when they're being

@@ -118,7 +121,7 @@ impl Job for MyJob {
 impl Job for LongJob {
     type State = MyState;
     type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     const NAME: &'static str = "LongJob";
@@ -9,10 +9,7 @@ edition = "2021"
 [dependencies]
 actix-rt = "2.0.0"
 anyhow = "1.0"
-background-jobs = { version = "0.17.0", path = "../..", features = [
-    "error-logging",
-] }
-background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
+background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 serde = { version = "1.0", features = ["derive"] }

@@ -1,7 +1,10 @@
 use actix_rt::Arbiter;
 use anyhow::Error;
-use background_jobs::{ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig};
-use background_jobs_sled_storage::Storage;
+use background_jobs::{
+    actix::{Spawner, WorkerConfig},
+    sled::Storage,
+    MaxRetries, UnsendJob as Job,
+};
 use std::{
     future::{ready, Ready},
     time::{Duration, SystemTime},

@@ -100,7 +103,7 @@ impl MyJob {
 impl Job for MyJob {
     type State = MyState;
     type Future = Ready<Result<(), Error>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     // The name of the job. It is super important that each job has a unique name,
     // because otherwise one job will overwrite another job when they're being

@@ -130,7 +133,7 @@ impl Job for MyJob {
 impl Job for StopJob {
     type State = MyState;
     type Future = Ready<Result<(), Error>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     const NAME: &'static str = "StopJob";
     const QUEUE: &'static str = DEFAULT_QUEUE;
@@ -9,10 +9,7 @@ edition = "2021"
 [dependencies]
 actix-rt = "2.0.0"
 anyhow = "1.0"
-background-jobs = { version = "0.17.0", path = "../..", features = [
-    "error-logging",
-] }
-background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
+background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 serde = { version = "1.0", features = ["derive"] }

@@ -1,14 +1,11 @@
 use actix_rt::Arbiter;
 use anyhow::Error;
 use background_jobs::{
+    actix::{Spawner, WorkerConfig},
     metrics::MetricsStorage,
-    ActixSpawner,
-    MaxRetries,
-    // memory_storage::{ActixTimer, Storage},
-    UnsendJob as Job,
-    WorkerConfig,
+    sled::Storage,
+    MaxRetries, UnsendJob as Job,
 };
-use background_jobs_sled_storage::Storage;
 use std::{future::Future, pin::Pin, time::Duration};
 use tracing::info;
 use tracing_subscriber::EnvFilter;

@@ -93,7 +90,7 @@ impl MyJob {
 impl Job for MyJob {
     type State = MyState;
     type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + 'static>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     // The name of the job. It is super important that each job has a unique name,
     // because otherwise one job will overwrite another job when they're being
@@ -9,10 +9,7 @@ edition = "2021"
 [dependencies]
 actix-rt = "2.0.0"
 anyhow = "1.0"
-background-jobs = { version = "0.17.0", path = "../..", features = [
-    "error-logging",
-] }
-background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
+background-jobs = { version = "0.17.0", path = "../..", features = [ "error-logging", "sled" ] }
 time = "0.3"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }

@@ -1,7 +1,6 @@
 use actix_rt::Arbiter;
 use anyhow::Error;
-use background_jobs::{Job, MaxRetries, WorkerConfig};
-use background_jobs_sled_storage::Storage;
+use background_jobs::{actix::WorkerConfig, sled::Storage, Job, MaxRetries};
 use std::{
     future::{ready, Ready},
     time::{Duration, SystemTime},
@@ -8,7 +8,7 @@ edition = "2021"
 [dependencies]
 actix-rt = "2.9.0"
 anyhow = "1.0.79"
-background-jobs = { version = "0.17.0", features = ["background-jobs-postgres"], path = "../.." }
+background-jobs = { version = "0.17.0", features = ["postgres"], path = "../.." }
 serde = { version = "1.0.195", features = ["derive"] }
 tokio = { version = "1.35.1", features = ["full"] }
 tracing = "0.1.40"

@@ -1,6 +1,8 @@
 use actix_rt::Arbiter;
 use background_jobs::{
-    postgres::Storage, ActixSpawner, MaxRetries, UnsendJob as Job, WorkerConfig,
+    actix::{Spawner, WorkerConfig},
+    postgres::Storage,
+    MaxRetries, UnsendJob as Job,
 };
 // use background_jobs_sled_storage::Storage;
 use std::{

@@ -89,7 +91,7 @@ impl MyJob {
 impl Job for MyJob {
     type State = MyState;
     type Future = Ready<anyhow::Result<()>>;
-    type Spawner = ActixSpawner;
+    type Spawner = Spawner;
 
     // The name of the job. It is super important that each job has a unique name,
     // because otherwise one job will overwrite another job when they're being
examples/tokio-example/.gitignore (new file, 1 line)

@@ -0,0 +1 @@
+/my-sled-db
examples/tokio-example/Cargo.toml (new file, 16 lines)

[package]
name = "tokio-example"
version = "0.1.0"
authors = ["asonix <asonix@asonix.dog>"]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
background-jobs = { version = "0.17.0", path = "../..", default-features = false, features = [ "error-logging", "sled", "tokio"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
serde = { version = "1.0", features = ["derive"] }
sled = "0.34"
examples/tokio-example/src/main.rs (new file, 105 lines)

use anyhow::Error;
use background_jobs::{
    memory_storage::{Storage, TokioTimer},
    tokio::WorkerConfig,
    Job, MaxRetries,
};
use std::{
    future::{ready, Ready},
    time::{Duration, SystemTime},
};
use tracing::info;
use tracing_subscriber::EnvFilter;

const DEFAULT_QUEUE: &str = "default";

#[derive(Clone, Debug)]
pub struct MyState {
    pub app_name: String,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
    some_usize: usize,
    other_usize: usize,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

    tracing_subscriber::fmt::fmt()
        .with_env_filter(env_filter)
        .init();

    // Set up our Storage
    // let db = sled::Config::new().temporary(true).open()?;
    let storage = Storage::new(TokioTimer);

    // Configure and start our workers
    let queue_handle = WorkerConfig::new(storage, |_| MyState::new("My App"))
        .register::<MyJob>()
        .set_worker_count(DEFAULT_QUEUE, 16)
        .start();

    // Queue our jobs
    queue_handle.queue(MyJob::new(1, 2)).await?;
    queue_handle.queue(MyJob::new(3, 4)).await?;
    queue_handle.queue(MyJob::new(5, 6)).await?;
    for i in 0..20 {
        queue_handle
            .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(i))
            .await?;
    }

    // Block on Tokio
    tokio::signal::ctrl_c().await?;

    Ok(())
}

impl MyState {
    pub fn new(app_name: &str) -> Self {
        MyState {
            app_name: app_name.to_owned(),
        }
    }
}

impl MyJob {
    pub fn new(some_usize: usize, other_usize: usize) -> Self {
        MyJob {
            some_usize,
            other_usize,
        }
    }
}

impl Job for MyJob {
    type State = MyState;
    type Future = Ready<Result<(), Error>>;

    // The name of the job. It is super important that each job has a unique name,
    // because otherwise one job will overwrite another job when they're being
    // registered.
    const NAME: &'static str = "MyJob";

    // The queue that this processor belongs to
    //
    // Workers have the option to subscribe to specific queues, so this is important to
    // determine which worker will call the processor
    //
    // Jobs can optionally override the queue they're spawned on
    const QUEUE: &'static str = DEFAULT_QUEUE;

    // The number of times background-jobs should try to retry a job before giving up
    //
    // Jobs can optionally override this value
    const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);

    fn run(self, state: MyState) -> Self::Future {
        info!("{}: args, {:?}", state.app_name, self);

        ready(Ok(()))
    }
}
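The commented-out sled line and the `sled = "0.34"` dependency suggest the in-memory storage in this example can be swapped for the sled backend. A hedged sketch of that swap (the sled `Storage::new` constructor is assumed to match what the other examples use):

// Sketch only: persistent storage via the facade's `sled` feature.
use background_jobs::sled::Storage;

let db = sled::Config::new().temporary(true).open()?;
let storage = Storage::new(db)?;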
@@ -23,6 +23,7 @@ tokio = { version = "1", default-features = false, features = [
     "macros",
     "rt",
     "sync",
     "time",
     "tracing",
 ] }
 uuid = { version = "1", features = ["v7", "serde"] }
@@ -99,11 +99,11 @@
 //!     // Jobs can optionally override this value
 //!     const BACKOFF: Backoff = Backoff::Exponential(2);
 //!
-//!     // When should the job be considered dead
 //!     // This is important for allowing the job server to reap processes that were started but never
 //!     // completed.
 //!     //
-//!     // The timeout defines when a job is allowed to be considered dead, and so can be retried
-//!     // by the job processor. The value is in milliseconds and defaults to 15,000
-//!     const TIMEOUT: i64 = 15_000;
+//!     // Defaults to 5 seconds
+//!     const HEARTBEAT_INTERVAL: u64 = 5_000;
 //!
 //!     fn run(self, state: MyState) -> Self::Future {
 //!         println!("{}: args, {:?}", state.app_name, self);
@@ -197,7 +197,7 @@ impl Manager {
 
         notified.await;
 
-        metrics::counter!("background-jobs.worker-arbiter.restart", "number" => i.to_string()).increment(1);
+        metrics::counter!("background-jobs.actix.worker-arbiter.restart", "number" => i.to_string()).increment(1);
         tracing::warn!("Recovering from dead worker arbiter");
 
         drop(worker_arbiter);

@@ -16,7 +16,7 @@ struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> {
 
 impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State, Extras> {
     fn drop(&mut self) {
-        metrics::counter!("background-jobs.worker.finished", "queue" => self.queue.clone())
+        metrics::counter!("background-jobs.actix.worker.finished", "queue" => self.queue.clone())
             .increment(1);
 
         let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {}));

@@ -141,7 +141,8 @@ pub(crate) async fn local_worker<State, Extras>(
     State: Clone + 'static,
     Extras: 'static,
 {
-    metrics::counter!("background-jobs.worker.started", "queue" => queue.clone()).increment(1);
+    metrics::counter!("background-jobs.actix.worker.started", "queue" => queue.clone())
+        .increment(1);
 
     let starter = LocalWorkerStarter {
         queue: queue.clone(),

@@ -166,7 +167,7 @@ pub(crate) async fn local_worker<State, Extras>(
     {
         Ok(job) => job,
         Err(e) => {
-            metrics::counter!("background-jobs.worker.failed-request").increment(1);
+            metrics::counter!("background-jobs.actix.worker.failed-request").increment(1);
 
             let display_val = format!("{}", e);
             let debug = format!("{:?}", e);

@@ -201,7 +202,7 @@ pub(crate) async fn local_worker<State, Extras>(
         .instrument(return_span.clone())
         .await
     {
-        metrics::counter!("background-jobs.worker.failed-return").increment(1);
+        metrics::counter!("background-jobs.actix.worker.failed-return").increment(1);
 
         let display_val = format!("{}", e);
         let debug = format!("{:?}", e);
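These renames namespace the actix worker metrics under `background-jobs.actix.*`, matching the `background-jobs.tokio.*` counters emitted by the new jobs-tokio crate, so an application running both backends can tell the two worker pools apart in a single recorder:

// Both of these now coexist without colliding:
metrics::counter!("background-jobs.actix.worker.started", "queue" => queue.clone()).increment(1);
metrics::counter!("background-jobs.tokio.worker.started", "queue" => queue.clone()).increment(1);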
@@ -46,7 +46,7 @@ pub fn install() -> Result<StatsHandle, SetRecorderError<StatsRecorder>> {
 ///     .add_recorder(jobs_recorder)
 ///     .build();
 ///
-/// metrics::set_boxed_recorder(Box::new(recorder)).expect("Failed to set recorder");
+/// metrics::set_global_recorder(recorder).expect("Failed to set recorder");
 ///
 /// println!("{:?}", handle.get());
 /// ```

@@ -125,7 +125,7 @@ impl StatsRecorder {
 ///     .add_recorder(jobs_recorder)
 ///     .build();
 ///
-/// metrics::set_boxed_recorder(Box::new(recorder)).expect("Failed to set recorder");
+/// metrics::set_global_recorder(recorder).expect("Failed to set recorder");
 ///
 /// println!("{:?}", handle.get());
 /// ```
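This doc fix tracks the metrics 0.22 API (the version pinned in the new jobs-tokio/Cargo.toml below), where a recorder is installed by value with `metrics::set_global_recorder` rather than through the older boxed `set_boxed_recorder` helper.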
@@ -1,7 +1,7 @@
 [package]
-name = "background-jobs-sled-storage"
+name = "background-jobs-sled"
 description = "Sled storage backend for background-jobs"
-version = "0.10.0"
+version = "0.17.0"
 license = "AGPL-3.0"
 authors = ["asonix <asonix@asonix.dog>"]
 repository = "https://git.asonix.dog/asonix/background-jobs"
jobs-tokio/Cargo.toml (new file, 17 lines)

[package]
name = "background-jobs-tokio"
version = "0.17.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.79"
async-trait = "0.1.77"
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
metrics = "0.22.0"
serde = "1.0.195"
serde_json = "1.0.111"
tokio = { version = "1.35.1", features = ["macros", "rt", "sync", "time", "tracing"] }
tracing = "0.1.40"
uuid = { version = "1.6.1", features = ["v7", "serde"] }
jobs-tokio/src/every.rs (new file, 26 lines)

use crate::QueueHandle;
use background_jobs_core::Job;
use std::time::Duration;
use tokio::time::{interval_at, Instant};

/// A type used to schedule recurring jobs.
///
/// ```rust,ignore
/// let server = create_server(storage);
/// server.every(Duration::from_secs(60 * 30), MyJob::new());
/// ```
pub(crate) async fn every<J>(spawner: QueueHandle, duration: Duration, job: J)
where
    J: Job + Clone + Send,
{
    let mut interval = interval_at(Instant::now(), duration);

    loop {
        interval.tick().await;

        let job = job.clone();
        if spawner.queue::<J>(job).await.is_err() {
            tracing::error!("Failed to queue job: {}", J::NAME);
        }
    }
}
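For reference, the public entry point to this loop is `QueueHandle::every` in jobs-tokio/src/lib.rs below; usage looks like:

// Re-queue MyJob every half hour; an Err here means the task could not be spawned.
queue_handle.every(Duration::from_secs(60 * 30), MyJob::new(1, 2))?;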
jobs-tokio/src/lib.rs (new file, 287 lines)

#![deny(missing_docs)]

//! # A Tokio-based Jobs Processor
//!
//! This library will spin up as many actors as requested for each processor to process jobs
//! concurrently. Keep in mind that, by default, spawned actors run on the same Arbiter, so in
//! order to achieve parallel execution, multiple Arbiters must be in use.
//!
//! The thread count is used to spawn Synchronous Actors to handle the storage of job
//! information. For storage backends that cannot be parallelized, a thread-count of 1 should be
//! used. By default, the number of cores of the running system is used.
//!
//! ### Example
//! ```rust
//! use anyhow::Error;
//! use background_jobs_core::{Backoff, Job, MaxRetries};
//! use background_jobs_tokio::{TokioTimer, WorkerConfig};
//! use std::future::{ready, Ready};
//!
//! const DEFAULT_QUEUE: &'static str = "default";
//!
//! #[derive(Clone, Debug)]
//! pub struct MyState {
//!     pub app_name: String,
//! }
//!
//! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
//! pub struct MyJob {
//!     some_usize: usize,
//!     other_usize: usize,
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//!     // Set up our Storage
//!     // For this example, we use the default in-memory storage mechanism
//!     use background_jobs_core::memory_storage::Storage;
//!     let storage = Storage::new(TokioTimer);
//!
//!     // Configure and start our workers
//!     let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
//!         .register::<MyJob>()
//!         .set_worker_count(DEFAULT_QUEUE, 16)
//!         .start();
//!
//!     // Queue our jobs
//!     queue_handle.queue(MyJob::new(1, 2)).await?;
//!     queue_handle.queue(MyJob::new(3, 4)).await?;
//!     queue_handle.queue(MyJob::new(5, 6)).await?;
//!
//!     // tokio::signal::ctrl_c().await?;
//!
//!     Ok(())
//! }
//!
//! impl MyState {
//!     pub fn new(app_name: &str) -> Self {
//!         MyState {
//!             app_name: app_name.to_owned(),
//!         }
//!     }
//! }
//!
//! impl MyJob {
//!     pub fn new(some_usize: usize, other_usize: usize) -> Self {
//!         MyJob {
//!             some_usize,
//!             other_usize,
//!         }
//!     }
//! }
//!
//! impl Job for MyJob {
//!     type State = MyState;
//!     type Future = Ready<Result<(), Error>>;
//!
//!     // The name of the job. It is super important that each job has a unique name,
//!     // because otherwise one job will overwrite another job when they're being
//!     // registered.
//!     const NAME: &'static str = "MyJob";
//!
//!     // The queue that this processor belongs to
//!     //
//!     // Workers have the option to subscribe to specific queues, so this is important to
//!     // determine which worker will call the processor
//!     //
//!     // Jobs can optionally override the queue they're spawned on
//!     const QUEUE: &'static str = DEFAULT_QUEUE;
//!
//!     // The number of times background-jobs should try to retry a job before giving up
//!     //
//!     // This value defaults to MaxRetries::Count(5)
//!     // Jobs can optionally override this value
//!     const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
//!
//!     // The logic to determine how often to retry this job if it fails
//!     //
//!     // This value defaults to Backoff::Exponential(2)
//!     // Jobs can optionally override this value
//!     const BACKOFF: Backoff = Backoff::Exponential(2);
//!
//!     // This is important for allowing the job server to reap processes that were started but never
//!     // completed.
//!     //
//!     // Defaults to 5 seconds
//!     const HEARTBEAT_INTERVAL: u64 = 5_000;
//!
//!     fn run(self, state: MyState) -> Self::Future {
//!         println!("{}: args, {:?}", state.app_name, self);
//!
//!         ready(Ok(()))
//!     }
//! }
//! ```

use anyhow::Error;
use background_jobs_core::{
    memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage as StorageTrait,
};
use std::{
    collections::BTreeMap,
    sync::Arc,
    time::{Duration, SystemTime},
};

mod every;
mod spawn;
mod storage;
mod worker;

use self::{every::every, storage::Storage};

/// A timer implementation for the Memory Storage backend
#[derive(Debug, Clone)]
pub struct TokioTimer;

#[async_trait::async_trait]
impl Timer for TokioTimer {
    async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
    where
        F: std::future::Future + Send + Sync,
    {
        tokio::time::timeout(duration, future).await.map_err(|_| ())
    }
}

/// Create a new Server
fn create_server<S>(storage: S) -> QueueHandle
where
    S: StorageTrait + Sync + 'static,
{
    QueueHandle {
        inner: Storage::new(storage),
    }
}

/// Worker Configuration
///
/// This type is used for configuring and creating workers to process jobs. Before starting the
/// workers, register `Job` types with this struct. This worker registration allows for
/// different worker processes to handle different sets of workers.
#[derive(Clone)]
pub struct WorkerConfig<State>
where
    State: Clone + 'static,
{
    processors: ProcessorMap<State>,
    queues: BTreeMap<String, u64>,
    queue_handle: QueueHandle,
}

impl<State> WorkerConfig<State>
where
    State: Send + Clone + 'static,
{
    /// Create a new WorkerConfig
    ///
    /// The supplied function should return the State required by the jobs intended to be
    /// processed. The function must be sharable between threads, but the state itself does not
    /// have this requirement.
    pub fn new<S: StorageTrait + Send + Sync + 'static>(
        storage: S,
        state_fn: impl Fn(QueueHandle) -> State + Send + Sync + 'static,
    ) -> Self {
        let queue_handle = create_server(storage);
        let q2 = queue_handle.clone();

        WorkerConfig {
            processors: ProcessorMap::new(Arc::new(move || state_fn(q2.clone()))),
            queues: BTreeMap::new(),
            queue_handle,
        }
    }

    /// Register a `Job` with the worker
    ///
    /// This enables the worker to handle jobs associated with this processor. If a processor is
    /// not registered, none of it's jobs will be run, even if another processor handling the same
    /// job queue is registered.
    pub fn register<J>(mut self) -> Self
    where
        J: Job<State = State>,
    {
        self.queues.insert(J::QUEUE.to_owned(), 4);
        self.processors.register::<J>();
        self
    }

    /// Set the number of workers to run for a given queue
    ///
    /// This does not spin up any additional threads. The `Arbiter` the workers are spawned onto
    /// will handle processing all workers, regardless of how many are configured.
    ///
    /// By default, 4 workers are spawned
    pub fn set_worker_count(mut self, queue: &str, count: u64) -> Self {
        self.queues.insert(queue.to_owned(), count);
        self
    }

    /// Start the workers in the provided arbiter
    pub fn start(self) -> QueueHandle {
        for (key, count) in self.queues.iter() {
            for _ in 0..*count {
                let queue = key.clone();
                let processors = self.processors.clone();
                let server = self.queue_handle.inner.clone();

                if let Err(e) = spawn::spawn(
                    "local-worker",
                    worker::local_worker(queue, processors.clone(), server),
                ) {
                    tracing::error!("Failed to spawn worker {e}");
                }
            }
        }

        self.queue_handle
    }
}

/// A handle to the job server, used for queuing new jobs
///
/// `QueueHandle` should be stored in your application's state in order to allow all parts of your
/// application to spawn jobs.
#[derive(Clone)]
pub struct QueueHandle {
    inner: Storage,
}

impl QueueHandle {
    /// Queues a job for execution
    ///
    /// This job will be sent to the server for storage, and will execute whenever a worker for the
    /// job's queue is free to do so.
    pub async fn queue<J>(&self, job: J) -> Result<(), Error>
    where
        J: Job,
    {
        let job = new_job(job)?;
        self.inner.push(job).await?;
        Ok(())
    }

    /// Schedule a job for execution later
    ///
    /// This job will be sent to the server for storage, and will execute after the specified time
    /// and when a worker for the job's queue is free to do so.
    pub async fn schedule<J>(&self, job: J, after: SystemTime) -> Result<(), Error>
    where
        J: Job,
    {
        let job = new_scheduled_job(job, after)?;
        self.inner.push(job).await?;
        Ok(())
    }

    /// Queues a job for recurring execution
    ///
    /// This job will be added to it's queue on the server once every `Duration`. It will be
    /// processed whenever workers are free to do so.
    pub fn every<J>(&self, duration: Duration, job: J) -> std::io::Result<()>
    where
        J: Job + Clone + Send + 'static,
    {
        spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ())
    }
}
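One detail worth calling out in `WorkerConfig` above: `register` seeds the job's queue with a worker count of 4, and `set_worker_count` simply overwrites that map entry, so overrides should come after registration, as in the crate docs:

let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
    .register::<MyJob>()
    .set_worker_count(DEFAULT_QUEUE, 16)
    .start();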
jobs-tokio/src/spawn.rs (new file, 22 lines)

use std::future::Future;

use tokio::task::JoinHandle;

#[cfg(tokio_unstable)]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    tokio::task::Builder::new().name(name).spawn(future)
}

#[cfg(not(tokio_unstable))]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let _ = name;
    Ok(tokio::task::spawn(future))
}
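`tokio::task::Builder` (and task naming generally) is only available when tokio is compiled with the `tokio_unstable` cfg, hence the two variants; a build would opt in with something like RUSTFLAGS="--cfg tokio_unstable". The fallback variant silently drops the name, which is why it binds `let _ = name;` before delegating to the plain `tokio::task::spawn`.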
jobs-tokio/src/storage.rs (new file, 67 lines)

use std::{ops::Deref, sync::Arc};

use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage as StorageTrait};
use uuid::Uuid;

#[async_trait::async_trait]
pub trait TokioStorage: Send + Sync {
    async fn push(&self, job: NewJobInfo) -> anyhow::Result<Uuid>;

    async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result<JobInfo>;

    async fn heartbeat(&self, job_id: Uuid, worker_id: Uuid) -> anyhow::Result<()>;

    async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()>;
}

#[derive(Clone)]
pub(crate) struct Storage {
    inner: Arc<dyn TokioStorage>,
}

struct StorageWrapper<S>(S);

#[async_trait::async_trait]
impl<S> TokioStorage for StorageWrapper<S>
where
    S: StorageTrait + Send + Sync + 'static,
{
    async fn push(&self, job: NewJobInfo) -> anyhow::Result<Uuid> {
        self.0.push(job).await.map_err(From::from)
    }

    async fn pop(&self, queue: &str, runner_id: Uuid) -> anyhow::Result<JobInfo> {
        self.0.pop(queue, runner_id).await.map_err(From::from)
    }

    async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> anyhow::Result<()> {
        self.0.heartbeat(job_id, runner_id).await.map_err(From::from)
    }

    async fn complete(&self, return_job_info: ReturnJobInfo) -> anyhow::Result<()> {
        self.0
            .complete(return_job_info)
            .await
            .map(|_| ())
            .map_err(From::from)
    }
}

impl Storage {
    pub(crate) fn new<S>(storage: S) -> Self
    where
        S: StorageTrait + Send + Sync + 'static,
    {
        Self {
            inner: Arc::new(StorageWrapper(storage)),
        }
    }
}

impl Deref for Storage {
    type Target = dyn TokioStorage + 'static;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}
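The `StorageWrapper`/`Arc<dyn TokioStorage>` pair above is a type-erasure pattern: the generic `S: StorageTrait` is hidden behind an object-safe trait so that `QueueHandle` and the workers stay non-generic, and the `Deref` impl lets callers write `server.pop(...)` directly. The same pattern in miniature, with hypothetical names:

use std::sync::Arc;

// Hypothetical miniature of the erasure pattern used above.
trait Erased: Send + Sync {
    fn describe(&self) -> String;
}

struct Wrapper<S>(S);

impl<S: std::fmt::Debug + Send + Sync> Erased for Wrapper<S> {
    fn describe(&self) -> String {
        format!("{:?}", self.0)
    }
}

#[derive(Clone)]
struct Handle {
    inner: Arc<dyn Erased>,
}

impl Handle {
    // The type parameter disappears here; callers only ever see `Handle`.
    fn new<S: std::fmt::Debug + Send + Sync + 'static>(s: S) -> Self {
        Handle { inner: Arc::new(Wrapper(s)) }
    }
}

fn main() {
    let handle = Handle::new(vec![1, 2, 3]);
    let h2 = handle.clone(); // clones the Arc, not the storage
    println!("{}", h2.inner.describe());
}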
jobs-tokio/src/worker.rs (new file, 236 lines)

use crate::storage::Storage;
use background_jobs_core::ProcessorMap;
use std::{
    future::{poll_fn, Future},
    pin::Pin,
};
use tracing::{Instrument, Span};
use uuid::Uuid;

struct LocalWorkerStarter<State: Send + Clone + 'static> {
    queue: String,
    processors: ProcessorMap<State>,
    server: Storage,
}

#[cfg(tokio_unstable)]
fn test_runtime() -> anyhow::Result<()> {
    tokio::task::Builder::new()
        .name("runtime-test")
        .spawn(async move {})
        .map(|_| ())
        .map_err(From::from)
}

#[cfg(not(tokio_unstable))]
fn test_runtime() -> anyhow::Result<()> {
    std::panic::catch_unwind(|| tokio::spawn(async move {}))
        .map(|_| ())
        .map_err(From::from)
}

impl<State> Drop for LocalWorkerStarter<State>
where
    State: Send + Clone + 'static,
{
    fn drop(&mut self) {
        metrics::counter!("background-jobs.tokio.worker.finished", "queue" => self.queue.clone())
            .increment(1);

        let res = test_runtime();

        if res.is_ok() {
            if let Err(e) = crate::spawn::spawn(
                "local-worker",
                local_worker(
                    self.queue.clone(),
                    self.processors.clone(),
                    self.server.clone(),
                ),
            ) {
                tracing::error!("Failed to re-spawn local worker: {e}");
            } else {
                metrics::counter!("background-jobs.tokio.worker.restart").increment(1);
            }
        } else {
            tracing::info!("Shutting down worker");
        }
    }
}

struct RunOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for RunOnDrop<F>
where
    F: Fn(),
{
    fn drop(&mut self) {
        (self.0)();
    }
}

async fn heartbeat_job<F: Future>(
    storage: &Storage,
    future: F,
    job_id: Uuid,
    runner_id: Uuid,
    heartbeat_interval: u64,
) -> F::Output {
    let mut interval = tokio::time::interval(std::time::Duration::from_millis(heartbeat_interval));

    let mut future = std::pin::pin!(future);

    let mut hb_future = Some(storage.heartbeat(job_id, runner_id));

    loop {
        tokio::select! {
            output = &mut future => {
                break output;
            },
            Some(hb_output) = option(hb_future.as_mut()), if hb_future.is_some() => {
                hb_future.take();

                if let Err(e) = hb_output {
                    tracing::warn!("Failed to heartbeat: {e}");
                }
            }
            _ = interval.tick() => {
                if hb_future.is_none() {
                    hb_future = Some(storage.heartbeat(job_id, runner_id));
                }
            }
        }
    }
}

async fn time_job<F: Future>(future: F, job_id: Uuid) -> F::Output {
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
    interval.tick().await;
    let mut count = 0;

    let mut future = std::pin::pin!(future);

    loop {
        tokio::select! {
            output = &mut future => {
                break output;
            },
            _ = interval.tick() => {
                count += 5;

                if count > (60 * 60) {
                    if count % (60 * 20) == 0 {
                        tracing::warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60);
                    }
                } else if count > 60 {
                    if count % 20 == 0 {
                        tracing::warn!("Job {} is taking a long time: {} minutes", job_id, count / 60);
                    }
                } else {
                    tracing::info!("Job {} is taking a long time: {} seconds", job_id, count);
                }
            }
        }
    }
}

async fn option<F>(opt: Option<&mut F>) -> Option<F::Output>
where
    F: Future + Unpin,
{
    match opt {
        Some(f) => Some(poll_fn(|cx| Pin::new(&mut *f).poll(cx)).await),
        None => None,
    }
}

pub(crate) async fn local_worker<State>(
    queue: String,
    processors: ProcessorMap<State>,
    server: Storage,
) where
    State: Send + Clone + 'static,
{
    metrics::counter!("background-jobs.tokio.worker.started", "queue" => queue.clone()).increment(1);

    let starter = LocalWorkerStarter {
        queue: queue.clone(),
        processors: processors.clone(),
        server: server.clone(),
    };

    let id = Uuid::now_v7();

    let log_on_drop = RunOnDrop(|| {
        make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing"));
    });

    loop {
        let request_span = make_span(id, &queue, "request");

        let job = match request_span
            .in_scope(|| server.pop(&queue, id))
            .instrument(request_span.clone())
            .await
        {
            Ok(job) => job,
            Err(e) => {
                metrics::counter!("background-jobs.tokio.worker.failed-request").increment(1);

                let display_val = format!("{}", e);
                let debug = format!("{:?}", e);
                request_span.record("exception.message", &tracing::field::display(&display_val));
                request_span.record("exception.details", &tracing::field::display(&debug));
                request_span
                    .in_scope(|| tracing::error!("Failed to notify server of ready worker"));
                break;
            }
        };
        drop(request_span);

        let process_span = make_span(id, &queue, "process");
        let job_id = job.id;
        let heartbeat_interval = job.heartbeat_interval;
        let return_job = process_span
            .in_scope(|| {
                heartbeat_job(
                    &server,
                    time_job(processors.process(job), job_id),
                    job_id,
                    id,
                    heartbeat_interval,
                )
            })
            .instrument(process_span)
            .await;

        let return_span = make_span(id, &queue, "return");
        if let Err(e) = return_span
            .in_scope(|| server.complete(return_job))
            .instrument(return_span.clone())
            .await
        {
            metrics::counter!("background-jobs.tokio.worker.failed-return").increment(1);

            let display_val = format!("{}", e);
            let debug = format!("{:?}", e);
            return_span.record("exception.message", &tracing::field::display(&display_val));
            return_span.record("exception.details", &tracing::field::display(&debug));
            return_span.in_scope(|| tracing::warn!("Failed to return completed job"));
        }
        drop(return_span);
    }

    drop(log_on_drop);
    drop(starter);
}

fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {
    tracing::info_span!(
        parent: None,
        "Worker",
        worker.id = tracing::field::display(id),
        worker.queue = tracing::field::display(queue),
        worker.operation.id = tracing::field::display(&Uuid::now_v7()),
        worker.operation.name = tracing::field::display(operation),
        exception.message = tracing::field::Empty,
        exception.details = tracing::field::Empty,
    )
}
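The core of `heartbeat_job` above is racing the job future against an interval while keeping at most one heartbeat request in flight. A self-contained miniature of that select loop, with a sleep standing in for the job and a print standing in for the heartbeat (hypothetical stand-ins, not the crate's API):

use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(Duration::from_millis(200));
    // pin! lets us poll the same future across loop iterations without moving it
    let mut job = std::pin::pin!(tokio::time::sleep(Duration::from_secs(1)));

    loop {
        tokio::select! {
            _ = &mut job => break,                         // the "job" finished
            _ = interval.tick() => println!("heartbeat"),  // stand-in for storage.heartbeat(..)
        }
    }
}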
src/lib.rs (27 lines changed)

@@ -112,7 +112,7 @@
 //! ```
 //!
 //! #### Running jobs
-//! By default, this crate ships with the `background-jobs-actix` feature enabled. This uses the
+//! By default, this crate ships with the `actix-rt` feature enabled. This uses the
 //! `background-jobs-actix` crate to spin up a Server and Workers, and provides a mechanism for
 //! spawning new jobs.
 //!

@@ -163,7 +163,7 @@
 
 pub use background_jobs_core::{Backoff, Job, MaxRetries, UnsendJob, UnsendSpawner};
 
-#[cfg(feature = "background-jobs-metrics")]
+#[cfg(feature = "metrics")]
 pub mod metrics {
     pub use background_jobs_metrics::{
         build, install, JobStat, MetricsStorage, SetRecorderError, Stats, StatsHandle,

@@ -182,14 +182,29 @@ pub mod dev {
 pub mod memory_storage {
     pub use background_jobs_core::memory_storage::{Storage, Timer};
 
-    #[cfg(feature = "background-jobs-actix")]
+    #[cfg(feature = "actix-rt")]
     pub use background_jobs_actix::ActixTimer;
+
+    #[cfg(feature = "tokio")]
+    pub use background_jobs_tokio::TokioTimer;
 }
 
-#[cfg(feature = "background-jobs-actix")]
-pub use background_jobs_actix::{ActixSpawner, Manager, QueueHandle, WorkerConfig};
+#[cfg(feature = "actix-rt")]
+pub mod actix {
+    pub use background_jobs_actix::{ActixSpawner as Spawner, Manager, QueueHandle, WorkerConfig};
+}
 
-#[cfg(feature = "background-jobs-postgres")]
+#[cfg(feature = "postgres")]
 pub mod postgres {
     pub use background_jobs_postgres::Storage;
 }
+
+#[cfg(feature = "sled")]
+pub mod sled {
+    pub use background_jobs_sled::{Error, Storage};
+}
+
+#[cfg(feature = "tokio")]
+pub mod tokio {
+    pub use background_jobs_tokio::{QueueHandle, WorkerConfig};
+}
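With both backends gated, the in-memory storage timer also comes in two flavors, and choosing one is just a matter of which feature is enabled. Sketch (the `Storage::new(ActixTimer)` form is inferred from the basic example; the tokio form is verbatim from examples/tokio-example above):

// actix backend:
use background_jobs::memory_storage::{ActixTimer, Storage};
let storage = Storage::new(ActixTimer);

// tokio backend:
use background_jobs::memory_storage::{Storage, TokioTimer};
let storage = Storage::new(TokioTimer);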