Compare commits
14 commits
Author | SHA1 | Date | |
---|---|---|---|
Aode (lion) | bb29da02d2 | ||
Aode (lion) | 9dee94adc8 | ||
Aode (lion) | a3485a1e3e | ||
Aode (lion) | aa9b89e33a | ||
Aode (lion) | f826a87a59 | ||
Aode (lion) | 1b673a13a0 | ||
Aode (lion) | 1f8ef7905d | ||
Aode (lion) | 95a7dcf6a6 | ||
Aode (lion) | c48d42ead6 | ||
Aode (lion) | c93fec98a0 | ||
Aode (Lion) | e3bba29de4 | ||
Aode (Lion) | 5cd6a1b102 | ||
Aode (Lion) | 1266873f5d | ||
Aode (Lion) | 1ce008e8c4 |
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs"
|
||||
description = "Background Jobs implemented with actix and futures"
|
||||
version = "0.9.0"
|
||||
version = "0.9.1"
|
||||
license-file = "LICENSE"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
@ -21,10 +21,10 @@ members = [
|
|||
default = ["background-jobs-actix"]
|
||||
|
||||
[dependencies.background-jobs-core]
|
||||
version = "0.9.0"
|
||||
version = "0.9.4"
|
||||
path = "jobs-core"
|
||||
|
||||
[dependencies.background-jobs-actix]
|
||||
version = "0.9.0"
|
||||
version = "0.9.4"
|
||||
path = "jobs-actix"
|
||||
optional = true
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
use actix_rt::Arbiter;
|
||||
use anyhow::Error;
|
||||
use background_jobs::{create_server, Job, MaxRetries, WorkerConfig};
|
||||
use background_jobs::{create_server_in_arbiter, ActixJob, MaxRetries, WorkerConfig};
|
||||
use background_jobs_sled_storage::Storage;
|
||||
use chrono::{Duration, Utc};
|
||||
use std::future::{ready, Ready};
|
||||
use std::{
|
||||
future::{ready, Future, Ready},
|
||||
pin::Pin,
|
||||
};
|
||||
|
||||
const DEFAULT_QUEUE: &str = "default";
|
||||
|
||||
|
@ -20,6 +24,9 @@ pub struct MyJob {
|
|||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct PanickingJob;
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
pub struct LongJob;
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
if std::env::var_os("RUST_LOG").is_none() {
|
||||
|
@ -30,15 +37,18 @@ async fn main() -> Result<(), Error> {
|
|||
let db = sled::Config::new().temporary(true).open()?;
|
||||
let storage = Storage::new(db)?;
|
||||
|
||||
let arbiter = Arbiter::new();
|
||||
|
||||
// Start the application server. This guards access to to the jobs store
|
||||
let queue_handle = create_server(storage);
|
||||
let queue_handle = create_server_in_arbiter(&arbiter, storage);
|
||||
|
||||
// Configure and start our workers
|
||||
WorkerConfig::new(move || MyState::new("My App"))
|
||||
.register::<LongJob>()
|
||||
.register::<PanickingJob>()
|
||||
.register::<MyJob>()
|
||||
.set_worker_count(DEFAULT_QUEUE, 16)
|
||||
.start(queue_handle.clone());
|
||||
.start_in_arbiter(&arbiter, queue_handle.clone());
|
||||
|
||||
// Queue some panicking job
|
||||
for _ in 0..32 {
|
||||
|
@ -50,9 +60,12 @@ async fn main() -> Result<(), Error> {
|
|||
queue_handle.queue(MyJob::new(3, 4))?;
|
||||
queue_handle.queue(MyJob::new(5, 6))?;
|
||||
queue_handle.schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))?;
|
||||
queue_handle.queue(LongJob)?;
|
||||
|
||||
// Block on Actix
|
||||
actix_rt::signal::ctrl_c().await?;
|
||||
arbiter.stop();
|
||||
let _ = arbiter.join();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -74,7 +87,7 @@ impl MyJob {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Job for MyJob {
|
||||
impl ActixJob for MyJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
|
||||
|
@ -104,7 +117,26 @@ impl Job for MyJob {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Job for PanickingJob {
|
||||
impl ActixJob for LongJob {
|
||||
type State = MyState;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
|
||||
|
||||
const NAME: &'static str = "LongJob";
|
||||
|
||||
const QUEUE: &'static str = DEFAULT_QUEUE;
|
||||
|
||||
const MAX_RETRIES: MaxRetries = MaxRetries::Count(0);
|
||||
|
||||
fn run(self, _: MyState) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
actix_rt::time::sleep(std::time::Duration::from_secs(120)).await;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ActixJob for PanickingJob {
|
||||
type State = MyState;
|
||||
type Future = Ready<Result<(), Error>>;
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-actix"
|
||||
description = "in-process jobs processor based on Actix"
|
||||
version = "0.9.1"
|
||||
version = "0.9.6"
|
||||
license-file = "../LICENSE"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
@ -21,5 +21,5 @@ num_cpus = "1.10.0"
|
|||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
tokio = { version = "1", default-features = false, features = ["macros", "sync"] }
|
||||
uuid = { version ="0.8.1", features = ["v4", "serde"] }
|
||||
|
|
|
@ -143,7 +143,22 @@ pub fn create_server<S>(storage: S) -> QueueHandle
|
|||
where
|
||||
S: Storage + Sync + 'static,
|
||||
{
|
||||
let arbiter = Arbiter::current();
|
||||
create_server_in_arbiter_handle(Arbiter::current(), storage)
|
||||
}
|
||||
|
||||
/// Create a new server in the provided Arbiter
|
||||
pub fn create_server_in_arbiter<S>(arbiter: &Arbiter, storage: S) -> QueueHandle
|
||||
where
|
||||
S: Storage + Sync + 'static,
|
||||
{
|
||||
create_server_in_arbiter_handle(arbiter.handle(), storage)
|
||||
}
|
||||
|
||||
/// Create a new server in the provided ArbiterHandle
|
||||
pub fn create_server_in_arbiter_handle<S>(arbiter: ArbiterHandle, storage: S) -> QueueHandle
|
||||
where
|
||||
S: Storage + Sync + 'static,
|
||||
{
|
||||
QueueHandle {
|
||||
inner: Server::new(&arbiter, storage),
|
||||
arbiter,
|
||||
|
@ -209,19 +224,18 @@ where
|
|||
///
|
||||
/// This method will panic if not called from an actix runtime
|
||||
pub fn start(self, queue_handle: QueueHandle) {
|
||||
for (key, count) in self.queues.into_iter() {
|
||||
for _ in 0..count {
|
||||
local_worker(
|
||||
key.clone(),
|
||||
self.processors.cached(),
|
||||
queue_handle.inner.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
let handle = Arbiter::current();
|
||||
|
||||
self.start_in_arbiter_handle(&handle, queue_handle);
|
||||
}
|
||||
|
||||
/// Start the workers in the provided arbiter
|
||||
pub fn start_in_arbiter(self, arbiter: &Arbiter, queue_handle: QueueHandle) {
|
||||
self.start_in_arbiter_handle(&arbiter.handle(), queue_handle)
|
||||
}
|
||||
|
||||
/// Start the workers in the provided arbiter via it's handle
|
||||
pub fn start_in_arbiter_handle(self, arbiter: &ArbiterHandle, queue_handle: QueueHandle) {
|
||||
for (key, count) in self.queues.into_iter() {
|
||||
for _ in 0..count {
|
||||
let key = key.clone();
|
||||
|
@ -257,11 +271,14 @@ impl QueueHandle {
|
|||
{
|
||||
let job = new_job(job)?;
|
||||
let server = self.inner.clone();
|
||||
self.arbiter.spawn(async move {
|
||||
let success = self.arbiter.spawn(async move {
|
||||
if let Err(e) = server.new_job(job).await {
|
||||
error!("Error creating job, {}", e);
|
||||
}
|
||||
});
|
||||
if !success {
|
||||
return Err(anyhow::anyhow!("Failed to queue job"));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -275,11 +292,14 @@ impl QueueHandle {
|
|||
{
|
||||
let job = new_scheduled_job(job, after)?;
|
||||
let server = self.inner.clone();
|
||||
self.arbiter.spawn(async move {
|
||||
let success = self.arbiter.spawn(async move {
|
||||
if let Err(e) = server.new_job(job).await {
|
||||
error!("Error creating job, {}", e);
|
||||
}
|
||||
});
|
||||
if !success {
|
||||
return Err(anyhow::anyhow!("Failed to schedule job"));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ use actix_rt::{
|
|||
use anyhow::Error;
|
||||
use async_mutex::Mutex;
|
||||
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
|
||||
use log::{error, trace};
|
||||
use log::{error, trace, warn};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
|
@ -31,9 +31,46 @@ pub(crate) struct ServerCache {
|
|||
pub(crate) struct Server {
|
||||
storage: Arc<dyn ActixStorage + Send + Sync>,
|
||||
cache: ServerCache,
|
||||
arbiter: ArbiterHandle,
|
||||
}
|
||||
|
||||
struct Ticker {
|
||||
server: Server,
|
||||
}
|
||||
|
||||
impl Drop for Ticker {
|
||||
fn drop(&mut self) {
|
||||
let online = self.server.arbiter.spawn(async move {});
|
||||
|
||||
if online {
|
||||
let server = self.server.clone();
|
||||
|
||||
self.server.arbiter.spawn(async move {
|
||||
// ensure new ticker is spawned when existing ticker dies
|
||||
let _ticker = server.ticker();
|
||||
|
||||
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = server.check_db().await {
|
||||
error!("Error while checking database for new jobs, {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
warn!("Not restarting ticker, arbiter has died");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Server {
|
||||
fn ticker(&self) -> Ticker {
|
||||
Ticker {
|
||||
server: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new Server from a compatible storage implementation
|
||||
pub(crate) fn new<S>(arbiter: &ArbiterHandle, storage: S) -> Self
|
||||
where
|
||||
|
@ -42,21 +79,12 @@ impl Server {
|
|||
let server = Server {
|
||||
storage: Arc::new(StorageWrapper(storage)),
|
||||
cache: ServerCache::new(),
|
||||
arbiter: arbiter.clone(),
|
||||
};
|
||||
|
||||
let server2 = server.clone();
|
||||
arbiter.spawn(async move {
|
||||
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
|
||||
drop(server.ticker());
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = server.check_db().await {
|
||||
error!("Error while checking database for new jobs, {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
server2
|
||||
server
|
||||
}
|
||||
|
||||
async fn check_db(&self) -> Result<(), Error> {
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use crate::Server;
|
||||
use actix_rt::spawn;
|
||||
use actix_rt::{spawn, Arbiter};
|
||||
use background_jobs_core::{CachedProcessorMap, JobInfo};
|
||||
use log::{debug, error, warn};
|
||||
use log::{debug, error, info, warn};
|
||||
use std::future::Future;
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -42,6 +43,67 @@ impl Worker for LocalWorkerHandle {
|
|||
}
|
||||
}
|
||||
|
||||
struct LocalWorkerStarter<State: Clone + 'static> {
|
||||
queue: String,
|
||||
processors: CachedProcessorMap<State>,
|
||||
server: Server,
|
||||
}
|
||||
|
||||
impl<State: Clone + 'static> Drop for LocalWorkerStarter<State> {
|
||||
fn drop(&mut self) {
|
||||
let res = std::panic::catch_unwind(|| {
|
||||
let handle = Arbiter::current();
|
||||
|
||||
handle.spawn(async move {})
|
||||
});
|
||||
|
||||
if let Ok(true) = res {
|
||||
local_worker(
|
||||
self.queue.clone(),
|
||||
self.processors.clone(),
|
||||
self.server.clone(),
|
||||
)
|
||||
} else {
|
||||
warn!("Not restarting worker, arbiter has died");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct WarnOnDrop(Uuid);
|
||||
|
||||
impl Drop for WarnOnDrop {
|
||||
fn drop(&mut self) {
|
||||
warn!("Worker {} closing", self.0);
|
||||
}
|
||||
}
|
||||
|
||||
async fn time_job<F: Future + Unpin>(mut future: F, job_id: Uuid) -> <F as Future>::Output {
|
||||
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
|
||||
interval.tick().await;
|
||||
let mut count = 0;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
output = &mut future => { break output },
|
||||
_ = interval.tick() => {
|
||||
count += 5;
|
||||
|
||||
if count > 60 * 60 {
|
||||
if count % (60 * 20) == 0 {
|
||||
warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60);
|
||||
}
|
||||
} else if count >= 60 {
|
||||
if count % 20 == 0 {
|
||||
info!("Job {} is taking a long time: {} minutes", job_id, count / 60);
|
||||
}
|
||||
} else {
|
||||
info!("Job {} is taking a long time: {} seconds", job_id, count);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn local_worker<State>(
|
||||
queue: String,
|
||||
processors: CachedProcessorMap<State>,
|
||||
|
@ -49,6 +111,11 @@ pub(crate) fn local_worker<State>(
|
|||
) where
|
||||
State: Clone + 'static,
|
||||
{
|
||||
let starter = LocalWorkerStarter {
|
||||
queue: queue.clone(),
|
||||
processors: processors.clone(),
|
||||
server: server.clone(),
|
||||
};
|
||||
let id = Uuid::new_v4();
|
||||
|
||||
let (tx, mut rx) = channel(16);
|
||||
|
@ -56,13 +123,16 @@ pub(crate) fn local_worker<State>(
|
|||
let handle = LocalWorkerHandle { tx, id, queue };
|
||||
|
||||
spawn(async move {
|
||||
info!("Starting worker {}", id);
|
||||
let warn_on_drop = WarnOnDrop(id);
|
||||
debug!("Beginning worker loop for {}", id);
|
||||
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
|
||||
error!("Couldn't request first job, bailing, {}", e);
|
||||
return;
|
||||
}
|
||||
while let Some(job) = rx.recv().await {
|
||||
let return_job = processors.process(job).await;
|
||||
let id = job.id();
|
||||
let return_job = time_job(Box::pin(processors.process(job)), id).await;
|
||||
|
||||
if let Err(e) = server.return_job(return_job).await {
|
||||
error!("Error returning job, {}", e);
|
||||
|
@ -72,6 +142,7 @@ pub(crate) fn local_worker<State>(
|
|||
break;
|
||||
}
|
||||
}
|
||||
warn!("Worker {} closing", id);
|
||||
drop(warn_on_drop);
|
||||
drop(starter);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "background-jobs-core"
|
||||
description = "Core types for implementing an asynchronous jobs processor"
|
||||
version = "0.9.3"
|
||||
version = "0.9.5"
|
||||
license-file = "../LICENSE"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||
|
|
|
@ -28,6 +28,7 @@ pub struct ProcessorMap<S> {
|
|||
///
|
||||
/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an
|
||||
/// application before workers are spawned in order to handle queued jobs.
|
||||
#[derive(Clone)]
|
||||
pub struct CachedProcessorMap<S> {
|
||||
inner: HashMap<String, ProcessFn<S>>,
|
||||
state: S,
|
||||
|
@ -153,6 +154,7 @@ where
|
|||
let args = job.args();
|
||||
let id = job.id();
|
||||
let name = job.name().to_owned();
|
||||
info!("Job {} {} starting", id, name);
|
||||
|
||||
let start = Utc::now();
|
||||
|
||||
|
|
|
@ -172,4 +172,7 @@ pub mod dev {
|
|||
}
|
||||
|
||||
#[cfg(feature = "background-jobs-actix")]
|
||||
pub use background_jobs_actix::{create_server, ActixJob, QueueHandle, WorkerConfig};
|
||||
pub use background_jobs_actix::{
|
||||
create_server, create_server_in_arbiter, create_server_in_arbiter_handle, ActixJob,
|
||||
QueueHandle, WorkerConfig,
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue