Compare commits

...

14 commits
main ... v0.9.x

9 changed files with 198 additions and 42 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs"
description = "Background Jobs implemented with actix and futures"
version = "0.9.0"
version = "0.9.1"
license-file = "LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -21,10 +21,10 @@ members = [
default = ["background-jobs-actix"]
[dependencies.background-jobs-core]
version = "0.9.0"
version = "0.9.4"
path = "jobs-core"
[dependencies.background-jobs-actix]
version = "0.9.0"
version = "0.9.4"
path = "jobs-actix"
optional = true

View file

@ -1,8 +1,12 @@
use actix_rt::Arbiter;
use anyhow::Error;
use background_jobs::{create_server, Job, MaxRetries, WorkerConfig};
use background_jobs::{create_server_in_arbiter, ActixJob, MaxRetries, WorkerConfig};
use background_jobs_sled_storage::Storage;
use chrono::{Duration, Utc};
use std::future::{ready, Ready};
use std::{
future::{ready, Future, Ready},
pin::Pin,
};
const DEFAULT_QUEUE: &str = "default";
@ -20,6 +24,9 @@ pub struct MyJob {
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct PanickingJob;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct LongJob;
#[actix_rt::main]
async fn main() -> Result<(), Error> {
if std::env::var_os("RUST_LOG").is_none() {
@ -30,15 +37,18 @@ async fn main() -> Result<(), Error> {
let db = sled::Config::new().temporary(true).open()?;
let storage = Storage::new(db)?;
let arbiter = Arbiter::new();
// Start the application server. This guards access to to the jobs store
let queue_handle = create_server(storage);
let queue_handle = create_server_in_arbiter(&arbiter, storage);
// Configure and start our workers
WorkerConfig::new(move || MyState::new("My App"))
.register::<LongJob>()
.register::<PanickingJob>()
.register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone());
.start_in_arbiter(&arbiter, queue_handle.clone());
// Queue some panicking job
for _ in 0..32 {
@ -50,9 +60,12 @@ async fn main() -> Result<(), Error> {
queue_handle.queue(MyJob::new(3, 4))?;
queue_handle.queue(MyJob::new(5, 6))?;
queue_handle.schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))?;
queue_handle.queue(LongJob)?;
// Block on Actix
actix_rt::signal::ctrl_c().await?;
arbiter.stop();
let _ = arbiter.join();
Ok(())
}
@ -74,7 +87,7 @@ impl MyJob {
}
#[async_trait::async_trait]
impl Job for MyJob {
impl ActixJob for MyJob {
type State = MyState;
type Future = Ready<Result<(), Error>>;
@ -104,7 +117,26 @@ impl Job for MyJob {
}
#[async_trait::async_trait]
impl Job for PanickingJob {
impl ActixJob for LongJob {
type State = MyState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "LongJob";
const QUEUE: &'static str = DEFAULT_QUEUE;
const MAX_RETRIES: MaxRetries = MaxRetries::Count(0);
fn run(self, _: MyState) -> Self::Future {
Box::pin(async move {
actix_rt::time::sleep(std::time::Duration::from_secs(120)).await;
Ok(())
})
}
}
#[async_trait::async_trait]
impl ActixJob for PanickingJob {
type State = MyState;
type Future = Ready<Result<(), Error>>;

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-actix"
description = "in-process jobs processor based on Actix"
version = "0.9.1"
version = "0.9.6"
license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -21,5 +21,5 @@ num_cpus = "1.10.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["sync"] }
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
uuid = { version ="0.8.1", features = ["v4", "serde"] }

View file

@ -143,7 +143,22 @@ pub fn create_server<S>(storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
let arbiter = Arbiter::current();
create_server_in_arbiter_handle(Arbiter::current(), storage)
}
/// Create a new server in the provided Arbiter
pub fn create_server_in_arbiter<S>(arbiter: &Arbiter, storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
create_server_in_arbiter_handle(arbiter.handle(), storage)
}
/// Create a new server in the provided ArbiterHandle
pub fn create_server_in_arbiter_handle<S>(arbiter: ArbiterHandle, storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
QueueHandle {
inner: Server::new(&arbiter, storage),
arbiter,
@ -209,19 +224,18 @@ where
///
/// This method will panic if not called from an actix runtime
pub fn start(self, queue_handle: QueueHandle) {
for (key, count) in self.queues.into_iter() {
for _ in 0..count {
local_worker(
key.clone(),
self.processors.cached(),
queue_handle.inner.clone(),
);
}
}
let handle = Arbiter::current();
self.start_in_arbiter_handle(&handle, queue_handle);
}
/// Start the workers in the provided arbiter
pub fn start_in_arbiter(self, arbiter: &Arbiter, queue_handle: QueueHandle) {
self.start_in_arbiter_handle(&arbiter.handle(), queue_handle)
}
/// Start the workers in the provided arbiter via it's handle
pub fn start_in_arbiter_handle(self, arbiter: &ArbiterHandle, queue_handle: QueueHandle) {
for (key, count) in self.queues.into_iter() {
for _ in 0..count {
let key = key.clone();
@ -257,11 +271,14 @@ impl QueueHandle {
{
let job = new_job(job)?;
let server = self.inner.clone();
self.arbiter.spawn(async move {
let success = self.arbiter.spawn(async move {
if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e);
}
});
if !success {
return Err(anyhow::anyhow!("Failed to queue job"));
}
Ok(())
}
@ -275,11 +292,14 @@ impl QueueHandle {
{
let job = new_scheduled_job(job, after)?;
let server = self.inner.clone();
self.arbiter.spawn(async move {
let success = self.arbiter.spawn(async move {
if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e);
}
});
if !success {
return Err(anyhow::anyhow!("Failed to schedule job"));
}
Ok(())
}

View file

@ -9,7 +9,7 @@ use actix_rt::{
use anyhow::Error;
use async_mutex::Mutex;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use log::{error, trace};
use log::{error, trace, warn};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
@ -31,9 +31,46 @@ pub(crate) struct ServerCache {
pub(crate) struct Server {
storage: Arc<dyn ActixStorage + Send + Sync>,
cache: ServerCache,
arbiter: ArbiterHandle,
}
struct Ticker {
server: Server,
}
impl Drop for Ticker {
fn drop(&mut self) {
let online = self.server.arbiter.spawn(async move {});
if online {
let server = self.server.clone();
self.server.arbiter.spawn(async move {
// ensure new ticker is spawned when existing ticker dies
let _ticker = server.ticker();
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop {
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
} else {
warn!("Not restarting ticker, arbiter has died");
}
}
}
impl Server {
fn ticker(&self) -> Ticker {
Ticker {
server: self.clone(),
}
}
/// Create a new Server from a compatible storage implementation
pub(crate) fn new<S>(arbiter: &ArbiterHandle, storage: S) -> Self
where
@ -42,21 +79,12 @@ impl Server {
let server = Server {
storage: Arc::new(StorageWrapper(storage)),
cache: ServerCache::new(),
arbiter: arbiter.clone(),
};
let server2 = server.clone();
arbiter.spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
drop(server.ticker());
loop {
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
server2
server
}
async fn check_db(&self) -> Result<(), Error> {

View file

@ -1,7 +1,8 @@
use crate::Server;
use actix_rt::spawn;
use actix_rt::{spawn, Arbiter};
use background_jobs_core::{CachedProcessorMap, JobInfo};
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use std::future::Future;
use tokio::sync::mpsc::{channel, Sender};
use uuid::Uuid;
@ -42,6 +43,67 @@ impl Worker for LocalWorkerHandle {
}
}
struct LocalWorkerStarter<State: Clone + 'static> {
queue: String,
processors: CachedProcessorMap<State>,
server: Server,
}
impl<State: Clone + 'static> Drop for LocalWorkerStarter<State> {
fn drop(&mut self) {
let res = std::panic::catch_unwind(|| {
let handle = Arbiter::current();
handle.spawn(async move {})
});
if let Ok(true) = res {
local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
)
} else {
warn!("Not restarting worker, arbiter has died");
}
}
}
struct WarnOnDrop(Uuid);
impl Drop for WarnOnDrop {
fn drop(&mut self) {
warn!("Worker {} closing", self.0);
}
}
async fn time_job<F: Future + Unpin>(mut future: F, job_id: Uuid) -> <F as Future>::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
interval.tick().await;
let mut count = 0;
loop {
tokio::select! {
output = &mut future => { break output },
_ = interval.tick() => {
count += 5;
if count > 60 * 60 {
if count % (60 * 20) == 0 {
warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60);
}
} else if count >= 60 {
if count % 20 == 0 {
info!("Job {} is taking a long time: {} minutes", job_id, count / 60);
}
} else {
info!("Job {} is taking a long time: {} seconds", job_id, count);
}
},
};
}
}
pub(crate) fn local_worker<State>(
queue: String,
processors: CachedProcessorMap<State>,
@ -49,6 +111,11 @@ pub(crate) fn local_worker<State>(
) where
State: Clone + 'static,
{
let starter = LocalWorkerStarter {
queue: queue.clone(),
processors: processors.clone(),
server: server.clone(),
};
let id = Uuid::new_v4();
let (tx, mut rx) = channel(16);
@ -56,13 +123,16 @@ pub(crate) fn local_worker<State>(
let handle = LocalWorkerHandle { tx, id, queue };
spawn(async move {
info!("Starting worker {}", id);
let warn_on_drop = WarnOnDrop(id);
debug!("Beginning worker loop for {}", id);
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
error!("Couldn't request first job, bailing, {}", e);
return;
}
while let Some(job) = rx.recv().await {
let return_job = processors.process(job).await;
let id = job.id();
let return_job = time_job(Box::pin(processors.process(job)), id).await;
if let Err(e) = server.return_job(return_job).await {
error!("Error returning job, {}", e);
@ -72,6 +142,7 @@ pub(crate) fn local_worker<State>(
break;
}
}
warn!("Worker {} closing", id);
drop(warn_on_drop);
drop(starter);
});
}

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-core"
description = "Core types for implementing an asynchronous jobs processor"
version = "0.9.3"
version = "0.9.5"
license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"

View file

@ -28,6 +28,7 @@ pub struct ProcessorMap<S> {
///
/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an
/// application before workers are spawned in order to handle queued jobs.
#[derive(Clone)]
pub struct CachedProcessorMap<S> {
inner: HashMap<String, ProcessFn<S>>,
state: S,
@ -153,6 +154,7 @@ where
let args = job.args();
let id = job.id();
let name = job.name().to_owned();
info!("Job {} {} starting", id, name);
let start = Utc::now();

View file

@ -172,4 +172,7 @@ pub mod dev {
}
#[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{create_server, ActixJob, QueueHandle, WorkerConfig};
pub use background_jobs_actix::{
create_server, create_server_in_arbiter, create_server_in_arbiter_handle, ActixJob,
QueueHandle, WorkerConfig,
};