From d1d578aa974cc2336f4f83777e8c0f0a7dd86ff1 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 7 Jan 2024 22:46:30 -0600 Subject: [PATCH] Remove actix-job, make ActixSpawner public --- jobs-actix/src/actix_job.rs | 165 +----------------------------------- jobs-actix/src/every.rs | 5 +- jobs-actix/src/lib.rs | 12 +-- 3 files changed, 12 insertions(+), 170 deletions(-) diff --git a/jobs-actix/src/actix_job.rs b/jobs-actix/src/actix_job.rs index b3e748d..123c15b 100644 --- a/jobs-actix/src/actix_job.rs +++ b/jobs-actix/src/actix_job.rs @@ -1,10 +1,9 @@ use std::future::Future; -use anyhow::Error; -use background_jobs_core::{Backoff, JoinError, MaxRetries, UnsendJob, UnsendSpawner}; -use serde::{de::DeserializeOwned, ser::Serialize}; -use tracing::Span; +use background_jobs_core::{JoinError, UnsendSpawner}; +/// Provide a spawner for actix-based systems for Unsend Jobs +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ActixSpawner; #[doc(hidden)] @@ -42,161 +41,3 @@ impl Drop for ActixHandle { self.0.abort(); } } - -/// The UnsendJob trait defines parameters pertaining to an instance of a background job -/// -/// This trait is used to implement generic Unsend Jobs in the background jobs library. It requires -/// that implementors specify a spawning mechanism that can turn an Unsend future into a Send -/// future -pub trait ActixJob: Serialize + DeserializeOwned + 'static { - /// The application state provided to this job at runtime. - type State: Clone + 'static; - - /// The future returned by this job - /// - /// Importantly, this Future does not require Send - type Future: Future>; - - /// The name of the job - /// - /// This name must be unique!!! - const NAME: &'static str; - - /// The name of the default queue for this job - /// - /// This can be overridden on an individual-job level, but if a non-existant queue is supplied, - /// the job will never be processed. - const QUEUE: &'static str = "default"; - - /// Define the default number of retries for this job - /// - /// Defaults to Count(5) - /// Jobs can override - const MAX_RETRIES: MaxRetries = MaxRetries::Count(5); - - /// Define the default backoff strategy for this job - /// - /// Defaults to Exponential(2) - /// Jobs can override - const BACKOFF: Backoff = Backoff::Exponential(2); - - /// Define the maximum number of milliseconds a job should be allowed to run before being - /// considered dead. - /// - /// This is important for allowing the job server to reap processes that were started but never - /// completed. - /// - /// Defaults to 15 seconds - /// Jobs can override - const TIMEOUT: i64 = 15_000; - - /// Users of this library must define what it means to run a job. - /// - /// This should contain all the logic needed to complete a job. If that means queuing more - /// jobs, sending an email, shelling out (don't shell out), or doing otherwise lengthy - /// processes, that logic should all be called from inside this method. - /// - /// The state passed into this job is initialized at the start of the application. The state - /// argument could be useful for containing a hook into something like r2d2, or the address of - /// an actor in an actix-based system. - fn run(self, state: Self::State) -> Self::Future; - - /// Generate a Span that the job will be processed within - fn span(&self) -> Option { - None - } - - /// If this job should not use it's default queue, this can be overridden in - /// user-code. - fn queue(&self) -> &str { - Self::QUEUE - } - - /// If this job should not use it's default maximum retry count, this can be - /// overridden in user-code. - fn max_retries(&self) -> MaxRetries { - Self::MAX_RETRIES - } - - /// If this job should not use it's default backoff strategy, this can be - /// overridden in user-code. - fn backoff_strategy(&self) -> Backoff { - Self::BACKOFF - } - - /// Define the maximum number of milliseconds this job should be allowed to run before being - /// considered dead. - /// - /// This is important for allowing the job server to reap processes that were started but never - /// completed. - fn timeout(&self) -> i64 { - Self::TIMEOUT - } -} - -/// Provide helper methods for queuing ActixJobs -pub trait ActixJobExt: ActixJob { - /// Turn an ActixJob into a type that implements Job - fn into_job(self) -> ActixJobWrapper - where - Self: Sized, - { - ActixJobWrapper(self) - } -} - -impl ActixJobExt for T where T: ActixJob {} - -impl From for ActixJobWrapper -where - T: ActixJob, -{ - fn from(value: T) -> Self { - ActixJobWrapper(value) - } -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -// A wrapper for ActixJob implementing UnsendJob with an ActixSpawner -pub struct ActixJobWrapper(T); - -impl UnsendJob for ActixJobWrapper -where - T: ActixJob, -{ - type State = ::State; - - type Future = ::Future; - - type Spawner = ActixSpawner; - - const NAME: &'static str = ::NAME; - const QUEUE: &'static str = ::QUEUE; - const MAX_RETRIES: MaxRetries = ::MAX_RETRIES; - const BACKOFF: Backoff = ::BACKOFF; - const TIMEOUT: i64 = ::TIMEOUT; - - fn run(self, state: Self::State) -> Self::Future { - ::run(self.0, state) - } - - fn span(&self) -> Option { - self.0.span() - } - - fn queue(&self) -> &str { - self.0.queue() - } - - fn max_retries(&self) -> MaxRetries { - self.0.max_retries() - } - - fn backoff_strategy(&self) -> Backoff { - self.0.backoff_strategy() - } - - fn timeout(&self) -> i64 { - self.0.timeout() - } -} diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 5687444..acce66a 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -1,5 +1,6 @@ -use crate::{ActixJob, QueueHandle}; +use crate::QueueHandle; use actix_rt::time::{interval_at, Instant}; +use background_jobs_core::Job; use std::time::Duration; /// A type used to schedule recurring jobs. @@ -10,7 +11,7 @@ use std::time::Duration; /// ``` pub(crate) async fn every(spawner: QueueHandle, duration: Duration, job: J) where - J: ActixJob + Clone + Send, + J: Job + Clone + Send, { let mut interval = interval_at(Instant::now(), duration); diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 0cd4888..567c668 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -136,7 +136,7 @@ mod worker; use self::{every::every, server::Server}; -pub use actix_job::{ActixJob, ActixJobExt}; +pub use actix_job::ActixSpawner; /// A timer implementation for the Memory Storage backend #[derive(Debug, Clone)] @@ -472,9 +472,9 @@ impl QueueHandle { /// job's queue is free to do so. pub async fn queue(&self, job: J) -> Result<(), Error> where - J: ActixJob, + J: Job, { - let job = new_job(job.into_job())?; + let job = new_job(job)?; self.inner.push(job).await?; Ok(()) } @@ -485,9 +485,9 @@ impl QueueHandle { /// and when a worker for the job's queue is free to do so. pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), Error> where - J: ActixJob, + J: Job, { - let job = new_scheduled_job(job.into_job(), after)?; + let job = new_scheduled_job(job, after)?; self.inner.push(job).await?; Ok(()) } @@ -498,7 +498,7 @@ impl QueueHandle { /// processed whenever workers are free to do so. pub fn every(&self, duration: Duration, job: J) where - J: ActixJob + Clone + Send + 'static, + J: Job + Clone + Send + 'static, { actix_rt::spawn(every(self.clone(), duration, job)); }