diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index c30f1f6..bc9633b 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -23,5 +23,6 @@ tokio = { version = "1", default-features = false, features = [ "macros", "rt", "sync", + "tracing", ] } uuid = { version = "1", features = ["v7", "serde"] } diff --git a/jobs-actix/src/actix_job.rs b/jobs-actix/src/actix_job.rs index 123c15b..a9c6ac9 100644 --- a/jobs-actix/src/actix_job.rs +++ b/jobs-actix/src/actix_job.rs @@ -1,13 +1,14 @@ use std::future::Future; use background_jobs_core::{JoinError, UnsendSpawner}; +use tokio::task::JoinHandle; /// Provide a spawner for actix-based systems for Unsend Jobs #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ActixSpawner; #[doc(hidden)] -pub struct ActixHandle(actix_rt::task::JoinHandle); +pub struct ActixHandle(Option>); impl UnsendSpawner for ActixSpawner { type Handle = ActixHandle where T: Send; @@ -17,7 +18,7 @@ impl UnsendSpawner for ActixSpawner { Fut: Future + 'static, Fut::Output: Send + 'static, { - ActixHandle(actix_rt::spawn(future)) + ActixHandle(crate::spawn::spawn("job-task", future).ok()) } } @@ -30,14 +31,19 @@ impl Future for ActixHandle { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - let res = std::task::ready!(std::pin::Pin::new(&mut self.0).poll(cx)); - - std::task::Poll::Ready(res.map_err(|_| JoinError)) + if let Some(mut handle) = self.0.as_mut() { + let res = std::task::ready!(std::pin::Pin::new(&mut handle).poll(cx)); + std::task::Poll::Ready(res.map_err(|_| JoinError)) + } else { + std::task::Poll::Ready(Err(JoinError)) + } } } impl Drop for ActixHandle { fn drop(&mut self) { - self.0.abort(); + if let Some(handle) = &self.0 { + handle.abort(); + } } } diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index acce66a..d4b741d 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,7 +1,7 @@ use crate::QueueHandle; -use actix_rt::time::{interval_at, Instant}; use background_jobs_core::Job; use std::time::Duration; +use tokio::time::{interval_at, Instant}; /// A type used to schedule recurring jobs. /// diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 567c668..bd2d70a 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -131,6 +131,7 @@ use tokio::sync::Notify; mod actix_job; mod every; mod server; +mod spawn; mod storage; mod worker; @@ -148,9 +149,7 @@ impl Timer for ActixTimer { where F: std::future::Future + Send + Sync, { - actix_rt::time::timeout(duration, future) - .await - .map_err(|_| ()) + tokio::time::timeout(duration, future).await.map_err(|_| ()) } } @@ -444,12 +443,12 @@ where let extras_2 = extras.clone(); arbiter.spawn_fn(move || { - actix_rt::spawn(worker::local_worker( - queue, - processors.cached(), - server, - extras_2, - )); + if let Err(e) = spawn::spawn( + "local-worker", + worker::local_worker(queue, processors.cached(), server, extras_2), + ) { + tracing::error!("Failed to spawn worker {e}"); + } }); } } @@ -496,10 +495,10 @@ impl QueueHandle { /// /// This job will be added to it's queue on the server once every `Duration`. It will be /// processed whenever workers are free to do so. - pub fn every(&self, duration: Duration, job: J) + pub fn every(&self, duration: Duration, job: J) -> std::io::Result<()> where J: Job + Clone + Send + 'static, { - actix_rt::spawn(every(self.clone(), duration, job)); + spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ()) } } diff --git a/jobs-actix/src/spawn.rs b/jobs-actix/src/spawn.rs new file mode 100644 index 0000000..b9cafd3 --- /dev/null +++ b/jobs-actix/src/spawn.rs @@ -0,0 +1,19 @@ +use std::future::Future; + +use tokio::task::JoinHandle; + +#[cfg(tokio_unstable)] +pub(crate) fn spawn(name: &str, future: F) -> std::io::Result> +where + F: Future + 'static, +{ + tokio::task::Builder::new().name(name).spawn_local(future) +} + +#[cfg(not(tokio_unstable))] +pub(crate) fn spawn(name: &str, future: F) -> std::io::Result> +where + F: Future + 'static, +{ + Ok(tokio::task::spawn_local(future)) +} diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 62c77a2..b1738ac 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -24,14 +24,21 @@ impl Drop for LocalWorkerStarter( runner_id: Uuid, heartbeat_interval: u64, ) -> F::Output { - let mut interval = - actix_rt::time::interval(std::time::Duration::from_millis(heartbeat_interval)); + let mut interval = tokio::time::interval(std::time::Duration::from_millis(heartbeat_interval)); let mut future = std::pin::pin!(future); @@ -86,7 +92,7 @@ async fn heartbeat_job( } async fn time_job(future: F, job_id: Uuid) -> F::Output { - let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5)); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); interval.tick().await; let mut count = 0; @@ -147,7 +153,7 @@ pub(crate) async fn local_worker( let id = Uuid::now_v7(); let log_on_drop = RunOnDrop(|| { - make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing")); + make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing")); }); loop { diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 8df3f68..093a1f5 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -20,6 +20,6 @@ serde = { version = "1", features = ["derive"] } serde_cbor = "0.11" time = { version = "0.3", features = ["serde-human-readable"] } thiserror = "1.0" -tokio = { version = "1", default-features = false, features = ["rt", "sync"] } +tokio = { version = "1", default-features = false, features = ["rt", "sync", "tracing"] } tracing = "0.1" uuid = { version = "1", features = ["v7", "serde"] }