From 2727645ca9d44ceefcc7e954694323eb55fd38ef Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 9 Jan 2024 18:06:28 -0600 Subject: [PATCH] Add heartbeat loop to jobs-actix --- jobs-actix/src/worker.rs | 65 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 480fb4f..9b390af 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,6 +1,9 @@ use crate::Server; use background_jobs_core::CachedProcessorMap; -use std::future::Future; +use std::{ + future::{poll_fn, Future}, + pin::Pin, +}; use tracing::{Instrument, Span}; use uuid::Uuid; @@ -47,14 +50,51 @@ where } } -async fn time_job(mut future: F, job_id: Uuid) -> ::Output { +async fn heartbeat_job( + storage: &Server, + future: F, + job_id: Uuid, + runner_id: Uuid, +) -> F::Output { + let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5)); + + let mut future = std::pin::pin!(future); + + let mut hb_future = Some(storage.heartbeat(job_id, runner_id)); + + loop { + tokio::select! { + output = &mut future => { + break output; + }, + Some(hb_output) = option(hb_future.as_mut()), if hb_future.is_some() => { + hb_future.take(); + + if let Err(e) = hb_output { + tracing::warn!("Failed to heartbeat: {e}"); + } + } + _ = interval.tick() => { + if hb_future.is_none() { + hb_future = Some(storage.heartbeat(job_id, runner_id)); + } + } + } + } +} + +async fn time_job(future: F, job_id: Uuid) -> F::Output { let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5)); interval.tick().await; let mut count = 0; + let mut future = std::pin::pin!(future); + loop { tokio::select! { - output = &mut future => { break output } + output = &mut future => { + break output; + }, _ = interval.tick() => { count += 5; @@ -74,6 +114,16 @@ async fn time_job(mut future: F, job_id: Uuid) -> (opt: Option<&mut F>) -> Option +where + F: Future + Unpin, +{ + match opt { + Some(f) => Some(poll_fn(|cx| Pin::new(&mut *f).poll(cx)).await), + None => None, + } +} + pub(crate) async fn local_worker( queue: String, processors: CachedProcessorMap, @@ -124,7 +174,14 @@ pub(crate) async fn local_worker( let process_span = make_span(id, &queue, "process"); let job_id = job.id; let return_job = process_span - .in_scope(|| time_job(Box::pin(processors.process(job)), job_id)) + .in_scope(|| { + heartbeat_job( + &server, + time_job(processors.process(job), job_id), + job_id, + id, + ) + }) .instrument(process_span) .await;