jobs-sled: spawn blocking for sled access

This commit is contained in:
asonix 2024-01-10 21:14:14 -06:00
parent 495977b8d8
commit f7f6f901f8
2 changed files with 39 additions and 37 deletions

View file

@ -11,7 +11,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-rt = "2.0.1"
async-trait = "0.1.24"
background-jobs-core = { version = "0.17.0", path = "../jobs-core" }
bincode = "1.2"

View file

@ -13,7 +13,6 @@
//! let queue_handle = ServerConfig::new(storage).thread_count(8).start();
//! ```
use actix_rt::task::JoinError;
use background_jobs_core::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo};
use sled::{Db, Tree};
use std::{
@ -22,7 +21,10 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::sync::Notify;
use tokio::{
sync::Notify,
task::{JoinError, JoinHandle},
};
use uuid::{NoContext, Timestamp, Uuid};
/// The error produced by sled storage calls
@ -95,6 +97,25 @@ struct Inner {
_db: Db,
}
#[cfg(tokio_unstable)]
fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
tokio::task::Builder::new().name(name).spawn_blocking(f)
}
#[cfg(not(tokio_unstable))]
fn spawn_blocking<F, T>(name: &str, f: F) -> std::io::Result<JoinHandle<T>>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let _ = name;
Ok(tokio::task::spawn_blocking(f))
}
#[async_trait::async_trait]
impl background_jobs_core::Storage for Storage {
type Error = Error;
@ -103,20 +124,14 @@ impl background_jobs_core::Storage for Storage {
async fn info(&self, job_id: Uuid) -> Result<Option<JobInfo>> {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-info")
.spawn_blocking(move || this.get(job_id))?
.await?
spawn_blocking("jobs-info", move || this.get(job_id))?.await?
}
#[tracing::instrument(skip_all)]
async fn push(&self, job: NewJobInfo) -> Result<Uuid> {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-push")
.spawn_blocking(move || this.insert(job.build()))?
.await?
spawn_blocking("jobs-push", move || this.insert(job.build()))?.await?
}
#[tracing::instrument(skip(self))]
@ -126,22 +141,18 @@ impl background_jobs_core::Storage for Storage {
let this = self.clone();
let queue2 = queue.to_string();
if let Some(job) = tokio::task::Builder::new()
.name("jobs-try-pop")
.spawn_blocking(move || this.try_pop(queue2, runner_id))?
.await??
if let Some(job) =
spawn_blocking("jobs-try-pop", move || this.try_pop(queue2, runner_id))?.await??
{
return Ok(job);
}
let this = self.clone();
let queue2 = queue.to_string();
let duration = tokio::task::Builder::new()
.name("jobs-next-duration")
.spawn_blocking(move || {
this.next_duration(queue2).unwrap_or(Duration::from_secs(5))
})?
.await?;
let duration = spawn_blocking("jobs-next-duration", move || {
this.next_duration(queue2).unwrap_or(Duration::from_secs(5))
})?
.await?;
match tokio::time::timeout(duration, notifier.notified()).await {
Ok(()) => {
@ -158,19 +169,17 @@ impl background_jobs_core::Storage for Storage {
async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-heartbeat")
.spawn_blocking(move || this.set_heartbeat(job_id, runner_id))?
.await?
spawn_blocking("jobs-heartbeat", move || {
this.set_heartbeat(job_id, runner_id)
})?
.await?
}
#[tracing::instrument(skip(self))]
async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result<bool> {
let this = self.clone();
let mut job = if let Some(job) = tokio::task::Builder::new()
.name("jobs-remove")
.spawn_blocking(move || this.remove_job(id))?
.await??
let mut job = if let Some(job) =
spawn_blocking("jobs-remove", move || this.remove_job(id))?.await??
{
job
} else {
@ -183,19 +192,13 @@ impl background_jobs_core::Storage for Storage {
// Unregistered or Unexecuted jobs are restored as-is
JobResult::Unexecuted | JobResult::Unregistered => {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-requeue")
.spawn_blocking(move || this.insert(job))?
.await??;
spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
Ok(false)
}
// retryable failed jobs are restored
JobResult::Failure if job.prepare_retry() => {
let this = self.clone();
tokio::task::Builder::new()
.name("jobs-requeue")
.spawn_blocking(move || this.insert(job))?
.await??;
spawn_blocking("jobs-requeue", move || this.insert(job))?.await??;
Ok(false)
}
// dead jobs are removed