From c8f1f6cd34d4fd946ce9e66c79e23b59142dddd1 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 16 Nov 2018 19:10:31 -0600 Subject: [PATCH] Mark jobs staged, not running Clear staged jobs on startup --- Cargo.toml | 10 +-- TODO | 5 -- examples/actix-jobs-example/Cargo.toml | 4 +- examples/actix-jobs-example/src/main.rs | 6 +- examples/server-jobs-example/Cargo.toml | 4 +- .../server-jobs-example/src/bin/server.rs | 2 +- .../server-jobs-example/src/bin/spawner.rs | 2 +- .../server-jobs-example/src/bin/worker.rs | 2 +- examples/server-jobs-example/src/lib.rs | 2 +- examples/tokio-jobs-example/Cargo.toml | 4 +- examples/tokio-jobs-example/src/main.rs | 6 +- jobs-actix/Cargo.toml | 2 +- jobs-actix/src/lib.rs | 4 +- jobs-core/Cargo.toml | 2 +- jobs-core/src/job_info.rs | 8 ++ jobs-core/src/lib.rs | 3 + jobs-core/src/storage.rs | 87 ++++++++++++++++++- jobs-server-tokio/Cargo.toml | 2 +- jobs-server-tokio/src/server/mod.rs | 3 +- jobs-server-tokio/src/server/pull.rs | 2 +- jobs-server-tokio/src/server/push.rs | 4 +- jobs-server-tokio/src/server/stalled.rs | 2 +- jobs-server-tokio/src/spawner.rs | 2 +- jobs-server-tokio/src/worker/config.rs | 61 +++++++++---- jobs-server-tokio/src/worker/mod.rs | 2 +- jobs-tokio/Cargo.toml | 2 +- jobs-tokio/src/lib.rs | 4 +- src/lib.rs | 14 +-- 28 files changed, 186 insertions(+), 65 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e0a82ba..6e8eeac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,23 +16,23 @@ members = [ ] [features] -default = ["jobs-actix", "jobs-server-tokio", "jobs-server-tokio/tokio-zmq", "jobs-tokio"] +default = ["background-jobs-actix", "background-jobs-server-tokio", "background-jobs-server-tokio/tokio-zmq", "background-jobs-tokio"] -[dependencies.jobs-actix] +[dependencies.background-jobs-actix] version = "0.1" path = "jobs-actix" optional = true -[dependencies.jobs-core] +[dependencies.background-jobs-core] version = "0.1" path = "jobs-core" -[dependencies.jobs-server-tokio] +[dependencies.background-jobs-server-tokio] version = "0.1" path = "jobs-server-tokio" optional = true -[dependencies.jobs-tokio] +[dependencies.background-jobs-tokio] version = "0.1" path = "jobs-tokio" optional = true diff --git a/TODO b/TODO index bb54e9e..0ca4c53 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,2 @@ 1. Gracefull Shutdown - -2. -Don't mark pushed jobs as running, mark them as 'staged' -Clear staged jobs that are 10 minutes old -Send a Running notification from the worker to move a job from 'staged' to 'running' diff --git a/examples/actix-jobs-example/Cargo.toml b/examples/actix-jobs-example/Cargo.toml index 7ca1edc..51ecb5c 100644 --- a/examples/actix-jobs-example/Cargo.toml +++ b/examples/actix-jobs-example/Cargo.toml @@ -14,8 +14,8 @@ log = "0.4" serde = "1.0" serde_derive = "1.0" -[dependencies.jobs] +[dependencies.background-jobs] version = "0.1" path = "../.." default-features = false -features = ["jobs-actix"] +features = ["background-jobs-actix"] diff --git a/examples/actix-jobs-example/src/main.rs b/examples/actix-jobs-example/src/main.rs index ae0e609..a864b21 100644 --- a/examples/actix-jobs-example/src/main.rs +++ b/examples/actix-jobs-example/src/main.rs @@ -3,9 +3,9 @@ extern crate log; #[macro_use] extern crate serde_derive; +use background_jobs::{Backoff, JobsBuilder, MaxRetries, Processor, QueueJob}; use failure::Error; use futures::{future::IntoFuture, Future}; -use jobs::{Backoff, JobsBuilder, MaxRetries, Processor, QueueJob}; #[derive(Clone, Debug, Deserialize, Serialize)] struct MyJobArguments { @@ -23,6 +23,10 @@ impl Processor for MyProcessor { "MyProcessor" } + fn queue() -> &'static str { + "default" + } + fn max_retries() -> MaxRetries { MaxRetries::Count(1) } diff --git a/examples/server-jobs-example/Cargo.toml b/examples/server-jobs-example/Cargo.toml index 8829764..9d22b54 100644 --- a/examples/server-jobs-example/Cargo.toml +++ b/examples/server-jobs-example/Cargo.toml @@ -14,8 +14,8 @@ serde = "1.0" serde_derive = "1.0" tokio = "0.1" -[dependencies.jobs] +[dependencies.background-jobs] version = "0.1" path = "../.." default-features = false -features = ["jobs-server-tokio"] +features = ["background-jobs-server-tokio"] diff --git a/examples/server-jobs-example/src/bin/server.rs b/examples/server-jobs-example/src/bin/server.rs index edb5fff..544e40f 100644 --- a/examples/server-jobs-example/src/bin/server.rs +++ b/examples/server-jobs-example/src/bin/server.rs @@ -1,5 +1,5 @@ +use background_jobs::ServerConfig; use failure::Error; -use jobs::ServerConfig; use server_jobs_example::queue_set; fn main() -> Result<(), Error> { diff --git a/examples/server-jobs-example/src/bin/spawner.rs b/examples/server-jobs-example/src/bin/spawner.rs index 522002b..5846ed6 100644 --- a/examples/server-jobs-example/src/bin/spawner.rs +++ b/examples/server-jobs-example/src/bin/spawner.rs @@ -1,5 +1,5 @@ +use background_jobs::{Processor, SpawnerConfig}; use futures::{future::lazy, Future}; -use jobs::{Processor, SpawnerConfig}; use server_jobs_example::{MyJobArguments, MyProcessor}; fn main() { diff --git a/examples/server-jobs-example/src/bin/worker.rs b/examples/server-jobs-example/src/bin/worker.rs index 5d2af9b..ab6cfe4 100644 --- a/examples/server-jobs-example/src/bin/worker.rs +++ b/examples/server-jobs-example/src/bin/worker.rs @@ -1,5 +1,5 @@ +use background_jobs::WorkerConfig; use failure::Error; -use jobs::WorkerConfig; use server_jobs_example::{queue_map, MyProcessor}; fn main() -> Result<(), Error> { diff --git a/examples/server-jobs-example/src/lib.rs b/examples/server-jobs-example/src/lib.rs index 2d5ff52..828e397 100644 --- a/examples/server-jobs-example/src/lib.rs +++ b/examples/server-jobs-example/src/lib.rs @@ -5,9 +5,9 @@ extern crate serde_derive; use std::collections::{BTreeMap, BTreeSet}; +use background_jobs::{Backoff, MaxRetries, Processor}; use failure::Error; use futures::{future::IntoFuture, Future}; -use jobs::{Backoff, MaxRetries, Processor}; pub fn queue_map() -> BTreeMap { let mut map = BTreeMap::new(); diff --git a/examples/tokio-jobs-example/Cargo.toml b/examples/tokio-jobs-example/Cargo.toml index 0ab0fa4..8c099f6 100644 --- a/examples/tokio-jobs-example/Cargo.toml +++ b/examples/tokio-jobs-example/Cargo.toml @@ -14,8 +14,8 @@ serde = "1.0" serde_derive = "1.0" tokio = "0.1" -[dependencies.jobs] +[dependencies.background-jobs] version = "0.1" path = "../.." default-features = false -features = ["jobs-tokio"] +features = ["background-jobs-tokio"] diff --git a/examples/tokio-jobs-example/src/main.rs b/examples/tokio-jobs-example/src/main.rs index f784fba..667b86f 100644 --- a/examples/tokio-jobs-example/src/main.rs +++ b/examples/tokio-jobs-example/src/main.rs @@ -5,12 +5,12 @@ extern crate serde_derive; use std::time::Duration; +use background_jobs::{Backoff, JobRunner, MaxRetries, Processor}; use failure::Error; use futures::{ future::{lazy, IntoFuture}, Future, }; -use jobs::{Backoff, JobRunner, MaxRetries, Processor}; #[derive(Clone, Debug, Deserialize, Serialize)] struct MyJobArguments { @@ -28,6 +28,10 @@ impl Processor for MyProcessor { "MyProcessor" } + fn queue() -> &'static str { + "default" + } + fn max_retries() -> MaxRetries { MaxRetries::Count(1) } diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index 28c2fa6..a14b754 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -10,6 +10,6 @@ failure = "0.1" futures = "0.1" log = "0.4" -[dependencies.jobs-core] +[dependencies.background-jobs-core] version = "0.1" path = "../jobs-core" diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index d69c615..5c622c8 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -13,9 +13,9 @@ use actix::{ fut::wrap_future, utils::IntervalFunc, Actor, ActorFuture, ActorStream, Addr, AsyncContext, Context, ContextFutureSpawner, Handler, Message, ResponseFuture, SyncArbiter, SyncContext, }; +use background_jobs_core::{JobInfo, Processor, Processors, Storage}; use failure::Error; use futures::Future; -use jobs_core::{JobInfo, Processor, Processors, Storage}; fn coerce(res: Result, F>) -> Result where @@ -50,7 +50,7 @@ impl KvActor { } pub fn dequeue_jobs(&self, limit: usize, queue: &str) -> Result, Error> { - let jobs = self.storage.dequeue_job(limit, queue)?; + let jobs = self.storage.stage_jobs(limit, queue)?; Ok(jobs) } diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index 4f494ca..2c9a7bf 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "jobs-core" +name = "background-jobs-core" version = "0.1.0" authors = ["asonix "] edition = "2018" diff --git a/jobs-core/src/job_info.rs b/jobs-core/src/job_info.rs index fae35df..4b53a85 100644 --- a/jobs-core/src/job_info.rs +++ b/jobs-core/src/job_info.rs @@ -122,6 +122,14 @@ impl JobInfo { self.queue == queue } + pub(crate) fn stage(&mut self) { + self.status = JobStatus::Staged; + } + + pub fn run(&mut self) { + self.status = JobStatus::Running; + } + pub(crate) fn pending(&mut self) { self.status = JobStatus::Pending; } diff --git a/jobs-core/src/lib.rs b/jobs-core/src/lib.rs index 961a444..775229d 100644 --- a/jobs-core/src/lib.rs +++ b/jobs-core/src/lib.rs @@ -32,6 +32,9 @@ pub enum JobStatus { /// Job should be queued Pending, + /// Job has been dequeued, but is not yet running + Staged, + /// Job is running Running, diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index cbc166b..1557def 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -14,6 +14,7 @@ use crate::{JobInfo, JobStatus}; struct Buckets<'a> { queued: Bucket<'a, &'a [u8], ValueBuf>>, running: Bucket<'a, &'a [u8], ValueBuf>>, + staged: Bucket<'a, &'a [u8], ValueBuf>>, failed: Bucket<'a, &'a [u8], ValueBuf>>, finished: Bucket<'a, &'a [u8], ValueBuf>>, } @@ -23,6 +24,7 @@ impl<'a> Buckets<'a> { let b = Buckets { queued: store.bucket(Some(Storage::job_queue()))?, running: store.bucket(Some(Storage::job_running()))?, + staged: store.bucket(Some(Storage::job_staged()))?, failed: store.bucket(Some(Storage::job_failed()))?, finished: store.bucket(Some(Storage::job_finished()))?, }; @@ -87,6 +89,56 @@ impl Storage { Ok(new_id) } + pub fn requeue_staged_jobs(&self) -> Result<(), Error> { + let store = self.store.write()?; + let job_bucket = + store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_store()))?; + + let lock_bucket = + store.bucket::<&[u8], ValueBuf>>(Some(Storage::job_lock()))?; + + let buckets = Buckets::new(&store)?; + + let mut write_txn = store.write_txn()?; + let read_txn = store.read_txn()?; + + self.with_lock::<_, (), _>(&lock_bucket, &mut write_txn, b"job-queue", |inner_txn| { + let mut cursor = read_txn.read_cursor(&buckets.staged)?; + match cursor.get(None, CursorOp::First) { + Ok(_) => (), + Err(e) => match e { + Error::NotFound => { + return Ok(()); + } + e => { + return Err(e); + } + }, + } + + let initial_value = Ok(inner_txn) as Result<&mut Txn, Error>; + + let _ = cursor.iter().fold(initial_value, |acc, (key, _)| { + acc.and_then(|inner_txn| { + let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); + + let job_value = Json::to_value_buf(job)?; + inner_txn.set(&job_bucket, key, job_value)?; + self.queue_job(&buckets, inner_txn, key)?; + + Ok(inner_txn) + }) + })?; + + Ok(()) + })?; + + read_txn.commit()?; + write_txn.commit()?; + + Ok(()) + } + pub fn check_stalled_jobs(&self) -> Result<(), Error> { let store = self.store.write()?; let job_bucket = @@ -146,7 +198,7 @@ impl Storage { Ok(()) } - pub fn dequeue_job(&self, limit: usize, queue: &str) -> Result, Error> { + pub fn stage_jobs(&self, limit: usize, queue: &str) -> Result, Error> { let store = self.store.write()?; trace!("Got store"); @@ -194,10 +246,12 @@ impl Storage { let (_inner_txn, vec) = cursor.iter().fold(initial_value, |acc, (key, _)| { acc.and_then(|(inner_txn, mut jobs)| { if jobs.len() < limit { - let job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); + let mut job = inner_txn.get(&job_bucket, &key)?.inner()?.to_serde(); + + job.stage(); if job.is_ready(now) && job.is_in_queue(queue) { - self.run_job(&buckets, inner_txn, key)?; + self.stage_job(&buckets, inner_txn, key)?; jobs.push(job); } @@ -263,6 +317,7 @@ impl Storage { match status { JobStatus::Pending => self.queue_job(&buckets, &mut txn, job_id.as_ref())?, JobStatus::Running => self.run_job(&buckets, &mut txn, job_id.as_ref())?, + JobStatus::Staged => self.stage_job(&buckets, &mut txn, job_id.as_ref())?, JobStatus::Failed => self.fail_job(&buckets, &mut txn, job_id.as_ref())?, JobStatus::Finished => self.finish_job(&buckets, &mut txn, job_id.as_ref())?, } @@ -350,6 +405,21 @@ impl Storage { Ok(queue_map) } + fn stage_job<'env>( + &self, + buckets: &'env Buckets<'env>, + txn: &mut Txn<'env>, + id: &[u8], + ) -> Result<(), Error> { + self.add_job_to(&buckets.staged, txn, id)?; + self.delete_job_from(&buckets.finished, txn, id)?; + self.delete_job_from(&buckets.failed, txn, id)?; + self.delete_job_from(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.queued, txn, id)?; + + Ok(()) + } + fn queue_job<'env>( &self, buckets: &'env Buckets<'env>, @@ -360,6 +430,7 @@ impl Storage { self.delete_job_from(&buckets.finished, txn, id)?; self.delete_job_from(&buckets.failed, txn, id)?; self.delete_job_from(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.staged, txn, id)?; Ok(()) } @@ -373,6 +444,7 @@ impl Storage { self.add_job_to(&buckets.failed, txn, id)?; self.delete_job_from(&buckets.finished, txn, id)?; self.delete_job_from(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.staged, txn, id)?; self.delete_job_from(&buckets.queued, txn, id)?; Ok(()) @@ -385,6 +457,7 @@ impl Storage { id: &[u8], ) -> Result<(), Error> { self.add_job_to(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.staged, txn, id)?; self.delete_job_from(&buckets.finished, txn, id)?; self.delete_job_from(&buckets.failed, txn, id)?; self.delete_job_from(&buckets.queued, txn, id)?; @@ -400,6 +473,7 @@ impl Storage { ) -> Result<(), Error> { self.add_job_to(&buckets.finished, txn, id)?; self.delete_job_from(&buckets.running, txn, id)?; + self.delete_job_from(&buckets.staged, txn, id)?; self.delete_job_from(&buckets.failed, txn, id)?; self.delete_job_from(&buckets.queued, txn, id)?; @@ -500,13 +574,14 @@ impl Storage { Ok(item) } - fn buckets() -> [&'static str; 8] { + fn buckets() -> [&'static str; 9] { [ Storage::id_store(), Storage::job_store(), Storage::job_queue(), Storage::job_failed(), Storage::job_running(), + Storage::job_staged(), Storage::job_lock(), Storage::job_finished(), Storage::queue_port(), @@ -533,6 +608,10 @@ impl Storage { "job-running" } + fn job_staged() -> &'static str { + "job-staged" + } + fn job_finished() -> &'static str { "job-finished" } diff --git a/jobs-server-tokio/Cargo.toml b/jobs-server-tokio/Cargo.toml index fa7a05d..7a6a530 100644 --- a/jobs-server-tokio/Cargo.toml +++ b/jobs-server-tokio/Cargo.toml @@ -17,7 +17,7 @@ zmq = "0.8" [features] default = ["tokio-zmq"] -[dependencies.jobs-core] +[dependencies.background-jobs-core] version = "0.1" path = "../jobs-core" diff --git a/jobs-server-tokio/src/server/mod.rs b/jobs-server-tokio/src/server/mod.rs index f259415..e0f3e0e 100644 --- a/jobs-server-tokio/src/server/mod.rs +++ b/jobs-server-tokio/src/server/mod.rs @@ -4,9 +4,9 @@ use std::{ sync::Arc, }; +use background_jobs_core::Storage; use failure::{Error, Fail}; use futures::{future::poll_fn, Future}; -use jobs_core::Storage; use log::{error, info}; use tokio_threadpool::blocking; use zmq::Context; @@ -47,6 +47,7 @@ impl Config { blocking(move || { let storage = Arc::new(Storage::init(runner_id, db_path)?); + storage.requeue_staged_jobs()?; storage.check_stalled_jobs()?; let port_map = storage.get_port_mapping(base_port, queues)?; diff --git a/jobs-server-tokio/src/server/pull.rs b/jobs-server-tokio/src/server/pull.rs index 9add057..7c4eb07 100644 --- a/jobs-server-tokio/src/server/pull.rs +++ b/jobs-server-tokio/src/server/pull.rs @@ -1,10 +1,10 @@ use std::{sync::Arc, time::Duration}; +use background_jobs_core::{JobInfo, Storage}; use failure::{Error, Fail}; use futures::{future::poll_fn, Future, Stream}; #[cfg(feature = "futures-zmq")] use futures_zmq::{prelude::*, Multipart, Pull}; -use jobs_core::{JobInfo, Storage}; use log::{error, info, trace}; use tokio::timer::Delay; use tokio_threadpool::blocking; diff --git a/jobs-server-tokio/src/server/push.rs b/jobs-server-tokio/src/server/push.rs index bbc6e9a..7c4e8f9 100644 --- a/jobs-server-tokio/src/server/push.rs +++ b/jobs-server-tokio/src/server/push.rs @@ -1,10 +1,10 @@ use std::{sync::Arc, time::Duration}; +use background_jobs_core::{JobInfo, Storage}; use failure::Error; use futures::{future::poll_fn, stream::iter_ok, Future, Stream}; #[cfg(feature = "futures-zmq")] use futures_zmq::{prelude::*, Multipart, Push}; -use jobs_core::{JobInfo, Storage}; use log::{error, info}; use tokio::timer::{Delay, Interval}; use tokio_threadpool::blocking; @@ -117,7 +117,7 @@ fn wrap_fetch_queue(storage: Arc, queue: &str) -> Result } fn fetch_queue(storage: Arc, queue: &str) -> Result, Error> { - storage.dequeue_job(100, queue).map_err(Error::from) + storage.stage_jobs(100, queue).map_err(Error::from) } struct ResetPushConfig { diff --git a/jobs-server-tokio/src/server/stalled.rs b/jobs-server-tokio/src/server/stalled.rs index 10587c3..11af649 100644 --- a/jobs-server-tokio/src/server/stalled.rs +++ b/jobs-server-tokio/src/server/stalled.rs @@ -1,8 +1,8 @@ use std::{sync::Arc, time::Duration}; +use background_jobs_core::Storage; use failure::Error; use futures::{future::poll_fn, Future, Stream}; -use jobs_core::Storage; use log::{error, info}; use tokio::timer::{Delay, Interval}; use tokio_threadpool::blocking; diff --git a/jobs-server-tokio/src/spawner.rs b/jobs-server-tokio/src/spawner.rs index e36ce25..8b01e1c 100644 --- a/jobs-server-tokio/src/spawner.rs +++ b/jobs-server-tokio/src/spawner.rs @@ -1,10 +1,10 @@ use std::sync::Arc; +use background_jobs_core::JobInfo; use failure::Error; use futures::{future::IntoFuture, Future}; #[cfg(feature = "futures-zmq")] use futures_zmq::{prelude::*, Push}; -use jobs_core::JobInfo; use log::{debug, trace}; #[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Push}; diff --git a/jobs-server-tokio/src/worker/config.rs b/jobs-server-tokio/src/worker/config.rs index a15c9d6..5c3b362 100644 --- a/jobs-server-tokio/src/worker/config.rs +++ b/jobs-server-tokio/src/worker/config.rs @@ -1,13 +1,13 @@ use std::{sync::Arc, time::Duration}; +use background_jobs_core::{JobInfo, Processors}; use failure::{Error, Fail}; use futures::{ - future::{Either, IntoFuture}, - Future, Stream, + sync::mpsc::{channel, Sender}, + Future, Sink, Stream, }; #[cfg(feature = "futures-zmq")] use futures_zmq::{prelude::*, Multipart, Pull, Push}; -use jobs_core::{JobInfo, Processors}; use log::{error, info}; use tokio::timer::Delay; #[cfg(feature = "tokio-zmq")] @@ -17,6 +17,7 @@ use zmq::{Context, Message}; pub(crate) struct Worker { pull: Pull, push: Push, + push2: Push, push_address: String, pull_address: String, queue: String, @@ -49,6 +50,7 @@ impl Worker { let Worker { push, + push2, pull, push_address: _, pull_address: _, @@ -57,11 +59,25 @@ impl Worker { context: _, } = self; + let (tx, rx) = channel(5); + + tokio::spawn( + rx.map_err(|_| RecvError) + .from_err::() + .and_then(serialize_request) + .forward(push2.sink(1)) + .map(|_| ()) + .or_else(|_| Ok(())), + ); + let fut = pull .stream() .from_err::() - .and_then(move |multipart| wrap_processing(multipart, &processors)) - .forward(push.sink(2)) + .and_then(parse_multipart) + .and_then(move |job| report_running(job, tx.clone())) + .and_then(move |job| process_job(job, &processors)) + .and_then(serialize_request) + .forward(push.sink(1)) .map(move |_| info!("worker for queue {} is shutting down", queue)) .map_err(|e| { error!("Error processing job, {}", e); @@ -105,14 +121,20 @@ impl ResetWorker { Push::builder(self.context.clone()) .connect(&self.push_address) .build() + .join( + Push::builder(self.context.clone()) + .connect(&self.push_address) + .build(), + ) .join( Pull::builder(self.context.clone()) .connect(&self.pull_address) .build(), ) - .map(|(push, pull)| { + .map(|((push, push2), pull)| { let config = Worker { push, + push2, pull, push_address: self.push_address, pull_address: self.pull_address, @@ -142,18 +164,15 @@ fn parse_multipart(mut multipart: Multipart) -> Result { Ok(parsed) } -fn wrap_processing( - multipart: Multipart, - processors: &Processors, -) -> impl Future { - let msg = match parse_multipart(multipart) { - Ok(msg) => msg, - Err(e) => return Either::A(Err(e).into_future()), - }; +fn report_running( + mut job: JobInfo, + push: Sender, +) -> impl Future { + job.run(); - let fut = process_job(msg, processors).and_then(serialize_request); - - Either::B(fut) + push.send(job.clone()) + .map(move |_| job) + .map_err(|_| NotifyError.into()) } fn process_job( @@ -173,3 +192,11 @@ struct ParseError; #[derive(Clone, Debug, Fail)] #[fail(display = "Error processing job")] struct ProcessError; + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error notifying running has started")] +struct NotifyError; + +#[derive(Clone, Debug, Fail)] +#[fail(display = "Error receiving from mpsc")] +struct RecvError; diff --git a/jobs-server-tokio/src/worker/mod.rs b/jobs-server-tokio/src/worker/mod.rs index ccbeca0..4deb4d4 100644 --- a/jobs-server-tokio/src/worker/mod.rs +++ b/jobs-server-tokio/src/worker/mod.rs @@ -1,8 +1,8 @@ use std::{collections::BTreeMap, sync::Arc}; +use background_jobs_core::{Processor, Processors}; use failure::Fail; use futures::Future; -use jobs_core::{Processor, Processors}; use log::{error, info}; use zmq::Context; diff --git a/jobs-tokio/Cargo.toml b/jobs-tokio/Cargo.toml index a12ca41..6ce0ae6 100644 --- a/jobs-tokio/Cargo.toml +++ b/jobs-tokio/Cargo.toml @@ -10,6 +10,6 @@ log = "0.4" tokio = "0.1" tokio-threadpool = "0.1" -[dependencies.jobs-core] +[dependencies.background-jobs-core] version = "0.1" path = "../jobs-core" diff --git a/jobs-tokio/src/lib.rs b/jobs-tokio/src/lib.rs index a869ee7..8504da7 100644 --- a/jobs-tokio/src/lib.rs +++ b/jobs-tokio/src/lib.rs @@ -6,12 +6,12 @@ use std::{ time::{Duration, Instant}, }; +use background_jobs_core::{JobInfo, Processor, Processors, Storage}; use futures::{ future::{poll_fn, Either, IntoFuture}, sync::mpsc::{channel, Receiver, SendError, Sender}, Future, Sink, Stream, }; -use jobs_core::{JobInfo, Processor, Processors, Storage}; use tokio::timer::Interval; use tokio_threadpool::blocking; @@ -86,7 +86,7 @@ fn try_process_job( blocking(move || { storage - .dequeue_job(processor_count, &queue) + .stage_jobs(processor_count, &queue) .map_err(|e| error!("Error dequeuing job, {}", e)) }) .map_err(|e| error!("Error blocking, {}", e)) diff --git a/src/lib.rs b/src/lib.rs index ce0a7ab..e47f700 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,12 @@ -pub use jobs_core::{ +pub use background_jobs_core::{ Backoff, JobError, JobInfo, JobStatus, MaxRetries, Processor, Processors, ShouldStop, Storage, }; -#[cfg(feature = "jobs-tokio")] -pub use jobs_tokio::{JobRunner, ProcessorHandle}; +#[cfg(feature = "background-jobs-tokio")] +pub use background_jobs_tokio::{JobRunner, ProcessorHandle}; -#[cfg(feature = "jobs-actix")] -pub use jobs_actix::{JobsActor, JobsBuilder, QueueJob}; +#[cfg(feature = "background-jobs-actix")] +pub use background_jobs_actix::{JobsActor, JobsBuilder, QueueJob}; -#[cfg(feature = "jobs-server-tokio")] -pub use jobs_server_tokio::{ServerConfig, SpawnerConfig, WorkerConfig}; +#[cfg(feature = "background-jobs-server-tokio")] +pub use background_jobs_server_tokio::{ServerConfig, SpawnerConfig, WorkerConfig};