From dbb814467331770e5425fb30fcb3e13f61af4d61 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 10 Nov 2018 15:58:19 -0600
Subject: [PATCH] Use Push and Pull to transmit jobs. No more req/rep issues

---
 .../server-jobs-example/src/bin/server.rs  |   2 +-
 .../server-jobs-example/src/bin/spawner.rs |   2 +-
 .../server-jobs-example/src/bin/worker.rs  |   8 +-
 jobs-server-tokio/Cargo.toml               |   1 -
 jobs-server-tokio/src/client.rs            | 166 ------------
 jobs-server-tokio/src/lib.rs               |  10 +-
 jobs-server-tokio/src/server.rs            | 251 ++++++++++--------
 jobs-server-tokio/src/spawner.rs           |  19 +-
 jobs-server-tokio/src/worker.rs            | 166 ++++++++++++
 src/lib.rs                                 |   2 +-
 10 files changed, 318 insertions(+), 309 deletions(-)
 delete mode 100644 jobs-server-tokio/src/client.rs
 create mode 100644 jobs-server-tokio/src/worker.rs

diff --git a/examples/server-jobs-example/src/bin/server.rs b/examples/server-jobs-example/src/bin/server.rs
index 89db98b..6d55b1d 100644
--- a/examples/server-jobs-example/src/bin/server.rs
+++ b/examples/server-jobs-example/src/bin/server.rs
@@ -5,7 +5,7 @@ fn main() -> Result<(), Error> {
     dotenv::dotenv().ok();
     env_logger::init();
 
-    let config = ServerConfig::init("127.0.0.1", 5555, 1234, 1, "example-db")?;
+    let config = ServerConfig::init("127.0.0.1", 5555, 5556, 1, "example-db")?;
 
     tokio::run(config.run());
 
diff --git a/examples/server-jobs-example/src/bin/spawner.rs b/examples/server-jobs-example/src/bin/spawner.rs
index 2830efe..64a24bb 100644
--- a/examples/server-jobs-example/src/bin/spawner.rs
+++ b/examples/server-jobs-example/src/bin/spawner.rs
@@ -9,7 +9,7 @@ fn main() {
         (y, x + y, acc)
     });
 
-    let spawner = SpawnerConfig::new("localhost", 5555);
+    let spawner = SpawnerConfig::new("localhost", 5556);
 
     tokio::run(lazy(move || {
         for job in jobs {
diff --git a/examples/server-jobs-example/src/bin/worker.rs b/examples/server-jobs-example/src/bin/worker.rs
index 04cc94b..09ee0b6 100644
--- a/examples/server-jobs-example/src/bin/worker.rs
+++ b/examples/server-jobs-example/src/bin/worker.rs
@@ -1,13 +1,13 @@
 use failure::Error;
-use jobs::ClientConfig;
+use jobs::WorkerConfig;
 use server_jobs_example::MyProcessor;
 
 fn main() -> Result<(), Error> {
-    let mut client = ClientConfig::init(16, "localhost", 5555)?;
+    let mut worker = WorkerConfig::init(16, "localhost", 5555, 5556)?;
 
-    client.register_processor(MyProcessor);
+    worker.register_processor(MyProcessor);
 
-    tokio::run(client.run());
+    tokio::run(worker.run());
 
     Ok(())
 }
diff --git a/jobs-server-tokio/Cargo.toml b/jobs-server-tokio/Cargo.toml
index 0c14e93..9c6f40f 100644
--- a/jobs-server-tokio/Cargo.toml
+++ b/jobs-server-tokio/Cargo.toml
@@ -9,7 +9,6 @@ failure = "0.1"
 futures = "0.1"
 log = "0.4"
 serde = "1.0"
-serde_derive = "1.0"
 serde_json = "1.0"
 tokio = "0.1"
 tokio-threadpool = "0.1"
diff --git a/jobs-server-tokio/src/client.rs b/jobs-server-tokio/src/client.rs
deleted file mode 100644
index d4d71b2..0000000
--- a/jobs-server-tokio/src/client.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-use std::{sync::Arc, time::Duration};
-
-use failure::Error;
-use futures::{
-    future::{lazy, Either, IntoFuture},
-    Future, Stream,
-};
-use jobs_core::{Processor, Processors};
-use tokio::timer::Delay;
-use tokio_zmq::{prelude::*, Multipart, Req};
-use zmq::{Context, Message};
-
-use crate::{ServerRequest, ServerResponse};
-
-pub struct ClientConfig {
-    processors: Vec<Processors>,
-    clients: Vec<Req>,
-}
-
-impl ClientConfig {
-    pub fn init(
-        num_processors: usize,
-        server_host: &str,
-        server_port: usize,
-    ) -> Result<Self, Error> {
-        let ctx = Arc::new(Context::new());
-
-        let mut clients = Vec::new();
-
-        let processors = (0..num_processors).map(|_| Processors::new()).collect();
-
-        for _ in 0..num_processors {
-            clients.push(
-                Req::builder(ctx.clone())
-                    .connect(&format!("tcp://{}:{}", server_host, server_port))
-                    .build()?,
-            );
-        }
-
-        let cfg = ClientConfig {
-            processors,
-            clients,
-        };
-
-        Ok(cfg)
-    }
-
-    pub fn register_processor<P>(&mut self, processor: P)
-    where
-        P: Processor + Send + Sync + 'static,
-    {
-        for processors in self.processors.iter_mut() {
-            processors.register_processor(processor.clone());
-        }
-    }
-
-    pub fn run(self) -> impl Future<Item = (), Error = ()> {
-        let ClientConfig {
-            processors,
-            clients,
-        } = self;
-
-        lazy(|| {
-            for (client, processors) in clients.into_iter().zip(processors) {
-                tokio::spawn(client_future(client, processors));
-            }
-
-            Ok(())
-        })
-    }
-}
-
-fn client_future(req: Req, processors: Processors) -> impl Future<Item = (), Error = ()> {
-    request_one_job()
-        .into_future()
-        .and_then(|multipart| req.send(multipart).from_err())
-        .and_then(|req| {
-            let (sink, stream) = req.sink_stream().split();
-
-            stream
-                .from_err()
-                .and_then(move |multipart| wrap_response(multipart, &processors))
-                .forward(sink)
-        })
-        .map_err(|e| error!("Error in client, {}", e))
-        .map(|_| ())
-}
-
-fn request_one_job() -> Result<Multipart, Error> {
-    serialize_request(ServerRequest::FetchJobs(1))
-}
-
-fn serialize_request(request: ServerRequest) -> Result<Multipart, Error> {
-    let request = serde_json::to_string(&request)?;
-    let msg = Message::from_slice(request.as_ref())?;
-
-    Ok(msg.into())
-}
-
-fn parse_multipart(mut multipart: Multipart) -> Result<ServerResponse, Error> {
-    let message = multipart.pop_front().ok_or(ParseError)?;
-
-    let parsed = serde_json::from_slice(&message)?;
-
-    Ok(parsed)
-}
-
-fn wrap_response(
-    multipart: Multipart,
-    processors: &Processors,
-) -> impl Future<Item = Multipart, Error = Error> {
-    let default_request = Either::A(request_one_job().into_future());
-
-    let msg = match parse_multipart(multipart) {
-        Ok(msg) => msg,
-        Err(e) => {
-            error!("Error parsing response, {}", e);
-            return default_request;
-        }
-    };
-
-    let fut = process_response(msg, processors).then(move |res| match res {
-        Ok(request) => serialize_request(request),
-        Err(e) => {
-            error!("Error processing response, {}", e);
-            request_one_job()
-        }
-    });
-
-    Either::B(fut)
-}
-
-fn process_response(
-    response: ServerResponse,
-    processors: &Processors,
-) -> impl Future<Item = ServerRequest, Error = Error> {
-    let either_a = Either::A(
-        Delay::new(tokio::clock::now() + Duration::from_millis(500))
-            .from_err()
-            .and_then(|_| Ok(ServerRequest::FetchJobs(1))),
-    );
-
-    match response {
-        ServerResponse::FetchJobs(jobs) => {
-            let job = match jobs.into_iter().next() {
-                Some(job) => job,
-                None => return either_a,
-            };
-
-            let fut = processors
-                .process_job(job)
-                .map(ServerRequest::ReturnJob)
-                .or_else(|_| Ok(ServerRequest::FetchJobs(1)));
-
-            Either::B(fut)
-        }
-        e => {
-            error!("Error from server, {:?}", e);
-            return either_a;
-        }
-    }
-}
-
-#[derive(Clone, Debug, Fail)]
-#[fail(display = "Error parsing response")]
-struct ParseError;
diff --git a/jobs-server-tokio/src/lib.rs b/jobs-server-tokio/src/lib.rs
index 869edbf..3f8bc2a 100644
--- a/jobs-server-tokio/src/lib.rs
+++ b/jobs-server-tokio/src/lib.rs
@@ -2,20 +2,14 @@
 extern crate failure;
 #[macro_use]
 extern crate log;
-#[macro_use]
-extern crate serde_derive;
 
 use failure::Error;
 
-mod client;
 mod server;
 mod spawner;
+mod worker;
 
-pub use crate::{
-    client::ClientConfig,
-    server::{ServerConfig, ServerRequest, ServerResponse},
-    spawner::SpawnerConfig,
-};
+pub use crate::{server::ServerConfig, spawner::SpawnerConfig, worker::WorkerConfig};
 
 fn coerce<T, F>(res: Result<Result<T, Error>, F>) -> Result<T, Error>
 where
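Note: the server code in the next diff funnels every storage call through tokio-threadpool's `blocking`, then uses the `coerce` helper above to flatten the doubled error layer (`blocking` wraps the closure's own `Result` in a second `Result` whose error is the threadpool's refusal). A minimal sketch of that pattern, assuming only `failure`, `futures` 0.1, and `tokio-threadpool`; the `run_blocking` name is illustrative and not part of this patch:

    use failure::Error;
    use futures::{future::poll_fn, Future};
    use tokio_threadpool::blocking;

    // Illustrative helper: run a blocking storage operation on the
    // threadpool, merging the threadpool's BlockingError into
    // failure::Error the same way `coerce` does.
    fn run_blocking<T, F>(f: F) -> impl Future<Item = T, Error = Error>
    where
        F: Fn() -> Result<T, Error>,
    {
        poll_fn(move || blocking(|| f())).then(|res| match res {
            // Outer Ok: the closure ran; the inner Result is the real outcome.
            Ok(inner) => inner,
            // Outer Err: the threadpool could not take blocking work.
            Err(e) => Err(Error::from(e)),
        })
    }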
diff --git a/jobs-server-tokio/src/server.rs b/jobs-server-tokio/src/server.rs
index 7ccc5a3..1ad7618 100644
--- a/jobs-server-tokio/src/server.rs
+++ b/jobs-server-tokio/src/server.rs
@@ -1,169 +1,192 @@
-use std::{path::Path, sync::Arc};
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+    time::Duration,
+};
 
 use failure::Error;
 use futures::{
     future::{lazy, poll_fn},
+    stream::iter_ok,
     Future, Stream,
 };
 use jobs_core::{JobInfo, Storage};
+use tokio::timer::Interval;
 use tokio_threadpool::blocking;
-use tokio_zmq::{prelude::*, Dealer, Multipart, Rep, Router};
+use tokio_zmq::{prelude::*, Multipart, Pull, Push};
 use zmq::{Context, Message};
 
 use crate::coerce;
 
-/// Messages from the client to the server
-#[derive(Clone, Debug, Deserialize, Serialize)]
-pub enum ServerRequest {
-    /// Request a number of jobs from the server
-    FetchJobs(usize),
-
-    /// Return a processed job to the server
-    ReturnJob(JobInfo),
+#[derive(Clone)]
+struct Config {
+    ip: String,
+    job_port: usize,
+    queue_port: usize,
+    runner_id: usize,
+    db_path: PathBuf,
+    context: Arc<Context>,
 }
 
-/// How the server responds to the client
-#[derive(Clone, Debug, Deserialize, Serialize)]
-pub enum ServerResponse {
-    /// Send a list of jobs to the client
-    FetchJobs(Vec<JobInfo>),
+impl Config {
+    fn create_server(&self) -> Result<ServerConfig, Error> {
+        let pusher = Push::builder(self.context.clone())
+            .bind(&format!("tcp://{}:{}", self.ip, self.job_port))
+            .build()?;
 
-    /// Send an OK to the client after a job is returned
-    JobReturned,
+        let puller = Pull::builder(self.context.clone())
+            .bind(&format!("tcp://{}:{}", self.ip, self.queue_port))
+            .build()?;
 
-    /// Could not parse the client's message
-    Unparsable,
+        let storage = Storage::init(self.runner_id, self.db_path.clone())?;
 
-    /// Server experienced error
-    InternalServerError,
+        let server = ServerConfig {
+            pusher,
+            puller,
+            storage,
+            config: self.clone(),
+        };
+
+        Ok(server)
+    }
 }
 
 pub struct ServerConfig {
-    servers: Vec<Rep>,
-    dealer: Dealer,
-    router: Router,
+    pusher: Push,
+    puller: Pull,
     storage: Storage,
+    // TODO: Recover from failure
+    #[allow(dead_code)]
+    config: Config,
 }
 
 impl ServerConfig {
     pub fn init<P: AsRef<Path>>(
         ip: &str,
-        port: usize,
+        job_port: usize,
+        queue_port: usize,
         runner_id: usize,
-        server_count: usize,
         db_path: P,
     ) -> Result<Self, Error> {
         let context = Arc::new(Context::new());
 
-        let inproc_name = "inproc://jobs-server-tokio";
+        Self::init_with_context(ip, job_port, queue_port, runner_id, db_path, context)
+    }
 
-        let dealer = Dealer::builder(context.clone()).bind(inproc_name).build()?;
-
-        let router = Router::builder(context.clone())
-            .bind(&format!("tcp://{}:{}", ip, port))
-            .build()?;
-
-        let mut servers = Vec::new();
-
-        for _ in 0..server_count {
-            servers.push(Rep::builder(context.clone()).connect(inproc_name).build()?);
-        }
-
-        let storage = Storage::init(runner_id, db_path.as_ref().to_owned())?;
-
-        let cfg = ServerConfig {
-            servers,
-            dealer,
-            router,
-            storage,
+    pub fn init_with_context<P: AsRef<Path>>(
+        ip: &str,
+        job_port: usize,
+        queue_port: usize,
+        runner_id: usize,
+        db_path: P,
+        context: Arc<Context>,
+    ) -> Result<Self, Error> {
+        let config = Config {
+            ip: ip.to_owned(),
+            job_port,
+            queue_port,
+            runner_id,
+            db_path: db_path.as_ref().to_owned(),
+            context,
         };
 
-        Ok(cfg)
+        config.create_server()
     }
 
     pub fn run(self) -> impl Future<Item = (), Error = ()> {
         lazy(|| {
             let ServerConfig {
-                servers,
-                dealer,
-                router,
+                pusher,
+                puller,
                 storage,
+                config: _,
             } = self;
 
-            for server in servers {
-                let (sink, stream) = server.sink_stream().split();
-                let storage = storage.clone();
+            let storage2 = storage.clone();
 
-                let fut = stream
-                    .from_err()
-                    .and_then(move |multipart| {
-                        let storage = storage.clone();
-                        let res = parse_multipart(multipart);
+            let fut = Interval::new(tokio::clock::now(), Duration::from_millis(250))
+                .from_err()
+                .and_then(move |_| dequeue_jobs(storage.clone()))
+                .flatten()
+                .fold(pusher, move |pusher, multipart| {
+                    Box::new(push_job(pusher, multipart))
+                });
 
-                        poll_fn(move || {
-                            let res = res.clone();
-                            let storage = storage.clone();
-                            blocking(move || wrap_request(res, storage))
-                        })
-                        .then(coerce)
-                    })
-                    .forward(sink);
+            tokio::spawn(
+                fut.map(|_| ())
+                    .map_err(move |e| error!("Error in server, {}", e)),
+            );
 
-                tokio::spawn(
-                    fut.map(|_| ())
-                        .map_err(|e| error!("Error in server, {}", e)),
-                );
-            }
-
-            let (deal_sink, deal_stream) = dealer.sink_stream().split();
-            let (rout_sink, rout_stream) = router.sink_stream().split();
-
-            deal_stream
-                .forward(rout_sink)
-                .join(rout_stream.forward(deal_sink))
-                .map_err(|e| error!("Error in broker, {}", e))
-                .map(|_| ())
+            puller
+                .stream()
+                .from_err()
+                .and_then(parse_job)
+                .and_then(move |job| store_job(job, storage2.clone()))
+                .or_else(|e| Ok(error!("Error storing job, {}", e)))
+                .for_each(|_| Ok(()))
         })
     }
 }
 
-fn wrap_request(
-    res: Result<ServerRequest, ServerResponse>,
+fn dequeue_jobs(
     storage: Storage,
-) -> Result<Multipart, Error> {
-    let res = res.map(move |msg| process_request(msg, storage));
-
-    let response = match res {
-        Ok(response) => response,
-        Err(response) => response,
-    };
-
-    Ok(Message::from_slice(serde_json::to_string(&response)?.as_ref())?.into())
+) -> impl Future<Item = impl Stream<Item = Multipart, Error = Error>, Error = Error> {
+    poll_fn(move || {
+        let storage = storage.clone();
+        blocking(move || wrap_fetch_queue(storage))
+    })
+    .then(coerce)
+    .map(|jobs| iter_ok(jobs))
+    .or_else(|e| {
+        error!("Error fetching jobs, {}", e);
+        Ok(iter_ok(vec![]))
+    })
 }
 
-fn parse_multipart(mut multipart: Multipart) -> Result<ServerRequest, ServerResponse> {
-    let unparsed_msg = match multipart.pop_front() {
-        Some(msg) => msg,
-        None => return Err(ServerResponse::Unparsable),
-    };
-
-    match serde_json::from_slice(&unparsed_msg) {
-        Ok(msg) => Ok(msg),
-        Err(_) => Err(ServerResponse::Unparsable),
-    }
+fn push_job(pusher: Push, message: Multipart) -> impl Future<Item = Push, Error = Error> {
+    pusher.send(message).map_err(Error::from)
 }
 
-fn process_request(request: ServerRequest, storage: Storage) -> ServerResponse {
-    match request {
-        ServerRequest::FetchJobs(limit) => storage
-            .dequeue_job(limit)
-            .map(ServerResponse::FetchJobs)
-            .map_err(|e| error!("Error fetching jobs, {}", e))
-            .unwrap_or(ServerResponse::InternalServerError),
-        ServerRequest::ReturnJob(job) => storage
-            .store_job(job)
-            .map(|_| ServerResponse::JobReturned)
-            .map_err(|e| error!("Error returning job, {}", e))
-            .unwrap_or(ServerResponse::InternalServerError),
-    }
+fn store_job(job: JobInfo, storage: Storage) -> impl Future<Item = (), Error = Error> {
+    let storage = storage.clone();
+
+    poll_fn(move || {
+        let job = job.clone();
+        let storage = storage.clone();
+
+        blocking(move || storage.store_job(job).map_err(Error::from)).map_err(Error::from)
+    })
+    .then(coerce)
 }
+
+fn wrap_fetch_queue(storage: Storage) -> Result<Vec<Multipart>, Error> {
+    let response = fetch_queue(storage)?;
+
+    let jobs = response
+        .into_iter()
+        .map(|job| {
+            serde_json::to_string(&job)
+                .map_err(Error::from)
+                .and_then(|json| Message::from_slice(json.as_ref()).map_err(Error::from))
+                .map(Multipart::from)
+        })
+        .collect::<Result<Vec<Multipart>, Error>>()?;
+
+    Ok(jobs)
+}
+
+fn fetch_queue(storage: Storage) -> Result<Vec<JobInfo>, Error> {
+    storage.dequeue_job(100).map_err(Error::from)
+}
+
+fn parse_job(mut multipart: Multipart) -> Result<JobInfo, Error> {
+    let unparsed_msg = multipart.pop_front().ok_or(EmptyMessage)?;
+
+    let parsed = serde_json::from_slice(&unparsed_msg)?;
+
+    Ok(parsed)
+}
+
+#[derive(Clone, Debug, Fail)]
+#[fail(display = "Message was empty")]
+pub struct EmptyMessage;
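Note: after the diff above, the server binds a PUSH socket on `job_port` to fan jobs out to workers and a PULL socket on `queue_port` to collect new and returned jobs; workers and spawners (next diffs) connect the mirror-image sockets. Unlike the old REQ/REP pair, neither side has to alternate send/recv in lockstep, so a slow or crashed peer no longer wedges the conversation. A minimal wiring sketch using the same tokio-zmq builders exercised above; host and ports are illustrative:

    use std::sync::Arc;

    use failure::Error;
    use tokio_zmq::{prelude::*, Pull, Push};
    use zmq::Context;

    fn wire(ctx: Arc<Context>) -> Result<(), Error> {
        // Server side: bind both ends.
        let _jobs_out = Push::builder(ctx.clone())
            .bind("tcp://127.0.0.1:5555") // workers pull jobs from here
            .build()?;
        let _queue_in = Pull::builder(ctx.clone())
            .bind("tcp://127.0.0.1:5556") // spawners and workers push jobs here
            .build()?;

        // Worker side: connect the mirror image.
        let _jobs_in = Pull::builder(ctx.clone())
            .connect("tcp://127.0.0.1:5555")
            .build()?;
        let _queue_out = Push::builder(ctx)
            .connect("tcp://127.0.0.1:5556")
            .build()?;

        Ok(())
    }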
+++ b/jobs-server-tokio/src/spawner.rs
@@ -3,28 +3,26 @@ use std::sync::Arc;
 use failure::Error;
 use futures::{future::IntoFuture, Future};
 use jobs_core::JobInfo;
-use tokio_zmq::{prelude::*, Req};
+use tokio_zmq::{prelude::*, Push};
 use zmq::{Context, Message};
 
-use crate::ServerRequest;
-
 pub struct SpawnerConfig {
     server: String,
     ctx: Arc<Context>,
 }
 
 impl SpawnerConfig {
-    pub fn new(server_host: &str, server_port: usize) -> Self {
+    pub fn new(server_host: &str, queue_port: usize) -> Self {
         let ctx = Arc::new(Context::new());
 
         SpawnerConfig {
-            server: format!("tcp://{}:{}", server_host, server_port),
+            server: format!("tcp://{}:{}", server_host, queue_port),
             ctx,
         }
     }
 
     pub fn queue(&self, job: JobInfo) -> impl Future<Item = (), Error = Error> {
-        let msg = serde_json::to_string(&ServerRequest::ReturnJob(job))
+        let msg = serde_json::to_string(&job)
             .map_err(Error::from)
             .and_then(|s| {
                 Message::from_slice(s.as_ref())
@@ -33,17 +31,12 @@ impl SpawnerConfig {
             })
             .into_future();
 
-        Req::builder(self.ctx.clone())
+        Push::builder(self.ctx.clone())
             .connect(&self.server)
             .build()
             .into_future()
             .from_err()
             .join(msg)
-            .and_then(move |(req, msg)| {
-                req.send(msg)
-                    .from_err()
-                    .and_then(|req| req.recv().from_err())
-                    .map(|_| ())
-            })
+            .and_then(move |(req, msg)| req.send(msg).from_err().map(|_| ()))
     }
 }
diff --git a/jobs-server-tokio/src/worker.rs b/jobs-server-tokio/src/worker.rs
new file mode 100644
index 0000000..9551f84
--- /dev/null
+++ b/jobs-server-tokio/src/worker.rs
@@ -0,0 +1,166 @@
+use std::sync::Arc;
+
+use failure::Error;
+use futures::{
+    future::{lazy, Either, IntoFuture},
+    Future, Stream,
+};
+use jobs_core::{JobInfo, Processor, Processors};
+use tokio_zmq::{prelude::*, Multipart, Pull, Push};
+use zmq::{Context, Message};
+
+struct Worker {
+    processors: Processors,
+    pull: Pull,
+    push: Push,
+}
+
+impl Worker {
+    pub fn init(
+        server_host: &str,
+        job_port: usize,
+        queue_port: usize,
+        ctx: Arc<Context>,
+    ) -> Result<Self, Error> {
+        let pull = Pull::builder(ctx.clone())
+            .connect(&format!("tcp://{}:{}", server_host, job_port))
+            .build()?;
+
+        let push = Push::builder(ctx.clone())
+            .connect(&format!("tcp://{}:{}", server_host, queue_port))
+            .build()?;
+
+        let processors = Processors::new();
+
+        let worker = Worker {
+            processors,
+            push,
+            pull,
+        };
+
+        Ok(worker)
+    }
+
+    fn register_processor<P>(&mut self, processor: P)
+    where
+        P: Processor + Send + Sync + 'static,
+    {
+        self.processors.register_processor(processor);
+    }
+}
+
+pub struct WorkerConfig {
+    workers: Vec<Worker>,
+}
+
+impl WorkerConfig {
+    pub fn init(
+        num_processors: usize,
+        server_host: &str,
+        job_port: usize,
+        queue_port: usize,
+    ) -> Result<Self, Error> {
+        let ctx = Arc::new(Context::new());
+
+        let mut workers = Vec::new();
+
+        for _ in 0..num_processors {
+            let worker = Worker::init(server_host, job_port, queue_port, ctx.clone())?;
+
+            workers.push(worker);
+        }
+
+        let cfg = WorkerConfig { workers };
+
+        Ok(cfg)
+    }
+
+    pub fn register_processor<P>(&mut self, processor: P)
+    where
+        P: Processor + Send + Sync + 'static,
+    {
+        for worker in self.workers.iter_mut() {
+            worker.register_processor(processor.clone());
+        }
+    }
+
+    pub fn run(self) -> impl Future<Item = (), Error = ()> {
+        let WorkerConfig { workers } = self;
+
+        lazy(|| {
+            for worker in workers.into_iter() {
+                tokio::spawn(worker_future(worker));
+            }
+
+            Ok(())
+        })
+    }
+}
+
+fn worker_future(worker: Worker) -> impl Future<Item = (), Error = ()> {
+    let Worker {
+        push,
+        pull,
+        processors,
+    } = worker;
+
+    pull.stream()
+        .from_err()
+        .and_then(move |multipart| wrap_processing(multipart, &processors))
+        .map(Some)
+        .or_else(|e| {
+            error!("Error processing job, {}", e);
+            Ok(None)
+        })
+        .filter_map(|item| item)
+        .forward(push.sink())
+        .map_err(|e: Error| error!("Error pushing job, {}", e))
+        .map(|_| ())
+}
+
+fn serialize_request(job: JobInfo) -> Result<Multipart, Error> {
+    let request = serde_json::to_string(&job)?;
+    let msg = Message::from_slice(request.as_ref())?;
+
+    Ok(msg.into())
+}
+
+fn parse_multipart(mut multipart: Multipart) -> Result<JobInfo, Error> {
+    let message = multipart.pop_front().ok_or(ParseError)?;
+
+    let parsed = serde_json::from_slice(&message)?;
+
+    Ok(parsed)
+}
+
+fn wrap_processing(
+    multipart: Multipart,
+    processors: &Processors,
+) -> impl Future<Item = Multipart, Error = Error> {
+    let msg = match parse_multipart(multipart) {
+        Ok(msg) => msg,
+        Err(e) => return Either::A(Err(e).into_future()),
+    };
+
+    let fut = process_job(msg, processors).and_then(serialize_request);
+
+    Either::B(fut)
+}
+
+fn process_job(
+    job: JobInfo,
+    processors: &Processors,
+) -> impl Future<Item = JobInfo, Error = Error> {
+    processors
+        .process_job(job.clone())
+        .map_err(|_| ProcessError)
+        .from_err()
+}
+
+#[derive(Clone, Debug, Fail)]
+#[fail(display = "Error parsing job")]
+struct ParseError;
+
+#[derive(Clone, Debug, Fail)]
+#[fail(display = "Error processing job")]
+struct ProcessError;
diff --git a/src/lib.rs b/src/lib.rs
index 279fd40..ce0a7ab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,4 +9,4 @@ pub use jobs_tokio::{JobRunner, ProcessorHandle};
 pub use jobs_actix::{JobsActor, JobsBuilder, QueueJob};
 
 #[cfg(feature = "jobs-server-tokio")]
-pub use jobs_server_tokio::{ClientConfig, ServerConfig, SpawnerConfig};
+pub use jobs_server_tokio::{ServerConfig, SpawnerConfig, WorkerConfig};
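Note: because `init_with_context` now exposes the zmq `Context`, a server and a worker pool can also share one process. A sketch of that setup, assuming the crate layout above; `MyProcessor` would come from the example crate, and the ports and db path are illustrative:

    use std::sync::Arc;

    use failure::Error;
    use futures::Future;
    use jobs_server_tokio::{ServerConfig, WorkerConfig};
    use zmq::Context;

    fn main() -> Result<(), Error> {
        let context = Arc::new(Context::new());

        // Bind the job (PUSH) and queue (PULL) ports on a shared context.
        let server =
            ServerConfig::init_with_context("127.0.0.1", 5555, 5556, 1, "example-db", context)?;

        // Workers connect back over the same two ports.
        let mut workers = WorkerConfig::init(4, "localhost", 5555, 5556)?;
        // Processors get registered here, e.g.:
        // workers.register_processor(MyProcessor);

        // Drive both halves on one runtime.
        tokio::run(server.run().join(workers.run()).map(|_| ()));

        Ok(())
    }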