Move more out of spawned tasks

This commit is contained in:
Aode (Lion) 2021-10-06 20:31:27 -05:00
parent cce5a97306
commit c0ce6f303e
8 changed files with 101 additions and 81 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs" name = "background-jobs"
description = "Background Jobs implemented with actix and futures" description = "Background Jobs implemented with actix and futures"
version = "0.10.0" version = "0.11.0"
license = "AGPL-3.0" license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs" repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -27,6 +27,6 @@ version = "0.10.0"
path = "jobs-core" path = "jobs-core"
[dependencies.background-jobs-actix] [dependencies.background-jobs-actix]
version = "0.10.0" version = "0.11.0"
path = "jobs-actix" path = "jobs-actix"
optional = true optional = true

View file

@ -10,7 +10,7 @@ edition = "2018"
actix-rt = "2.0.0" actix-rt = "2.0.0"
anyhow = "1.0" anyhow = "1.0"
async-trait = "0.1.24" async-trait = "0.1.24"
background-jobs = { version = "0.10.0", path = "../..", features = ["error-logging"] } background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] }
background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" }
chrono = "0.4" chrono = "0.4"
tracing = "0.1" tracing = "0.1"

View file

@ -46,14 +46,16 @@ async fn main() -> Result<(), Error> {
// Queue some panicking job // Queue some panicking job
for _ in 0..32 { for _ in 0..32 {
queue_handle.queue(PanickingJob)?; queue_handle.queue(PanickingJob).await?;
} }
// Queue our jobs // Queue our jobs
queue_handle.queue(MyJob::new(1, 2))?; queue_handle.queue(MyJob::new(1, 2)).await?;
queue_handle.queue(MyJob::new(3, 4))?; queue_handle.queue(MyJob::new(3, 4)).await?;
queue_handle.queue(MyJob::new(5, 6))?; queue_handle.queue(MyJob::new(5, 6)).await?;
queue_handle.schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))?; queue_handle
.schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))
.await?;
// Block on Actix // Block on Actix
actix_rt::signal::ctrl_c().await?; actix_rt::signal::ctrl_c().await?;

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-actix" name = "background-jobs-actix"
description = "in-process jobs processor based on Actix" description = "in-process jobs processor based on Actix"
version = "0.10.1" version = "0.11.0"
license = "AGPL-3.0" license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs" repository = "https://git.asonix.dog/Aardwolf/background-jobs"
@ -22,5 +22,5 @@ num_cpus = "1.10.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["sync"] } tokio = { version = "1", default-features = false, features = ["rt", "sync"] }
uuid = { version ="0.8.1", features = ["v4", "serde"] } uuid = { version ="0.8.1", features = ["v4", "serde"] }

View file

@ -14,14 +14,15 @@ where
J: Job + Clone + Send, J: Job + Clone + Send,
{ {
let spawner_clone = spawner.clone(); let spawner_clone = spawner.clone();
spawner.arbiter.spawn(async move { spawner.tokio_rt.spawn(async move {
let mut interval = interval_at(Instant::now(), duration); let mut interval = interval_at(Instant::now(), duration);
loop { loop {
interval.tick().await; interval.tick().await;
if spawner_clone.queue(job.clone()).is_err() { let job = job.clone();
error!("Failed to queue job"); if spawner_clone.queue::<J>(job).await.is_err() {
error!("Failed to queue job: {}", J::NAME);
} }
} }
}); });

View file

@ -121,7 +121,6 @@ use anyhow::Error;
use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use std::{collections::BTreeMap, sync::Arc, time::Duration}; use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tracing::error;
mod every; mod every;
mod server; mod server;
@ -143,10 +142,25 @@ pub fn create_server<S>(storage: S) -> QueueHandle
where where
S: Storage + Sync + 'static, S: Storage + Sync + 'static,
{ {
let arbiter = Arbiter::current(); create_server_in_arbiter(storage, &Arbiter::current())
}
/// Create a new Server
///
/// In previous versions of this library, the server itself was run on it's own dedicated threads
/// and guarded access to jobs via messages. Since we now have futures-aware synchronization
/// primitives, the Server has become an object that gets shared between client threads.
///
/// This method will panic if not called from an actix runtime
pub fn create_server_in_arbiter<S>(storage: S, arbiter: &ArbiterHandle) -> QueueHandle
where
S: Storage + Sync + 'static,
{
let tokio_rt = tokio::runtime::Handle::current();
QueueHandle { QueueHandle {
inner: Server::new(&arbiter, storage), inner: Server::new(&arbiter, storage),
arbiter, tokio_rt,
} }
} }
@ -209,19 +223,11 @@ where
/// ///
/// This method will panic if not called from an actix runtime /// This method will panic if not called from an actix runtime
pub fn start(self, queue_handle: QueueHandle) { pub fn start(self, queue_handle: QueueHandle) {
for (key, count) in self.queues.into_iter() { self.start_in_arbiter(&Arbiter::current(), queue_handle)
for _ in 0..count {
local_worker(
key.clone(),
self.processors.cached(),
queue_handle.inner.clone(),
);
}
}
} }
/// Start the workers in the provided arbiter /// Start the workers in the provided arbiter
pub fn start_in_arbiter(self, arbiter: &Arbiter, queue_handle: QueueHandle) { pub fn start_in_arbiter(self, arbiter: &ArbiterHandle, queue_handle: QueueHandle) {
for (key, count) in self.queues.into_iter() { for (key, count) in self.queues.into_iter() {
for _ in 0..count { for _ in 0..count {
let key = key.clone(); let key = key.clone();
@ -229,7 +235,7 @@ where
let server = queue_handle.inner.clone(); let server = queue_handle.inner.clone();
arbiter.spawn_fn(move || { arbiter.spawn_fn(move || {
local_worker(key, processors.cached(), server); actix_rt::spawn(local_worker(key, processors.cached(), server));
}); });
} }
} }
@ -243,7 +249,7 @@ where
#[derive(Clone)] #[derive(Clone)]
pub struct QueueHandle { pub struct QueueHandle {
inner: Server, inner: Server,
arbiter: ArbiterHandle, tokio_rt: tokio::runtime::Handle,
} }
impl QueueHandle { impl QueueHandle {
@ -251,17 +257,37 @@ impl QueueHandle {
/// ///
/// This job will be sent to the server for storage, and will execute whenever a worker for the /// This job will be sent to the server for storage, and will execute whenever a worker for the
/// job's queue is free to do so. /// job's queue is free to do so.
pub fn queue<J>(&self, job: J) -> Result<(), Error> pub async fn queue<J>(&self, job: J) -> Result<(), Error>
where where
J: Job, J: Job,
{ {
let job = new_job(job)?; let job = new_job(job)?;
self.inner.new_job(job).await?;
Ok(())
}
/// Queues a job for execution
///
/// This job will be sent to the server for storage, and will execute whenever a worker for the
/// job's queue is free to do so.
pub async fn blocking_queue<J>(&self, job: J) -> Result<(), Error>
where
J: Job,
{
self.tokio_rt.block_on(self.queue(job))
}
/// Schedule a job for execution later
///
/// This job will be sent to the server for storage, and will execute after the specified time
/// and when a worker for the job's queue is free to do so.
pub async fn schedule<J>(&self, job: J, after: DateTime<Utc>) -> Result<(), Error>
where
J: Job,
{
let job = new_scheduled_job(job, after)?;
let server = self.inner.clone(); let server = self.inner.clone();
self.arbiter.spawn(async move { server.new_job(job).await?;
if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e);
}
});
Ok(()) Ok(())
} }
@ -269,18 +295,11 @@ impl QueueHandle {
/// ///
/// This job will be sent to the server for storage, and will execute after the specified time /// This job will be sent to the server for storage, and will execute after the specified time
/// and when a worker for the job's queue is free to do so. /// and when a worker for the job's queue is free to do so.
pub fn schedule<J>(&self, job: J, after: DateTime<Utc>) -> Result<(), Error> pub fn blocking_schedule<J>(&self, job: J, after: DateTime<Utc>) -> Result<(), Error>
where where
J: Job, J: Job,
{ {
let job = new_scheduled_job(job, after)?; self.tokio_rt.block_on(self.schedule(job, after))
let server = self.inner.clone();
self.arbiter.spawn(async move {
if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e);
}
});
Ok(())
} }
/// Queues a job for recurring execution /// Queues a job for recurring execution

View file

@ -109,7 +109,7 @@ impl Server {
trace!("Trying to find job for worker {}", worker.id()); trace!("Trying to find job for worker {}", worker.id());
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await {
if let Err(job) = worker.process(job).await { if let Err(job) = worker.process(job).await {
error!("Worker has hung up"); error!("Worker {} has hung up", worker.id());
self.storage.return_job(job.unexecuted()).await? self.storage.return_job(job.unexecuted()).await?
} }
} else { } else {

View file

@ -1,5 +1,4 @@
use crate::Server; use crate::Server;
use actix_rt::spawn;
use background_jobs_core::{CachedProcessorMap, JobInfo}; use background_jobs_core::{CachedProcessorMap, JobInfo};
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::{channel, Sender};
use tracing::{error, info, warn, Span}; use tracing::{error, info, warn, Span};
@ -57,55 +56,54 @@ impl Worker for LocalWorkerHandle {
} }
} }
pub(crate) fn local_worker<State>( pub(crate) async fn local_worker<State>(
queue: String, queue: String,
processors: CachedProcessorMap<State>, processors: CachedProcessorMap<State>,
server: Server, server: Server,
) where ) where
State: Clone + 'static, State: Clone + 'static,
{ {
spawn(async move { let id = Uuid::new_v4();
let id = Uuid::new_v4(); let (tx, mut rx) = channel(16);
let (tx, mut rx) = channel(16);
let handle = LocalWorkerHandle { tx, id, queue }; let handle = LocalWorkerHandle { tx, id, queue };
loop { loop {
let span = handle.span("request"); let span = handle.span("request");
if let Err(e) = server if let Err(e) = server
.request_job(Box::new(handle.clone())) .request_job(Box::new(handle.clone()))
.instrument(span.clone()) .instrument(span.clone())
.await .await
{ {
let display = format!("{}", e);
let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display));
span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| error!("Failed to notify server of ready worker, {}", e));
break;
}
drop(span);
if let Some(job) = rx.recv().await {
let return_job = processors
.process(job)
.instrument(handle.span("process"))
.await;
let span = handle.span("return");
if let Err(e) = server.return_job(return_job).instrument(span.clone()).await {
let display = format!("{}", e); let display = format!("{}", e);
let debug = format!("{:?}", e); let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display)); span.record("exception.message", &tracing::field::display(&display));
span.record("exception.details", &tracing::field::display(&debug)); span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| error!("Failed to notify server of ready worker, {}", e)); span.in_scope(|| warn!("Failed to return completed job, {}", e));
break;
}
drop(span);
if let Some(job) = rx.recv().await {
let return_job = processors
.process(job)
.instrument(handle.span("process"))
.await;
let span = handle.span("return");
if let Err(e) = server.return_job(return_job).instrument(span.clone()).await {
let display = format!("{}", e);
let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display));
span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| warn!("Failed to return completed job, {}", e));
}
continue;
} }
break; continue;
} }
handle.span("closing").in_scope(|| info!("Worker closing"));
}); break;
}
handle.span("closing").in_scope(|| info!("Worker closing"));
} }