Attempt restarting ticker, workers in live arbiters

This commit is contained in:
Aode (lion) 2021-10-29 15:20:04 -05:00
parent 7e1e89e777
commit 7002366d67
6 changed files with 84 additions and 21 deletions

View file

@ -1,5 +1,6 @@
use actix_rt::Arbiter;
use anyhow::Error; use anyhow::Error;
use background_jobs::{create_server, ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs::{create_server_in_arbiter, ActixJob as Job, MaxRetries, WorkerConfig};
use background_jobs_sled_storage::Storage; use background_jobs_sled_storage::Storage;
use chrono::{Duration, Utc}; use chrono::{Duration, Utc};
use std::future::{ready, Ready}; use std::future::{ready, Ready};
@ -34,15 +35,17 @@ async fn main() -> Result<(), Error> {
let db = sled::Config::new().temporary(true).open()?; let db = sled::Config::new().temporary(true).open()?;
let storage = Storage::new(db)?; let storage = Storage::new(db)?;
let arbiter = Arbiter::new();
// Start the application server. This guards access to to the jobs store // Start the application server. This guards access to to the jobs store
let queue_handle = create_server(storage); let queue_handle = create_server_in_arbiter(storage, arbiter.handle());
// Configure and start our workers // Configure and start our workers
WorkerConfig::new(move || MyState::new("My App")) WorkerConfig::new(move || MyState::new("My App"))
.register::<PanickingJob>() .register::<PanickingJob>()
.register::<MyJob>() .register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16) .set_worker_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone()); .start_in_arbiter(&arbiter.handle(), queue_handle.clone());
// Queue some panicking job // Queue some panicking job
for _ in 0..32 { for _ in 0..32 {
@ -59,6 +62,10 @@ async fn main() -> Result<(), Error> {
// Block on Actix // Block on Actix
actix_rt::signal::ctrl_c().await?; actix_rt::signal::ctrl_c().await?;
arbiter.stop();
let _ = arbiter.join();
Ok(()) Ok(())
} }

View file

@ -142,7 +142,7 @@ pub fn create_server<S>(storage: S) -> QueueHandle
where where
S: Storage + Sync + 'static, S: Storage + Sync + 'static,
{ {
create_server_in_arbiter(storage, &Arbiter::current()) create_server_in_arbiter(storage, Arbiter::current())
} }
/// Create a new Server /// Create a new Server
@ -152,14 +152,14 @@ where
/// primitives, the Server has become an object that gets shared between client threads. /// primitives, the Server has become an object that gets shared between client threads.
/// ///
/// This method will panic if not called from an actix runtime /// This method will panic if not called from an actix runtime
pub fn create_server_in_arbiter<S>(storage: S, arbiter: &ArbiterHandle) -> QueueHandle pub fn create_server_in_arbiter<S>(storage: S, arbiter: ArbiterHandle) -> QueueHandle
where where
S: Storage + Sync + 'static, S: Storage + Sync + 'static,
{ {
let tokio_rt = tokio::runtime::Handle::current(); let tokio_rt = tokio::runtime::Handle::current();
QueueHandle { QueueHandle {
inner: Server::new(&arbiter, storage), inner: Server::new(arbiter, storage),
tokio_rt, tokio_rt,
} }
} }

View file

@ -14,7 +14,7 @@ use std::{
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use tracing::{error, trace}; use tracing::{error, trace, warn};
type WorkerQueue = VecDeque<Box<dyn Worker + Send + Sync>>; type WorkerQueue = VecDeque<Box<dyn Worker + Send + Sync>>;
@ -23,6 +23,34 @@ pub(crate) struct ServerCache {
cache: Arc<Mutex<HashMap<String, WorkerQueue>>>, cache: Arc<Mutex<HashMap<String, WorkerQueue>>>,
} }
struct Ticker {
server: Server,
}
impl Drop for Ticker {
fn drop(&mut self) {
let online = self.server.arbiter.spawn(async move {});
if online {
let server = self.server.clone();
self.server.arbiter.spawn(async move {
let _ticker = server.ticker();
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop {
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
} else {
warn!("Not restarting ticker, arbiter is dead");
}
}
}
/// The server Actor /// The server Actor
/// ///
/// This server guards access to Thee storage, and keeps a list of workers that are waiting for /// This server guards access to Thee storage, and keeps a list of workers that are waiting for
@ -31,32 +59,29 @@ pub(crate) struct ServerCache {
pub(crate) struct Server { pub(crate) struct Server {
storage: Arc<dyn ActixStorage + Send + Sync>, storage: Arc<dyn ActixStorage + Send + Sync>,
cache: ServerCache, cache: ServerCache,
arbiter: ArbiterHandle,
} }
impl Server { impl Server {
fn ticker(&self) -> Ticker {
Ticker {
server: self.clone(),
}
}
/// Create a new Server from a compatible storage implementation /// Create a new Server from a compatible storage implementation
pub(crate) fn new<S>(arbiter: &ArbiterHandle, storage: S) -> Self pub(crate) fn new<S>(arbiter: ArbiterHandle, storage: S) -> Self
where where
S: Storage + Sync + 'static, S: Storage + Sync + 'static,
{ {
let server = Server { let server = Server {
storage: Arc::new(StorageWrapper(storage)), storage: Arc::new(StorageWrapper(storage)),
cache: ServerCache::new(), cache: ServerCache::new(),
arbiter,
}; };
let server2 = server.clone(); drop(server.ticker());
arbiter.spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop { server
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
server2
} }
async fn check_db(&self) -> Result<(), Error> { async fn check_db(&self) -> Result<(), Error> {

View file

@ -56,6 +56,28 @@ impl Worker for LocalWorkerHandle {
} }
} }
struct LocalWorkerStarter<State: Clone + 'static> {
queue: String,
processors: CachedProcessorMap<State>,
server: Server,
}
impl<State: Clone + 'static> Drop for LocalWorkerStarter<State> {
fn drop(&mut self) {
let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {}));
if let Ok(true) = res {
actix_rt::spawn(local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
));
} else {
warn!("Not restarting worker, Arbiter is dead");
}
}
}
struct LogOnDrop<F>(F) struct LogOnDrop<F>(F)
where where
F: Fn() -> Span; F: Fn() -> Span;
@ -76,6 +98,11 @@ pub(crate) async fn local_worker<State>(
) where ) where
State: Clone + 'static, State: Clone + 'static,
{ {
let starter = LocalWorkerStarter {
queue: queue.clone(),
processors: processors.clone(),
server: server.clone(),
};
let id = Uuid::new_v4(); let id = Uuid::new_v4();
let (tx, mut rx) = channel(16); let (tx, mut rx) = channel(16);
@ -121,4 +148,5 @@ pub(crate) async fn local_worker<State>(
} }
drop(log_on_drop); drop(log_on_drop);
drop(starter);
} }

View file

@ -30,6 +30,7 @@ pub struct ProcessorMap<S> {
/// ///
/// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an /// [`Job`]s must be registered with the `ProcessorMap` in the initialization phase of an
/// application before workers are spawned in order to handle queued jobs. /// application before workers are spawned in order to handle queued jobs.
#[derive(Clone)]
pub struct CachedProcessorMap<S> { pub struct CachedProcessorMap<S> {
inner: HashMap<String, ProcessFn<S>>, inner: HashMap<String, ProcessFn<S>>,
state: S, state: S,

View file

@ -172,4 +172,6 @@ pub mod dev {
} }
#[cfg(feature = "background-jobs-actix")] #[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{create_server, ActixJob, QueueHandle, WorkerConfig}; pub use background_jobs_actix::{
create_server, create_server_in_arbiter, ActixJob, QueueHandle, WorkerConfig,
};