From 2cb81ee743a6455be88d6e6e47a59c7228fec7ee Mon Sep 17 00:00:00 2001 From: asonix Date: Wed, 6 Jan 2021 12:24:27 -0600 Subject: [PATCH] Carry Arbiter in QueueHandle to allow spawning from off the runtime --- jobs-actix/Cargo.toml | 2 +- jobs-actix/src/every.rs | 16 +++++++--------- jobs-actix/src/lib.rs | 23 ++++++++++++++--------- jobs-actix/src/server.rs | 18 +++++++++--------- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 16f57be..ffe51f9 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.8.0" +version = "0.8.1" license-file = "../LICENSE" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index b96fe91..acc3b66 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,8 +1,5 @@ use crate::{Job, QueueHandle}; -use actix_rt::{ - spawn, - time::{interval_at, Instant}, -}; +use actix_rt::time::{interval_at, Instant}; use log::error; use std::time::Duration; @@ -12,19 +9,20 @@ use std::time::Duration; /// let server = create_server(storage); /// server.every(Duration::from_secs(60 * 30), MyJob::new()); /// ``` -pub(crate) fn every(spawner: QueueHandle, duration: Duration, job: J) +pub(crate) fn every(spawner: &QueueHandle, duration: Duration, job: J) where - J: Job + Clone, + J: Job + Clone + Send, { - spawn(async move { + let spawner_clone = spawner.clone(); + spawner.arbiter.send(Box::pin(async move { let mut interval = interval_at(Instant::now(), duration); loop { interval.tick().await; - if spawner.queue(job.clone()).is_err() { + if spawner_clone.queue(job.clone()).is_err() { error!("Failed to queue job"); } } - }); + })); } diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 9fd00f3..bdf1442 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -116,7 +116,7 @@ //! } //! ``` -use actix_rt::{spawn, Arbiter}; +use actix_rt::Arbiter; use anyhow::Error; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use chrono::{DateTime, Utc}; @@ -138,13 +138,15 @@ pub use background_jobs_core::ActixJob; /// and guarded access to jobs via messages. Since we now have futures-aware synchronization /// primitives, the Server has become an object that gets shared between client threads. /// -/// This method should only be called once. +/// This method will panic if not called from an actix runtime pub fn create_server(storage: S) -> QueueHandle where S: Storage + Sync + 'static, { + let arbiter = Arbiter::current(); QueueHandle { - inner: Server::new(storage), + inner: Server::new(&arbiter, storage), + arbiter, } } @@ -204,6 +206,8 @@ where } /// Start the workers in the current arbiter + /// + /// This method will panic if not called from an actix runtime pub fn start(self, queue_handle: QueueHandle) { for (key, count) in self.queues.into_iter() { for _ in 0..count { @@ -239,6 +243,7 @@ where #[derive(Clone)] pub struct QueueHandle { inner: Server, + arbiter: Arbiter, } impl QueueHandle { @@ -252,11 +257,11 @@ impl QueueHandle { { let job = new_job(job)?; let server = self.inner.clone(); - spawn(async move { + self.arbiter.send(Box::pin(async move { if let Err(e) = server.new_job(job).await { error!("Error creating job, {}", e); } - }); + })); Ok(()) } @@ -270,11 +275,11 @@ impl QueueHandle { { let job = new_scheduled_job(job, after)?; let server = self.inner.clone(); - spawn(async move { + self.arbiter.send(Box::pin(async move { if let Err(e) = server.new_job(job).await { error!("Error creating job, {}", e); } - }); + })); Ok(()) } @@ -284,9 +289,9 @@ impl QueueHandle { /// processed whenever workers are free to do so. pub fn every(&self, duration: Duration, job: J) where - J: Job + Clone + 'static, + J: Job + Clone + Send + 'static, { - every(self.clone(), duration, job); + every(self, duration, job); } /// Return an overview of the processor's statistics diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 0a150df..53dfeb3 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -3,8 +3,8 @@ use crate::{ worker::Worker, }; use actix_rt::{ - spawn, time::{interval_at, Instant}, + Arbiter, }; use anyhow::Error; use async_mutex::Mutex; @@ -16,7 +16,7 @@ use std::{ time::Duration, }; -type WorkerQueue = VecDeque>; +type WorkerQueue = VecDeque>; #[derive(Clone)] pub(crate) struct ServerCache { @@ -35,7 +35,7 @@ pub(crate) struct Server { impl Server { /// Create a new Server from a compatible storage implementation - pub(crate) fn new(storage: S) -> Self + pub(crate) fn new(arbiter: &Arbiter, storage: S) -> Self where S: Storage + Sync + 'static, { @@ -45,7 +45,7 @@ impl Server { }; let server2 = server.clone(); - spawn(async move { + arbiter.send(Box::pin(async move { let mut interval = interval_at(Instant::now(), Duration::from_secs(1)); loop { @@ -54,7 +54,7 @@ impl Server { error!("Error while checking database for new jobs, {}", e); } } - }); + })); server2 } @@ -92,7 +92,7 @@ impl Server { pub(crate) async fn request_job( &self, - worker: Box, + worker: Box, ) -> Result<(), Error> { trace!("Worker {} requested job", worker.id()); @@ -104,7 +104,7 @@ impl Server { async fn try_turning( &self, queue: String, - worker: Box, + worker: Box, ) -> Result { trace!("Trying to find job for worker {}", worker.id()); if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { @@ -143,14 +143,14 @@ impl ServerCache { cache.keys().cloned().collect() } - async fn push(&self, queue: String, worker: Box) { + async fn push(&self, queue: String, worker: Box) { let mut cache = self.cache.lock().await; let entry = cache.entry(queue).or_insert_with(VecDeque::new); entry.push_back(worker); } - async fn pop(&self, queue: String) -> Option> { + async fn pop(&self, queue: String) -> Option> { let mut cache = self.cache.lock().await; let mut vec_deque = cache.remove(&queue)?;