diff --git a/Cargo.toml b/Cargo.toml index 8a857dd..c840802 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "jobs-core", "jobs-sled", "examples/basic-example", + "examples/long-example", "examples/managed-example", "examples/panic-example", ] diff --git a/examples/long-example/.gitignore b/examples/long-example/.gitignore new file mode 100644 index 0000000..23f168a --- /dev/null +++ b/examples/long-example/.gitignore @@ -0,0 +1 @@ +/my-sled-db diff --git a/examples/long-example/Cargo.toml b/examples/long-example/Cargo.toml new file mode 100644 index 0000000..8d94194 --- /dev/null +++ b/examples/long-example/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "long-example" +version = "0.1.0" +authors = ["asonix "] +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix-rt = "2.0.0" +anyhow = "1.0" +async-trait = "0.1.24" +background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } +background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } +chrono = "0.4" +tracing = "0.1" +tracing-subscriber = { version = "0.2", features = ["fmt"] } +serde = { version = "1.0", features = ["derive"] } +sled = "0.34" diff --git a/examples/long-example/src/main.rs b/examples/long-example/src/main.rs new file mode 100644 index 0000000..6be86e5 --- /dev/null +++ b/examples/long-example/src/main.rs @@ -0,0 +1,135 @@ +use actix_rt::Arbiter; +use anyhow::Error; +use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; +use background_jobs_sled_storage::Storage; +use chrono::{Duration, Utc}; +use std::{ + future::{ready, Future, Ready}, + pin::Pin, +}; +use tracing::info; +use tracing_subscriber::EnvFilter; + +const DEFAULT_QUEUE: &str = "default"; + +#[derive(Clone, Debug)] +pub struct MyState { + pub app_name: String, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct MyJob { + some_usize: usize, + other_usize: usize, +} + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct LongJob; + +#[actix_rt::main] +async fn main() -> Result<(), Error> { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::fmt::fmt() + .with_env_filter(env_filter) + .init(); + + // Set up our Storage + let db = sled::Config::new().temporary(true).open()?; + let storage = Storage::new(db)?; + + let arbiter = Arbiter::new(); + + // Configure and start our workers + let queue_handle = + WorkerConfig::new_in_arbiter(arbiter.handle(), storage, |_| MyState::new("My App")) + .register::() + .register::() + .set_worker_count(DEFAULT_QUEUE, 16) + .start(); + + // Queue a long job + queue_handle.queue(LongJob).await?; + + // Queue our jobs + queue_handle.queue(MyJob::new(1, 2)).await?; + queue_handle.queue(MyJob::new(3, 4)).await?; + queue_handle.queue(MyJob::new(5, 6)).await?; + queue_handle + .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .await?; + + // Block on Actix + actix_rt::signal::ctrl_c().await?; + + arbiter.stop(); + let _ = arbiter.join(); + + Ok(()) +} + +impl MyState { + pub fn new(app_name: &str) -> Self { + MyState { + app_name: app_name.to_owned(), + } + } +} + +impl MyJob { + pub fn new(some_usize: usize, other_usize: usize) -> Self { + MyJob { + some_usize, + other_usize, + } + } +} + +#[async_trait::async_trait] +impl Job for MyJob { + type State = MyState; + type Future = Ready>; + + // The name of the job. It is super important that each job has a unique name, + // because otherwise one job will overwrite another job when they're being + // registered. + const NAME: &'static str = "MyJob"; + + // The queue that this processor belongs to + // + // Workers have the option to subscribe to specific queues, so this is important to + // determine which worker will call the processor + // + // Jobs can optionally override the queue they're spawned on + const QUEUE: &'static str = DEFAULT_QUEUE; + + // The number of times background-jobs should try to retry a job before giving up + // + // Jobs can optionally override this value + const MAX_RETRIES: MaxRetries = MaxRetries::Count(1); + + fn run(self, state: MyState) -> Self::Future { + info!("{}: args, {:?}", state.app_name, self); + + ready(Ok(())) + } +} + +#[async_trait::async_trait] +impl Job for LongJob { + type State = MyState; + type Future = Pin>>>; + + const NAME: &'static str = "LongJob"; + + const QUEUE: &'static str = DEFAULT_QUEUE; + + const MAX_RETRIES: MaxRetries = MaxRetries::Count(0); + + fn run(self, _: MyState) -> Self::Future { + Box::pin(async move { + actix_rt::time::sleep(std::time::Duration::from_secs(120)).await; + Ok(()) + }) + } +} diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index f85efb3..36c1ebd 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -22,5 +22,5 @@ num_cpus = "1.10.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1", default-features = false, features = ["rt", "sync"] } +tokio = { version = "1", default-features = false, features = ["macros", "rt", "sync"] } uuid = { version ="0.8.1", features = ["v4", "serde"] } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 3247ceb..6cc20db 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use crate::Server; use background_jobs_core::{CachedProcessorMap, JobInfo}; use tokio::sync::mpsc::{channel, Sender}; @@ -112,6 +114,33 @@ where } } +async fn time_job(mut future: F, job_id: Uuid) -> ::Output { + let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5)); + interval.tick().await; + let mut count = 0; + + loop { + tokio::select! { + output = &mut future => { break output } + _ = interval.tick() => { + count += 5; + + if count > (60 * 60) { + if count % (60 * 20) == 0 { + warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60); + } + } else if count > 60 { + if count % 20 == 0 { + warn!("Job {} is taking a long time: {} minutes", job_id, count / 60); + } + } else { + info!("Job {} is taking a long time: {} seconds", job_id, count); + } + } + } + } +} + async fn local_worker( queue: String, processors: CachedProcessorMap, @@ -152,8 +181,8 @@ async fn local_worker( drop(span); if let Some(job) = rx.recv().await { - let return_job = processors - .process(job) + let job_id = job.id(); + let return_job = time_job(Box::pin(processors.process(job)), job_id) .instrument(handle.span("process")) .await;