diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 093a1f5..299bf4d 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -11,7 +11,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-rt = "2.0.1" async-trait = "0.1.24" background-jobs-core = { version = "0.17.0", path = "../jobs-core" } bincode = "1.2" diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 2916ea4..1299894 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -13,7 +13,6 @@ //! let queue_handle = ServerConfig::new(storage).thread_count(8).start(); //! ``` -use actix_rt::task::JoinError; use background_jobs_core::{JobInfo, JobResult, NewJobInfo, ReturnJobInfo}; use sled::{Db, Tree}; use std::{ @@ -22,7 +21,10 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; -use tokio::sync::Notify; +use tokio::{ + sync::Notify, + task::{JoinError, JoinHandle}, +}; use uuid::{NoContext, Timestamp, Uuid}; /// The error produced by sled storage calls @@ -95,6 +97,25 @@ struct Inner { _db: Db, } +#[cfg(tokio_unstable)] +fn spawn_blocking(name: &str, f: F) -> std::io::Result> +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + tokio::task::Builder::new().name(name).spawn_blocking(f) +} + +#[cfg(not(tokio_unstable))] +fn spawn_blocking(name: &str, f: F) -> std::io::Result> +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let _ = name; + Ok(tokio::task::spawn_blocking(f)) +} + #[async_trait::async_trait] impl background_jobs_core::Storage for Storage { type Error = Error; @@ -103,20 +124,14 @@ impl background_jobs_core::Storage for Storage { async fn info(&self, job_id: Uuid) -> Result> { let this = self.clone(); - tokio::task::Builder::new() - .name("jobs-info") - .spawn_blocking(move || this.get(job_id))? - .await? + spawn_blocking("jobs-info", move || this.get(job_id))?.await? } #[tracing::instrument(skip_all)] async fn push(&self, job: NewJobInfo) -> Result { let this = self.clone(); - tokio::task::Builder::new() - .name("jobs-push") - .spawn_blocking(move || this.insert(job.build()))? - .await? + spawn_blocking("jobs-push", move || this.insert(job.build()))?.await? } #[tracing::instrument(skip(self))] @@ -126,22 +141,18 @@ impl background_jobs_core::Storage for Storage { let this = self.clone(); let queue2 = queue.to_string(); - if let Some(job) = tokio::task::Builder::new() - .name("jobs-try-pop") - .spawn_blocking(move || this.try_pop(queue2, runner_id))? - .await?? + if let Some(job) = + spawn_blocking("jobs-try-pop", move || this.try_pop(queue2, runner_id))?.await?? { return Ok(job); } let this = self.clone(); let queue2 = queue.to_string(); - let duration = tokio::task::Builder::new() - .name("jobs-next-duration") - .spawn_blocking(move || { - this.next_duration(queue2).unwrap_or(Duration::from_secs(5)) - })? - .await?; + let duration = spawn_blocking("jobs-next-duration", move || { + this.next_duration(queue2).unwrap_or(Duration::from_secs(5)) + })? + .await?; match tokio::time::timeout(duration, notifier.notified()).await { Ok(()) => { @@ -158,19 +169,17 @@ impl background_jobs_core::Storage for Storage { async fn heartbeat(&self, job_id: Uuid, runner_id: Uuid) -> Result<()> { let this = self.clone(); - tokio::task::Builder::new() - .name("jobs-heartbeat") - .spawn_blocking(move || this.set_heartbeat(job_id, runner_id))? - .await? + spawn_blocking("jobs-heartbeat", move || { + this.set_heartbeat(job_id, runner_id) + })? + .await? } #[tracing::instrument(skip(self))] async fn complete(&self, ReturnJobInfo { id, result }: ReturnJobInfo) -> Result { let this = self.clone(); - let mut job = if let Some(job) = tokio::task::Builder::new() - .name("jobs-remove") - .spawn_blocking(move || this.remove_job(id))? - .await?? + let mut job = if let Some(job) = + spawn_blocking("jobs-remove", move || this.remove_job(id))?.await?? { job } else { @@ -183,19 +192,13 @@ impl background_jobs_core::Storage for Storage { // Unregistered or Unexecuted jobs are restored as-is JobResult::Unexecuted | JobResult::Unregistered => { let this = self.clone(); - tokio::task::Builder::new() - .name("jobs-requeue") - .spawn_blocking(move || this.insert(job))? - .await??; + spawn_blocking("jobs-requeue", move || this.insert(job))?.await??; Ok(false) } // retryable failed jobs are restored JobResult::Failure if job.prepare_retry() => { let this = self.clone(); - tokio::task::Builder::new() - .name("jobs-requeue") - .spawn_blocking(move || this.insert(job))? - .await??; + spawn_blocking("jobs-requeue", move || this.insert(job))?.await??; Ok(false) } // dead jobs are removed