From bf65fe802ad0862554ecc50ad58efc2611ce0dff Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Mon, 17 Jan 2022 17:45:24 -0600 Subject: [PATCH] Remove chrono, use std time types for public api --- Cargo.toml | 8 +++--- examples/basic-example/Cargo.toml | 4 +-- examples/basic-example/src/main.rs | 9 ++++--- examples/long-example/Cargo.toml | 4 +-- examples/long-example/src/main.rs | 8 +++--- examples/managed-example/Cargo.toml | 4 +-- examples/managed-example/src/main.rs | 12 ++++----- examples/panic-example/Cargo.toml | 5 ++-- examples/panic-example/src/main.rs | 10 +++---- jobs-actix/Cargo.toml | 9 +++---- jobs-actix/src/lib.rs | 37 ++++++++++++++------------ jobs-core/Cargo.toml | 4 +-- jobs-core/src/job.rs | 5 ++-- jobs-core/src/job_info.rs | 33 +++++++++++------------ jobs-core/src/processor_map.rs | 14 +++------- jobs-core/src/stats.rs | 8 +++--- jobs-core/src/storage.rs | 10 +++---- jobs-sled/Cargo.toml | 5 ++-- jobs-sled/src/lib.rs | 4 +-- src/lib.rs | 39 +++++++++++++++------------- 20 files changed, 111 insertions(+), 121 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f6063e0..2e55335 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "background-jobs" description = "Background Jobs implemented with actix and futures" -version = "0.11.0" +version = "0.12.0" license = "AGPL-3.0" authors = ["asonix "] -repository = "https://git.asonix.dog/Aardwolf/background-jobs" +repository = "https://git.asonix.dog/asonix/background-jobs" readme = "README.md" keywords = ["jobs", "processor", "actix"] edition = "2021" @@ -26,10 +26,10 @@ completion-logging = ["background-jobs-core/completion-logging", "error-logging" error-logging = ["background-jobs-core/error-logging"] [dependencies.background-jobs-core] -version = "0.11.0" +version = "0.12.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.11.0" +version = "0.12.0" path = "jobs-actix" optional = true diff --git a/examples/basic-example/Cargo.toml b/examples/basic-example/Cargo.toml index 838c189..9836bab 100644 --- a/examples/basic-example/Cargo.toml +++ b/examples/basic-example/Cargo.toml @@ -9,10 +9,8 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -async-trait = "0.1.24" -background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } -chrono = "0.4" tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/basic-example/src/main.rs b/examples/basic-example/src/main.rs index f5bbdfe..38e09a7 100644 --- a/examples/basic-example/src/main.rs +++ b/examples/basic-example/src/main.rs @@ -2,8 +2,10 @@ use actix_rt::Arbiter; use anyhow::Error; use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; -use chrono::{Duration, Utc}; -use std::future::{ready, Ready}; +use std::{ + future::{ready, Ready}, + time::{Duration, SystemTime}, +}; use tracing::info; use tracing_subscriber::EnvFilter; @@ -46,7 +48,7 @@ async fn main() -> Result<(), Error> { queue_handle.queue(MyJob::new(3, 4)).await?; queue_handle.queue(MyJob::new(5, 6)).await?; queue_handle - .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2)) .await?; // Block on Actix @@ -75,7 +77,6 @@ impl MyJob { } } -#[async_trait::async_trait] impl Job for MyJob { type State = MyState; type Future = Ready>; diff --git a/examples/long-example/Cargo.toml b/examples/long-example/Cargo.toml index 8d94194..151faca 100644 --- a/examples/long-example/Cargo.toml +++ b/examples/long-example/Cargo.toml @@ -9,10 +9,8 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -async-trait = "0.1.24" -background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } -chrono = "0.4" tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/long-example/src/main.rs b/examples/long-example/src/main.rs index 6be86e5..2545421 100644 --- a/examples/long-example/src/main.rs +++ b/examples/long-example/src/main.rs @@ -2,10 +2,10 @@ use actix_rt::Arbiter; use anyhow::Error; use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; -use chrono::{Duration, Utc}; use std::{ future::{ready, Future, Ready}, pin::Pin, + time::{Duration, SystemTime}, }; use tracing::info; use tracing_subscriber::EnvFilter; @@ -56,7 +56,7 @@ async fn main() -> Result<(), Error> { queue_handle.queue(MyJob::new(3, 4)).await?; queue_handle.queue(MyJob::new(5, 6)).await?; queue_handle - .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2)) .await?; // Block on Actix @@ -85,7 +85,6 @@ impl MyJob { } } -#[async_trait::async_trait] impl Job for MyJob { type State = MyState; type Future = Ready>; @@ -115,7 +114,6 @@ impl Job for MyJob { } } -#[async_trait::async_trait] impl Job for LongJob { type State = MyState; type Future = Pin>>>; @@ -128,7 +126,7 @@ impl Job for LongJob { fn run(self, _: MyState) -> Self::Future { Box::pin(async move { - actix_rt::time::sleep(std::time::Duration::from_secs(120)).await; + actix_rt::time::sleep(Duration::from_secs(120)).await; Ok(()) }) } diff --git a/examples/managed-example/Cargo.toml b/examples/managed-example/Cargo.toml index 0306e1c..9b9330d 100644 --- a/examples/managed-example/Cargo.toml +++ b/examples/managed-example/Cargo.toml @@ -9,10 +9,8 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -async-trait = "0.1.24" -background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } -chrono = "0.4" tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/managed-example/src/main.rs b/examples/managed-example/src/main.rs index aae5588..0694222 100644 --- a/examples/managed-example/src/main.rs +++ b/examples/managed-example/src/main.rs @@ -2,8 +2,10 @@ use actix_rt::Arbiter; use anyhow::Error; use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; -use chrono::{Duration, Utc}; -use std::future::{ready, Ready}; +use std::{ + future::{ready, Ready}, + time::{Duration, SystemTime}, +}; use tracing::info; use tracing_subscriber::EnvFilter; @@ -47,7 +49,7 @@ async fn main() -> Result<(), Error> { manager.queue(MyJob::new(3, 4)).await?; manager.queue(MyJob::new(5, 6)).await?; manager - .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2)) .await?; // Block on Actix @@ -64,7 +66,7 @@ async fn main() -> Result<(), Error> { manager.queue(MyJob::new(3, 4)).await?; manager.queue(MyJob::new(5, 6)).await?; manager - .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2)) .await?; // Block on Actix @@ -92,7 +94,6 @@ impl MyJob { } } -#[async_trait::async_trait] impl Job for MyJob { type State = MyState; type Future = Ready>; @@ -122,7 +123,6 @@ impl Job for MyJob { } } -#[async_trait::async_trait] impl Job for StopJob { type State = MyState; type Future = Ready>; diff --git a/examples/panic-example/Cargo.toml b/examples/panic-example/Cargo.toml index 8bb4c43..8bda3e1 100644 --- a/examples/panic-example/Cargo.toml +++ b/examples/panic-example/Cargo.toml @@ -9,10 +9,9 @@ edition = "2021" [dependencies] actix-rt = "2.0.0" anyhow = "1.0" -async-trait = "0.1.24" -background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.12.0", path = "../..", features = ["error-logging"] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } -chrono = "0.4" +time = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.2", features = ["fmt"] } serde = { version = "1.0", features = ["derive"] } diff --git a/examples/panic-example/src/main.rs b/examples/panic-example/src/main.rs index ef70a28..58b53a3 100644 --- a/examples/panic-example/src/main.rs +++ b/examples/panic-example/src/main.rs @@ -2,8 +2,10 @@ use actix_rt::Arbiter; use anyhow::Error; use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs_sled_storage::Storage; -use chrono::{Duration, Utc}; -use std::future::{ready, Ready}; +use std::{ + future::{ready, Ready}, + time::{Duration, SystemTime}, +}; use tracing::info; use tracing_subscriber::EnvFilter; @@ -55,7 +57,7 @@ async fn main() -> Result<(), Error> { queue_handle.queue(MyJob::new(3, 4)).await?; queue_handle.queue(MyJob::new(5, 6)).await?; queue_handle - .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .schedule(MyJob::new(7, 8), SystemTime::now() + Duration::from_secs(2)) .await?; // Block on Actix @@ -84,7 +86,6 @@ impl MyJob { } } -#[async_trait::async_trait] impl Job for MyJob { type State = MyState; type Future = Ready>; @@ -114,7 +115,6 @@ impl Job for MyJob { } } -#[async_trait::async_trait] impl Job for PanickingJob { type State = MyState; type Future = Ready>; diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index b3edec7..cea93fc 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,21 +1,20 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.11.0" +version = "0.12.0" license = "AGPL-3.0" authors = ["asonix "] -repository = "https://git.asonix.dog/Aardwolf/background-jobs" +repository = "https://git.asonix.dog/asonix/background-jobs" keywords = ["jobs", "processor"] readme = "../README.md" edition = "2021" [dependencies] -actix-rt = "2.2.0" +actix-rt = "2.5.1" anyhow = "1.0" async-mutex = "1.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.11.0", path = "../jobs-core", features = ["with-actix"] } -chrono = "0.4" +background-jobs-core = { version = "0.12.0", path = "../jobs-core", features = ["with-actix"] } tracing = "0.1" tracing-futures = "0.2" num_cpus = "1.10.0" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 9cc0ceb..9fbdf87 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -11,10 +11,11 @@ //! used. By default, the number of cores of the running system is used. //! //! ### Example -//! ```rust,ignore +//! ```rust //! use anyhow::Error; -//! use background_jobs::{create_server, Backoff, Job, MaxRetries, WorkerConfig}; -//! use futures::future::{ok, Ready}; +//! use background_jobs_core::{Backoff, Job, MaxRetries}; +//! use background_jobs_actix::WorkerConfig; +//! use std::future::{ready, Ready}; //! //! const DEFAULT_QUEUE: &'static str = "default"; //! @@ -33,7 +34,7 @@ //! async fn main() -> Result<(), Error> { //! // Set up our Storage //! // For this example, we use the default in-memory storage mechanism -//! use background_jobs::memory_storage::Storage; +//! use background_jobs_core::memory_storage::Storage; //! let storage = Storage::new(); //! //! // Configure and start our workers @@ -43,11 +44,11 @@ //! .start(); //! //! // Queue our jobs -//! queue_handle.queue(MyJob::new(1, 2))?; -//! queue_handle.queue(MyJob::new(3, 4))?; -//! queue_handle.queue(MyJob::new(5, 6))?; +//! queue_handle.queue(MyJob::new(1, 2)).await?; +//! queue_handle.queue(MyJob::new(3, 4)).await?; +//! queue_handle.queue(MyJob::new(5, 6)).await?; //! -//! actix_rt::signal::ctrl_c().await?; +//! // actix_rt::signal::ctrl_c().await?; //! //! Ok(()) //! } @@ -69,7 +70,6 @@ //! } //! } //! -//! #[async_trait::async_trait] //! impl Job for MyJob { //! type State = MyState; //! type Future = Ready>; @@ -97,18 +97,18 @@ //! // //! // This value defaults to Backoff::Exponential(2) //! // Jobs can optionally override this value -//! const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(2); +//! const BACKOFF: Backoff = Backoff::Exponential(2); //! //! // When should the job be considered dead //! // //! // The timeout defines when a job is allowed to be considered dead, and so can be retried //! // by the job processor. The value is in milliseconds and defaults to 15,000 -//! const TIMEOUT: i64 = 15_000 +//! const TIMEOUT: i64 = 15_000; //! -//! async fn run(self, state: MyState) -> Self::Future { +//! fn run(self, state: MyState) -> Self::Future { //! println!("{}: args, {:?}", state.app_name, self); //! -//! ok(()) +//! ready(Ok(())) //! } //! } //! ``` @@ -116,8 +116,13 @@ use actix_rt::{Arbiter, ArbiterHandle}; use anyhow::Error; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; -use chrono::{DateTime, Utc}; -use std::{collections::BTreeMap, marker::PhantomData, ops::Deref, sync::Arc, time::Duration}; +use std::{ + collections::BTreeMap, + marker::PhantomData, + ops::Deref, + sync::Arc, + time::{Duration, SystemTime}, +}; mod every; mod server; @@ -459,7 +464,7 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute after the specified time /// and when a worker for the job's queue is free to do so. - pub async fn schedule(&self, job: J, after: DateTime) -> Result<(), Error> + pub async fn schedule(&self, job: J, after: SystemTime) -> Result<(), Error> where J: Job, { diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index d974647..c482fe9 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-core" description = "Core types for implementing an asynchronous jobs processor" -version = "0.11.1" +version = "0.12.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/asonix/background-jobs" @@ -20,7 +20,7 @@ actix-rt = { version = "2.3.0", optional = true } anyhow = "1.0" async-mutex = "1.0.1" async-trait = "0.1.24" -chrono = { version = "0.4", features = ["serde"] } +time = { version = "0.3", features = ["serde-human-readable"] } tracing = "0.1" tracing-futures = "0.2.5" serde = { version = "1.0", features = ["derive"] } diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index da2b0de..0f64840 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -1,9 +1,8 @@ use crate::{Backoff, JobError, MaxRetries, NewJobInfo}; use anyhow::Error; -use chrono::{offset::Utc, DateTime}; use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::Value; -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, time::SystemTime}; use tracing::Span; use tracing_futures::Instrument; @@ -148,7 +147,7 @@ where } /// Create a NewJobInfo to schedule a job to be performed after a certain time -pub fn new_scheduled_job(job: J, after: DateTime) -> Result +pub fn new_scheduled_job(job: J, after: SystemTime) -> Result where J: Job, { diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index dea3c1d..0b86460 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -1,6 +1,7 @@ use crate::{Backoff, JobResult, JobStatus, MaxRetries, ShouldStop}; -use chrono::{offset::Utc, DateTime, Duration}; use serde_json::Value; +use std::time::SystemTime; +use time::{Duration, OffsetDateTime}; use tracing::trace; use uuid::Uuid; @@ -53,7 +54,7 @@ pub struct NewJobInfo { backoff_strategy: Backoff, /// The time this job should be dequeued - next_queue: Option>, + next_queue: Option, /// Milliseconds from execution until the job is considered dead /// @@ -62,8 +63,8 @@ pub struct NewJobInfo { } impl NewJobInfo { - pub(crate) fn schedule(&mut self, time: DateTime) { - self.next_queue = Some(time); + pub(crate) fn schedule(&mut self, time: SystemTime) { + self.next_queue = Some(time.into()); } pub(crate) fn new( @@ -106,7 +107,7 @@ impl NewJobInfo { max_retries: self.max_retries, next_queue: self.next_queue, backoff_strategy: self.backoff_strategy, - updated_at: Utc::now(), + updated_at: OffsetDateTime::now_utc(), timeout: self.timeout, } } @@ -143,10 +144,10 @@ pub struct JobInfo { backoff_strategy: Backoff, /// The time this job should be dequeued - next_queue: Option>, + next_queue: Option, /// The time this job was last updated - updated_at: DateTime, + updated_at: OffsetDateTime, /// Milliseconds from execution until the job is considered dead /// @@ -161,7 +162,7 @@ impl JobInfo { } fn updated(&mut self) { - self.updated_at = Utc::now(); + self.updated_at = OffsetDateTime::now_utc(); } pub(crate) fn name(&self) -> &str { @@ -191,8 +192,8 @@ impl JobInfo { } /// If the job is queued to run in the future, when is that - pub fn next_queue(&self) -> Option> { - self.next_queue + pub fn next_queue(&self) -> Option { + self.next_queue.map(|time| time.into()) } pub(crate) fn increment(&mut self) -> ShouldStop { @@ -202,7 +203,7 @@ impl JobInfo { } fn set_next_queue(&mut self) { - let now = Utc::now(); + let now = OffsetDateTime::now_utc(); let next_queue = match self.backoff_strategy { Backoff::Linear(secs) => now + Duration::seconds(secs as i64), @@ -218,12 +219,12 @@ impl JobInfo { "Now {}, Next queue {}, ready {}", now, next_queue, - self.is_ready(now), + self.is_ready(now.into()), ); } /// Whether this job is ready to be run - pub fn is_ready(&self, now: DateTime) -> bool { + pub fn is_ready(&self, now: SystemTime) -> bool { match self.next_queue { Some(ref time) => now > *time, None => true, @@ -242,7 +243,7 @@ impl JobInfo { } /// Whether this job is pending execution - pub fn is_pending(&self, now: DateTime) -> bool { + pub fn is_pending(&self, now: SystemTime) -> bool { self.status == JobStatus::Pending || (self.status == JobStatus::Running && (self.updated_at + Duration::milliseconds(self.timeout)) < now) @@ -254,8 +255,8 @@ impl JobInfo { } /// The the date of the most recent update - pub fn updated_at(&self) -> DateTime { - self.updated_at + pub fn updated_at(&self) -> SystemTime { + self.updated_at.into() } pub(crate) fn is_in_queue(&self, queue: &str) -> bool { diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index 7e7a6e0..e739f5d 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -1,7 +1,6 @@ use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo}; -use chrono::Utc; use serde_json::Value; -use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Instant}; use tracing::{error, Span}; use tracing_futures::Instrument; use uuid::Uuid; @@ -164,7 +163,7 @@ where let args = job.args(); let id = job.id(); - let start = Utc::now(); + let start = Instant::now(); let state_mtx = std::sync::Mutex::new(state); let process_mtx = std::sync::Mutex::new(process_fn); @@ -176,15 +175,10 @@ where Ok(fut) => catch_unwind(fut).await, Err(e) => Err(e), }; - let end = Utc::now(); + let end = Instant::now(); let duration = end - start; - let microseconds = duration.num_microseconds(); - let seconds: f64 = if let Some(m) = microseconds { - m as f64 / 1_000_000_f64 - } else { - 0_f64 - }; + let seconds = duration.as_micros() as f64 / 1_000_000_f64; let span = Span::current(); span.record("job.execution_time", &tracing::field::display(&seconds)); diff --git a/jobs-core/src/stats.rs b/jobs-core/src/stats.rs index c0ba240..95cfa71 100644 --- a/jobs-core/src/stats.rs +++ b/jobs-core/src/stats.rs @@ -1,4 +1,4 @@ -use chrono::{offset::Utc, DateTime, Datelike, Timelike}; +use time::OffsetDateTime; #[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)] /// Statistics about the jobs processor @@ -67,7 +67,7 @@ pub struct JobStat { today: usize, this_month: usize, all_time: usize, - updated_at: DateTime, + updated_at: OffsetDateTime, } impl JobStat { @@ -86,7 +86,7 @@ impl JobStat { } fn tick(&mut self) { - let now = Utc::now(); + let now = OffsetDateTime::now_utc(); if now.month() != self.updated_at.month() { self.next_month(); @@ -141,7 +141,7 @@ impl Default for JobStat { today: 0, this_month: 0, all_time: 0, - updated_at: Utc::now(), + updated_at: OffsetDateTime::now_utc(), } } } diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 96b937b..56e2770 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -1,6 +1,5 @@ use crate::{JobInfo, NewJobInfo, ReturnJobInfo, Stats}; -use chrono::offset::Utc; -use std::error::Error; +use std::{error::Error, time::SystemTime}; use tracing::info; use uuid::Uuid; @@ -76,7 +75,7 @@ pub trait Storage: Clone + Send { ) -> Result, Self::Error> { match self.fetch_job_from_queue(queue).await? { Some(mut job) => { - let now = Utc::now(); + let now = SystemTime::now(); if job.is_pending(now) && job.is_ready(now) && job.is_in_queue(queue) { job.run(); self.run_job(job.id(), runner_id).await?; @@ -138,8 +137,7 @@ pub trait Storage: Clone + Send { pub mod memory_storage { use super::{JobInfo, Stats}; use async_mutex::Mutex; - use chrono::Utc; - use std::{collections::HashMap, convert::Infallible, sync::Arc}; + use std::{collections::HashMap, convert::Infallible, sync::Arc, time::SystemTime}; use uuid::Uuid; #[derive(Clone)] @@ -207,7 +205,7 @@ pub mod memory_storage { async fn fetch_job_from_queue(&self, queue: &str) -> Result, Self::Error> { let mut inner = self.inner.lock().await; - let now = Utc::now(); + let now = SystemTime::now(); let j = inner .queues diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 4864237..b18807a 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -4,7 +4,7 @@ description = "Sled storage backend for background-jobs" version = "0.10.0" license = "AGPL-3.0" authors = ["asonix "] -repository = "https://git.asonix.dog/Aardwolf/background-jobs" +repository = "https://git.asonix.dog/asonix/background-jobs" readme = "../README.md" edition = "2021" @@ -13,9 +13,8 @@ edition = "2021" [dependencies] actix-rt = "2.0.1" async-trait = "0.1.24" -background-jobs-core = { version = "0.11.0", path = "../jobs-core" } +background-jobs-core = { version = "0.12.0", path = "../jobs-core" } bincode = "1.2" -chrono = "0.4" sled = "0.34" serde_cbor = "0.11" thiserror = "1.0" diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 62858d9..b7f932a 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -15,8 +15,8 @@ use actix_rt::task::{spawn_blocking, JoinError}; use background_jobs_core::{JobInfo, Stats}; -use chrono::offset::Utc; use sled::{Db, Tree}; +use std::time::SystemTime; use uuid::Uuid; /// The error produced by sled storage calls @@ -110,7 +110,7 @@ impl background_jobs_core::Storage for Storage { Ok(spawn_blocking(move || { let mut job; - let now = Utc::now(); + let now = SystemTime::now(); while { let job_opt = this diff --git a/src/lib.rs b/src/lib.rs index e2c10a2..6eedc0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,10 +28,9 @@ //! #### Add Background Jobs to your project //! ```toml //! [dependencies] -//! actix = "0.8" -//! background-jobs = "0.6.0" +//! actix-rt = "2.6.0" //! anyhow = "1.0" -//! futures = "0.1" +//! background-jobs = "0.12.0" //! serde = { version = "1.0", features = ["derive"] } //! ``` //! @@ -42,7 +41,7 @@ //! ```rust,ignore //! use anyhow::Error; //! use background_jobs::Job; -//! use futures::future::{ok, Ready}; +//! use std::future::{ready, Ready}; //! //! #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] //! pub struct MyJob { @@ -65,10 +64,10 @@ //! //! const NAME: &'static str = "MyJob"; //! -//! fn run(self, _: Self::State) -> Self::Future { -//! println!("args: {:?}", self); +//! fn run(self, state: ()) -> Self::Future { +//! info!("{}: args, {:?}", state.app_name, self); //! -//! ok(()) +//! ready(Ok(())) //! } //! } //! ``` @@ -83,7 +82,7 @@ //! ```rust,ignore //! use anyhow::Error; //! use background_jobs::Job; -//! use futures::future::{ok, Ready}; +//! use std::future::{ready, Ready}; //! //! #[derive(Clone, Debug)] //! pub struct MyState { @@ -104,10 +103,10 @@ //! //! const NAME: &'static str = "MyJob"; //! -//! fn run(self, state: Self::State) -> Self::Future { +//! fn run(self, state: MyState) -> Self::Future { //! info!("{}: args, {:?}", state.app_name, self); //! -//! ok(()) +//! ready(Ok(())) //! } //! } //! ``` @@ -133,15 +132,19 @@ //! let storage = Storage::new(); //! //! // Configure and start our workers -//! let queue_handle = WorkerConfig::new(move || MyState::new("My App")) -//! .register::() -//! .set_processor_count(DEFAULT_QUEUE, 16) -//! .start(storage); +//! let arbiter = Arbiter::new(); +//! +//! // Configure and start our workers +//! let queue_handle = +//! WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App")) +//! .register::() +//! .set_worker_count(DEFAULT_QUEUE, 16) +//! .start(); //! //! // Queue our jobs -//! queue_handle.queue(MyJob::new(1, 2))?; -//! queue_handle.queue(MyJob::new(3, 4))?; -//! queue_handle.queue(MyJob::new(5, 6))?; +//! queue_handle.queue(MyJob::new(1, 2)).await?; +//! queue_handle.queue(MyJob::new(3, 4)).await?; +//! queue_handle.queue(MyJob::new(5, 6)).await?; //! //! // Block on Actix //! actix_rt::signal::ctrl_c().await?; @@ -151,7 +154,7 @@ //! //! ##### Complete Example //! For the complete example project, see -//! [the examples folder](https://git.asonix.dog/Aardwolf/background-jobs/src/branch/master/examples/actix-example) +//! [the examples folder](https://git.asonix.dog/asonix/background-jobs/src/branch/main/examples/actix-example) //! //! #### Bringing your own server/worker implementation //! If you want to create your own jobs processor based on this idea, you can depend on the