Carry Arbiter in QueueHandle to allow spawning from off the runtime

This commit is contained in:
asonix 2021-01-06 12:24:27 -06:00
parent 8817e8b679
commit 2cb81ee743
4 changed files with 31 additions and 28 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-actix" name = "background-jobs-actix"
description = "in-process jobs processor based on Actix" description = "in-process jobs processor based on Actix"
version = "0.8.0" version = "0.8.1"
license-file = "../LICENSE" license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs" repository = "https://git.asonix.dog/Aardwolf/background-jobs"

View file

@ -1,8 +1,5 @@
use crate::{Job, QueueHandle}; use crate::{Job, QueueHandle};
use actix_rt::{ use actix_rt::time::{interval_at, Instant};
spawn,
time::{interval_at, Instant},
};
use log::error; use log::error;
use std::time::Duration; use std::time::Duration;
@ -12,19 +9,20 @@ use std::time::Duration;
/// let server = create_server(storage); /// let server = create_server(storage);
/// server.every(Duration::from_secs(60 * 30), MyJob::new()); /// server.every(Duration::from_secs(60 * 30), MyJob::new());
/// ``` /// ```
pub(crate) fn every<J>(spawner: QueueHandle, duration: Duration, job: J) pub(crate) fn every<J>(spawner: &QueueHandle, duration: Duration, job: J)
where where
J: Job + Clone, J: Job + Clone + Send,
{ {
spawn(async move { let spawner_clone = spawner.clone();
spawner.arbiter.send(Box::pin(async move {
let mut interval = interval_at(Instant::now(), duration); let mut interval = interval_at(Instant::now(), duration);
loop { loop {
interval.tick().await; interval.tick().await;
if spawner.queue(job.clone()).is_err() { if spawner_clone.queue(job.clone()).is_err() {
error!("Failed to queue job"); error!("Failed to queue job");
} }
} }
}); }));
} }

View file

@ -116,7 +116,7 @@
//! } //! }
//! ``` //! ```
use actix_rt::{spawn, Arbiter}; use actix_rt::Arbiter;
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
@ -138,13 +138,15 @@ pub use background_jobs_core::ActixJob;
/// and guarded access to jobs via messages. Since we now have futures-aware synchronization /// and guarded access to jobs via messages. Since we now have futures-aware synchronization
/// primitives, the Server has become an object that gets shared between client threads. /// primitives, the Server has become an object that gets shared between client threads.
/// ///
/// This method should only be called once. /// This method will panic if not called from an actix runtime
pub fn create_server<S>(storage: S) -> QueueHandle pub fn create_server<S>(storage: S) -> QueueHandle
where where
S: Storage + Sync + 'static, S: Storage + Sync + 'static,
{ {
let arbiter = Arbiter::current();
QueueHandle { QueueHandle {
inner: Server::new(storage), inner: Server::new(&arbiter, storage),
arbiter,
} }
} }
@ -204,6 +206,8 @@ where
} }
/// Start the workers in the current arbiter /// Start the workers in the current arbiter
///
/// This method will panic if not called from an actix runtime
pub fn start(self, queue_handle: QueueHandle) { pub fn start(self, queue_handle: QueueHandle) {
for (key, count) in self.queues.into_iter() { for (key, count) in self.queues.into_iter() {
for _ in 0..count { for _ in 0..count {
@ -239,6 +243,7 @@ where
#[derive(Clone)] #[derive(Clone)]
pub struct QueueHandle { pub struct QueueHandle {
inner: Server, inner: Server,
arbiter: Arbiter,
} }
impl QueueHandle { impl QueueHandle {
@ -252,11 +257,11 @@ impl QueueHandle {
{ {
let job = new_job(job)?; let job = new_job(job)?;
let server = self.inner.clone(); let server = self.inner.clone();
spawn(async move { self.arbiter.send(Box::pin(async move {
if let Err(e) = server.new_job(job).await { if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e); error!("Error creating job, {}", e);
} }
}); }));
Ok(()) Ok(())
} }
@ -270,11 +275,11 @@ impl QueueHandle {
{ {
let job = new_scheduled_job(job, after)?; let job = new_scheduled_job(job, after)?;
let server = self.inner.clone(); let server = self.inner.clone();
spawn(async move { self.arbiter.send(Box::pin(async move {
if let Err(e) = server.new_job(job).await { if let Err(e) = server.new_job(job).await {
error!("Error creating job, {}", e); error!("Error creating job, {}", e);
} }
}); }));
Ok(()) Ok(())
} }
@ -284,9 +289,9 @@ impl QueueHandle {
/// processed whenever workers are free to do so. /// processed whenever workers are free to do so.
pub fn every<J>(&self, duration: Duration, job: J) pub fn every<J>(&self, duration: Duration, job: J)
where where
J: Job + Clone + 'static, J: Job + Clone + Send + 'static,
{ {
every(self.clone(), duration, job); every(self, duration, job);
} }
/// Return an overview of the processor's statistics /// Return an overview of the processor's statistics

View file

@ -3,8 +3,8 @@ use crate::{
worker::Worker, worker::Worker,
}; };
use actix_rt::{ use actix_rt::{
spawn,
time::{interval_at, Instant}, time::{interval_at, Instant},
Arbiter,
}; };
use anyhow::Error; use anyhow::Error;
use async_mutex::Mutex; use async_mutex::Mutex;
@ -16,7 +16,7 @@ use std::{
time::Duration, time::Duration,
}; };
type WorkerQueue = VecDeque<Box<dyn Worker + Send>>; type WorkerQueue = VecDeque<Box<dyn Worker + Send + Sync>>;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct ServerCache { pub(crate) struct ServerCache {
@ -35,7 +35,7 @@ pub(crate) struct Server {
impl Server { impl Server {
/// Create a new Server from a compatible storage implementation /// Create a new Server from a compatible storage implementation
pub(crate) fn new<S>(storage: S) -> Self pub(crate) fn new<S>(arbiter: &Arbiter, storage: S) -> Self
where where
S: Storage + Sync + 'static, S: Storage + Sync + 'static,
{ {
@ -45,7 +45,7 @@ impl Server {
}; };
let server2 = server.clone(); let server2 = server.clone();
spawn(async move { arbiter.send(Box::pin(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop { loop {
@ -54,7 +54,7 @@ impl Server {
error!("Error while checking database for new jobs, {}", e); error!("Error while checking database for new jobs, {}", e);
} }
} }
}); }));
server2 server2
} }
@ -92,7 +92,7 @@ impl Server {
pub(crate) async fn request_job( pub(crate) async fn request_job(
&self, &self,
worker: Box<dyn Worker + Send + 'static>, worker: Box<dyn Worker + Send + Sync + 'static>,
) -> Result<(), Error> { ) -> Result<(), Error> {
trace!("Worker {} requested job", worker.id()); trace!("Worker {} requested job", worker.id());
@ -104,7 +104,7 @@ impl Server {
async fn try_turning( async fn try_turning(
&self, &self,
queue: String, queue: String,
worker: Box<dyn Worker + Send + 'static>, worker: Box<dyn Worker + Send + Sync + 'static>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
trace!("Trying to find job for worker {}", worker.id()); trace!("Trying to find job for worker {}", worker.id());
if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await {
@ -143,14 +143,14 @@ impl ServerCache {
cache.keys().cloned().collect() cache.keys().cloned().collect()
} }
async fn push(&self, queue: String, worker: Box<dyn Worker + Send>) { async fn push(&self, queue: String, worker: Box<dyn Worker + Send + Sync>) {
let mut cache = self.cache.lock().await; let mut cache = self.cache.lock().await;
let entry = cache.entry(queue).or_insert_with(VecDeque::new); let entry = cache.entry(queue).or_insert_with(VecDeque::new);
entry.push_back(worker); entry.push_back(worker);
} }
async fn pop(&self, queue: String) -> Option<Box<dyn Worker + Send>> { async fn pop(&self, queue: String) -> Option<Box<dyn Worker + Send + Sync>> {
let mut cache = self.cache.lock().await; let mut cache = self.cache.lock().await;
let mut vec_deque = cache.remove(&queue)?; let mut vec_deque = cache.remove(&queue)?;