Add heartbeat loop to jobs-actix

This commit is contained in:
asonix 2024-01-09 18:06:28 -06:00
parent e02de4a153
commit 2727645ca9

View file

@ -1,6 +1,9 @@
use crate::Server; use crate::Server;
use background_jobs_core::CachedProcessorMap; use background_jobs_core::CachedProcessorMap;
use std::future::Future; use std::{
future::{poll_fn, Future},
pin::Pin,
};
use tracing::{Instrument, Span}; use tracing::{Instrument, Span};
use uuid::Uuid; use uuid::Uuid;
@ -47,14 +50,51 @@ where
} }
} }
async fn time_job<F: Future + Unpin>(mut future: F, job_id: Uuid) -> <F as Future>::Output { async fn heartbeat_job<F: Future>(
storage: &Server,
future: F,
job_id: Uuid,
runner_id: Uuid,
) -> F::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
let mut future = std::pin::pin!(future);
let mut hb_future = Some(storage.heartbeat(job_id, runner_id));
loop {
tokio::select! {
output = &mut future => {
break output;
},
Some(hb_output) = option(hb_future.as_mut()), if hb_future.is_some() => {
hb_future.take();
if let Err(e) = hb_output {
tracing::warn!("Failed to heartbeat: {e}");
}
}
_ = interval.tick() => {
if hb_future.is_none() {
hb_future = Some(storage.heartbeat(job_id, runner_id));
}
}
}
}
}
async fn time_job<F: Future>(future: F, job_id: Uuid) -> F::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5)); let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
interval.tick().await; interval.tick().await;
let mut count = 0; let mut count = 0;
let mut future = std::pin::pin!(future);
loop { loop {
tokio::select! { tokio::select! {
output = &mut future => { break output } output = &mut future => {
break output;
},
_ = interval.tick() => { _ = interval.tick() => {
count += 5; count += 5;
@ -74,6 +114,16 @@ async fn time_job<F: Future + Unpin>(mut future: F, job_id: Uuid) -> <F as Futur
} }
} }
async fn option<F>(opt: Option<&mut F>) -> Option<F::Output>
where
F: Future + Unpin,
{
match opt {
Some(f) => Some(poll_fn(|cx| Pin::new(&mut *f).poll(cx)).await),
None => None,
}
}
pub(crate) async fn local_worker<State, Extras>( pub(crate) async fn local_worker<State, Extras>(
queue: String, queue: String,
processors: CachedProcessorMap<State>, processors: CachedProcessorMap<State>,
@ -124,7 +174,14 @@ pub(crate) async fn local_worker<State, Extras>(
let process_span = make_span(id, &queue, "process"); let process_span = make_span(id, &queue, "process");
let job_id = job.id; let job_id = job.id;
let return_job = process_span let return_job = process_span
.in_scope(|| time_job(Box::pin(processors.process(job)), job_id)) .in_scope(|| {
heartbeat_job(
&server,
time_job(processors.process(job), job_id),
job_id,
id,
)
})
.instrument(process_span) .instrument(process_span)
.await; .await;