From 960c0235c4fe1ba228b0f59bd44a82c874e67b00 Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 16 Nov 2018 18:09:53 -0600 Subject: [PATCH] Update tokio-zmq, optionally use futures-zmq instead --- Cargo.toml | 2 +- examples/server-jobs-example/.env | 2 +- jobs-server-tokio/Cargo.toml | 12 +++++- jobs-server-tokio/src/lib.rs | 5 --- jobs-server-tokio/src/server/mod.rs | 3 +- jobs-server-tokio/src/server/portmap.rs | 34 +++++++++-------- jobs-server-tokio/src/server/pull.rs | 39 +++++++++---------- jobs-server-tokio/src/server/push.rs | 42 ++++++++++---------- jobs-server-tokio/src/server/stalled.rs | 1 + jobs-server-tokio/src/spawner.rs | 4 ++ jobs-server-tokio/src/worker/config.rs | 51 +++++++++++++------------ jobs-server-tokio/src/worker/mod.rs | 2 + jobs-server-tokio/src/worker/portmap.rs | 38 +++++++++--------- 13 files changed, 126 insertions(+), 109 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3a152ff..d309766 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ members = [ ] [features] -default = ["jobs-actix", "jobs-server-tokio", "jobs-tokio"] +default = ["jobs-actix", "jobs-server-tokio", "jobs-server-tokio/tokio-zmq", "jobs-tokio"] [dependencies.jobs-actix] version = "0.1" diff --git a/examples/server-jobs-example/.env b/examples/server-jobs-example/.env index a6ece83..ee391cb 100644 --- a/examples/server-jobs-example/.env +++ b/examples/server-jobs-example/.env @@ -1 +1 @@ -RUST_LOG=info +RUST_LOG=server_jobs_example=info diff --git a/jobs-server-tokio/Cargo.toml b/jobs-server-tokio/Cargo.toml index 1b25ad9..99b7887 100644 --- a/jobs-server-tokio/Cargo.toml +++ b/jobs-server-tokio/Cargo.toml @@ -12,9 +12,19 @@ serde = "1.0" serde_json = "1.0" tokio = "0.1" tokio-threadpool = "0.1" -tokio-zmq = "0.6.1" zmq = "0.8" +[features] +default = ["tokio-zmq"] + [dependencies.jobs-core] version = "0.1" path = "../jobs-core" + +[dependencies.tokio-zmq] +version = "0.8" +optional = true + +[dependencies.futures-zmq] +version = "0.3" +optional = true diff --git a/jobs-server-tokio/src/lib.rs b/jobs-server-tokio/src/lib.rs index 3f8bc2a..f7664f9 100644 --- a/jobs-server-tokio/src/lib.rs +++ b/jobs-server-tokio/src/lib.rs @@ -1,8 +1,3 @@ -#[macro_use] -extern crate failure; -#[macro_use] -extern crate log; - use failure::Error; mod server; diff --git a/jobs-server-tokio/src/server/mod.rs b/jobs-server-tokio/src/server/mod.rs index 3975ffc..f259415 100644 --- a/jobs-server-tokio/src/server/mod.rs +++ b/jobs-server-tokio/src/server/mod.rs @@ -4,9 +4,10 @@ use std::{ sync::Arc, }; -use failure::Error; +use failure::{Error, Fail}; use futures::{future::poll_fn, Future}; use jobs_core::Storage; +use log::{error, info}; use tokio_threadpool::blocking; use zmq::Context; diff --git a/jobs-server-tokio/src/server/portmap.rs b/jobs-server-tokio/src/server/portmap.rs index 0bde1f6..bd4845a 100644 --- a/jobs-server-tokio/src/server/portmap.rs +++ b/jobs-server-tokio/src/server/portmap.rs @@ -1,8 +1,12 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use failure::Error; -use futures::{future::lazy, Future, Stream}; +use futures::{Future, Stream}; +#[cfg(feature = "futures-zmq")] +use futures_zmq::{prelude::*, Multipart, Rep}; +use log::{error, info}; use tokio::timer::Delay; +#[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Multipart, Rep}; use zmq::Message; @@ -85,21 +89,19 @@ impl ResetPortMapConfig { } fn build(self) -> impl Future { - lazy(|| { - let rep = Rep::builder(self.config.context.clone()) - .bind(&self.address) - .build()?; + Rep::builder(self.config.context.clone()) + .bind(&self.address) + .build() + .map(|rep| { + let config = PortMapConfig { + rep, + address: self.address, + port_map: self.port_map, + config: self.config, + }; - let config = PortMapConfig { - rep, - address: self.address, - port_map: self.port_map, - config: self.config, - }; - - tokio::spawn(config.run()); - - Ok(()) - }) + tokio::spawn(config.run()); + }) + .from_err() } } diff --git a/jobs-server-tokio/src/server/pull.rs b/jobs-server-tokio/src/server/pull.rs index ca3a47c..9add057 100644 --- a/jobs-server-tokio/src/server/pull.rs +++ b/jobs-server-tokio/src/server/pull.rs @@ -1,13 +1,14 @@ use std::{sync::Arc, time::Duration}; -use failure::Error; -use futures::{ - future::{lazy, poll_fn}, - Future, Stream, -}; +use failure::{Error, Fail}; +use futures::{future::poll_fn, Future, Stream}; +#[cfg(feature = "futures-zmq")] +use futures_zmq::{prelude::*, Multipart, Pull}; use jobs_core::{JobInfo, Storage}; +use log::{error, info, trace}; use tokio::timer::Delay; use tokio_threadpool::blocking; +#[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Multipart, Pull}; use crate::server::{coerce, Config}; @@ -112,21 +113,19 @@ impl ResetPullConfig { } fn build(self) -> impl Future { - lazy(|| { - let puller = Pull::builder(self.config.context.clone()) - .bind(&self.address) - .build()?; + Pull::builder(self.config.context.clone()) + .bind(&self.address) + .build() + .map(|puller| { + let config = PullConfig { + puller, + address: self.address, + storage: self.storage, + config: self.config, + }; - let config = PullConfig { - puller, - address: self.address, - storage: self.storage, - config: self.config, - }; - - tokio::spawn(config.run()); - - Ok(()) - }) + tokio::spawn(config.run()); + }) + .from_err() } } diff --git a/jobs-server-tokio/src/server/push.rs b/jobs-server-tokio/src/server/push.rs index 79f8d37..bbc6e9a 100644 --- a/jobs-server-tokio/src/server/push.rs +++ b/jobs-server-tokio/src/server/push.rs @@ -1,14 +1,14 @@ use std::{sync::Arc, time::Duration}; use failure::Error; -use futures::{ - future::{lazy, poll_fn}, - stream::iter_ok, - Future, Stream, -}; +use futures::{future::poll_fn, stream::iter_ok, Future, Stream}; +#[cfg(feature = "futures-zmq")] +use futures_zmq::{prelude::*, Multipart, Push}; use jobs_core::{JobInfo, Storage}; +use log::{error, info}; use tokio::timer::{Delay, Interval}; use tokio_threadpool::blocking; +#[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Multipart, Push}; use zmq::Message; @@ -136,23 +136,21 @@ impl ResetPushConfig { } fn build(self) -> impl Future { - lazy(|| { - info!("Building and spawning new server"); - let pusher = Push::builder(self.config.context.clone()) - .bind(&self.address) - .build()?; + info!("Building and spawning new server"); + Push::builder(self.config.context.clone()) + .bind(&self.address) + .build() + .map(|pusher| { + let config = PushConfig { + pusher, + address: self.address, + queue: self.queue, + storage: self.storage, + config: self.config, + }; - let config = PushConfig { - pusher, - address: self.address, - queue: self.queue, - storage: self.storage, - config: self.config, - }; - - tokio::spawn(config.run()); - - Ok(()) - }) + tokio::spawn(config.run()); + }) + .from_err() } } diff --git a/jobs-server-tokio/src/server/stalled.rs b/jobs-server-tokio/src/server/stalled.rs index adad3c7..10587c3 100644 --- a/jobs-server-tokio/src/server/stalled.rs +++ b/jobs-server-tokio/src/server/stalled.rs @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration}; use failure::Error; use futures::{future::poll_fn, Future, Stream}; use jobs_core::Storage; +use log::{error, info}; use tokio::timer::{Delay, Interval}; use tokio_threadpool::blocking; diff --git a/jobs-server-tokio/src/spawner.rs b/jobs-server-tokio/src/spawner.rs index 9893f96..e36ce25 100644 --- a/jobs-server-tokio/src/spawner.rs +++ b/jobs-server-tokio/src/spawner.rs @@ -2,7 +2,11 @@ use std::sync::Arc; use failure::Error; use futures::{future::IntoFuture, Future}; +#[cfg(feature = "futures-zmq")] +use futures_zmq::{prelude::*, Push}; use jobs_core::JobInfo; +use log::{debug, trace}; +#[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Push}; use zmq::{Context, Message}; diff --git a/jobs-server-tokio/src/worker/config.rs b/jobs-server-tokio/src/worker/config.rs index 601c46a..a15c9d6 100644 --- a/jobs-server-tokio/src/worker/config.rs +++ b/jobs-server-tokio/src/worker/config.rs @@ -1,12 +1,16 @@ use std::{sync::Arc, time::Duration}; -use failure::Error; +use failure::{Error, Fail}; use futures::{ - future::{lazy, Either, IntoFuture}, + future::{Either, IntoFuture}, Future, Stream, }; +#[cfg(feature = "futures-zmq")] +use futures_zmq::{prelude::*, Multipart, Pull, Push}; use jobs_core::{JobInfo, Processors}; +use log::{error, info}; use tokio::timer::Delay; +#[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Multipart, Pull, Push}; use zmq::{Context, Message}; @@ -98,29 +102,28 @@ impl ResetWorker { } fn build(self) -> impl Future { - lazy(|| { - let push = Push::builder(self.context.clone()) - .connect(&self.push_address) - .build()?; + Push::builder(self.context.clone()) + .connect(&self.push_address) + .build() + .join( + Pull::builder(self.context.clone()) + .connect(&self.pull_address) + .build(), + ) + .map(|(push, pull)| { + let config = Worker { + push, + pull, + push_address: self.push_address, + pull_address: self.pull_address, + queue: self.queue, + processors: self.processors, + context: self.context, + }; - let pull = Pull::builder(self.context.clone()) - .connect(&self.pull_address) - .build()?; - - let config = Worker { - push, - pull, - push_address: self.push_address, - pull_address: self.pull_address, - queue: self.queue, - processors: self.processors, - context: self.context, - }; - - tokio::spawn(config.run()); - - Ok(()) - }) + tokio::spawn(config.run()); + }) + .from_err() } } diff --git a/jobs-server-tokio/src/worker/mod.rs b/jobs-server-tokio/src/worker/mod.rs index 779cf07..ccbeca0 100644 --- a/jobs-server-tokio/src/worker/mod.rs +++ b/jobs-server-tokio/src/worker/mod.rs @@ -1,7 +1,9 @@ use std::{collections::BTreeMap, sync::Arc}; +use failure::Fail; use futures::Future; use jobs_core::{Processor, Processors}; +use log::{error, info}; use zmq::Context; mod config; diff --git a/jobs-server-tokio/src/worker/portmap.rs b/jobs-server-tokio/src/worker/portmap.rs index f5054e3..7019791 100644 --- a/jobs-server-tokio/src/worker/portmap.rs +++ b/jobs-server-tokio/src/worker/portmap.rs @@ -1,7 +1,10 @@ use std::{collections::BTreeMap, sync::Arc}; -use failure::Error; -use futures::{future::lazy, Future}; +use failure::{Error, Fail}; +use futures::Future; +#[cfg(feature = "futures-zmq")] +use futures_zmq::{prelude::*, Req}; +#[cfg(feature = "tokio-zmq")] use tokio_zmq::{prelude::*, Req}; use zmq::{Context, Message}; @@ -12,24 +15,23 @@ impl PortMap { address: String, context: Arc, ) -> impl Future, Error = Error> { - lazy(move || { - let req = Req::builder(context.clone()).connect(&address).build()?; + Req::builder(context.clone()) + .connect(&address) + .build() + .from_err() + .and_then(|req| { + Message::from_slice(b"h") + .map_err(Error::from) + .map(move |msg| (req, msg.into())) + }) + .and_then(|(req, msg)| req.send(msg).and_then(|req| req.recv()).from_err()) + .and_then(|(mut multipart, _)| { + let msg = multipart.pop_front().ok_or(EmptyMessage)?; - Ok(req) - }) - .and_then(|req| { - Message::from_slice(b"h") - .map_err(Error::from) - .map(move |msg| (req, msg.into())) - }) - .and_then(|(req, msg)| req.send(msg).and_then(|req| req.recv()).from_err()) - .and_then(|(mut multipart, _)| { - let msg = multipart.pop_front().ok_or(EmptyMessage)?; + let map = serde_json::from_slice(&msg)?; - let map = serde_json::from_slice(&msg)?; - - Ok(map) - }) + Ok(map) + }) } }