From fe20c8af144c19733a38444168ac98e0d1601bf2 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 1 Feb 2017 22:25:06 -0800 Subject: [PATCH 1/6] Add a reactor::Core field to SyncClient. This allows the client to drive its own execution, as one would expect. Previously, the reactor had to be driven on a separate thread, which was confusing. This has a couple notable side effects: 1. SyncClient is no longer `Clone`. This is because `reactor::Core` is not `Clone`, and creating one is not infallible (`Core::new` returns a `Result`). 2. SyncClient does not use the user-specified `client::Options::handle` or `client::Options::remote`, because it constructs its own reactor. --- benches/latency.rs | 9 +- examples/pubsub.rs | 2 +- examples/readme_errors.rs | 2 +- examples/readme_sync.rs | 2 +- examples/server_calling_server.rs | 2 +- examples/throughput.rs | 2 +- examples/two_clients.rs | 4 +- src/client.rs | 356 ++++++++++-------------------- src/lib.rs | 7 +- src/macros.rs | 53 +++-- 10 files changed, 170 insertions(+), 269 deletions(-) diff --git a/benches/latency.rs b/benches/latency.rs index 3c4e49b..bbd0964 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -38,8 +38,13 @@ impl FutureService for Server { #[bench] fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); let client = SyncClient::connect(addr, client::Options::default()).unwrap(); - bencher.iter(|| { client.ack().unwrap(); }); + bencher.iter(|| { + client.ack().unwrap(); + }); } diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e2f9e1e..1ad44db 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -122,7 +122,7 @@ fn main() { .wait() .unwrap(); - let publisher_client = + let mut publisher_client = publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap(); let subscriber1 = Subscriber::listen(0); diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index c3f0a9c..90306ac 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -51,7 +51,7 @@ impl SyncService for HelloServer { fn main() { let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 80055d4..19f0193 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -31,6 +31,6 @@ impl SyncService for HelloServer { fn main() { let addr = "localhost:10000"; HelloServer.listen(addr, server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a1ac41c..90414d8 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -86,7 +86,7 @@ fn main() { .wait() .unwrap(); - let double_client = double::SyncClient::connect(double_addr, client::Options::default()) + let mut double_client = double::SyncClient::connect(double_addr, client::Options::default()) .unwrap(); for i in 0..5 { println!("{:?}", double_client.double(i).unwrap()); diff --git a/examples/throughput.rs b/examples/throughput.rs index 0c99c52..e9846f4 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -57,7 +57,7 @@ fn bench_tarpc(target: u64) { server::Options::default()) .wait() .unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); let start = time::Instant::now(); let mut nread = 0; while nread < target { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 0e56ce4..2b79538 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -68,8 +68,8 @@ fn main() { .wait() .unwrap(); - let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); - let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); + let mut bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); + let mut baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index a1a2b19..954aa82 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,24 +5,11 @@ use {Reactor, WireError}; use bincode::serde::DeserializeError; -use futures::{self, Future}; -use protocol::Proto; #[cfg(feature = "tls")] use self::tls::*; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::io; -use stream_type::StreamType; use tokio_core::reactor; -use tokio_proto::BindClient as ProtoBindClient; -use tokio_proto::multiplex::Multiplex; -use tokio_service::Service; type WireResponse = Result>, DeserializeError>; -type ResponseFuture = futures::Map< as Service>::Future, - fn(WireResponse) -> Result>>; -type BindClient = >> as - ProtoBindClient>::BindClient; /// TLS-specific functionality #[cfg(feature = "tls")] @@ -64,74 +51,6 @@ pub mod tls { } } -/// A client that impls `tokio_service::Service` that writes and reads bytes. -/// -/// Typically, this would be combined with a serialization pre-processing step -/// and a deserialization post-processing step. -#[doc(hidden)] -pub struct Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - inner: BindClient, -} - -impl Clone for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn clone(&self) -> Self { - Client { inner: self.inner.clone() } - } -} - -impl Service for Client - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static -{ - type Request = Req; - type Response = Result>; - type Error = io::Error; - type Future = ResponseFuture; - - fn call(&self, request: Self::Request) -> Self::Future { - self.inner.call(request).map(Self::map_err) - } -} - -impl Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn new(inner: BindClient) -> Self - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - Client { inner: inner } - } - - fn map_err(resp: WireResponse) -> Result> { - resp.map(|r| r.map_err(::Error::from)) - .map_err(::Error::ClientDeserialize) - .and_then(|r| r) - } -} - -impl fmt::Debug for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "Client {{ .. }}") - } -} - /// Additional options to configure how the client connects and operates. #[derive(Default)] pub struct Options { @@ -163,24 +82,92 @@ impl Options { /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use {REMOTE, Reactor}; - use futures::{self, Async, Future, future}; + use {REMOTE, Reactor, WireError}; + use futures::{self, Future, future}; use protocol::Proto; use serde::{Deserialize, Serialize}; + use std::fmt; use std::io; - use std::marker::PhantomData; use std::net::SocketAddr; use stream_type::StreamType; - use super::{Client, Options}; - use tokio_core::net::{TcpStream, TcpStreamNew}; + use super::{Options, WireResponse}; + use tokio_core::net::TcpStream; use tokio_core::reactor; - use tokio_proto::BindClient; - cfg_if! { - if #[cfg(feature = "tls")] { - use tokio_tls::{ConnectAsync, TlsStream, TlsConnectorExt}; - use super::tls::Context; - use errors::native_to_io; - } else {} + use tokio_proto::BindClient as ProtoBindClient; + use tokio_proto::multiplex::Multiplex; + use tokio_service::Service; + #[cfg(feature = "tls")] + use tokio_tls::TlsConnectorExt; + #[cfg(feature = "tls")] + use errors::native_to_io; + + /// A client that impls `tokio_service::Service` that writes and reads bytes. + #[doc(hidden)] + pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + inner: BindClient, + } + + impl Clone for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn clone(&self) -> Self { + Client { inner: self.inner.clone() } + } + } + + impl Service for Client + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + type Request = Req; + type Response = Resp; + type Error = ::Error; + type Future = ResponseFuture; + + fn call(&self, request: Self::Request) -> Self::Future { + fn identity(t: T) -> T { t } + self.inner.call(request) + .map(Self::map_err as _) + .map_err(::Error::from as _) + .and_then(identity as _) + } + } + + impl Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn new(inner: BindClient) -> Self + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + Client { inner: inner } + } + + fn map_err(resp: WireResponse) -> Result> { + resp.map(|r| r.map_err(::Error::from)) + .map_err(::Error::ClientDeserialize) + .and_then(|r| r) + } + } + + impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Client {{ .. }}") + } } /// Types that can connect to a server asynchronously. @@ -192,98 +179,10 @@ pub mod future { fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut; } - type ConnectFutureInner = future::Either, MultiplexConnect>, futures::Flatten< - futures::MapErr>>, - fn(futures::Canceled) -> io::Error>>>; - /// A future that resolves to a `Client` or an `io::Error`. - #[doc(hidden)] - pub struct ConnectFuture - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { - #[cfg(not(feature = "tls"))] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - inner: ConnectFutureInner>, - #[cfg(feature = "tls")] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - inner: ConnectFutureInner, futures::Map, - fn(::native_tls::Error) -> io::Error>, fn(TlsStream) -> StreamType>>>, - } - - impl Future for ConnectFuture - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - type Item = Client; - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll { - // Ok to unwrap because we ensure the oneshot is always completed. - match Future::poll(&mut self.inner)? { - Async::Ready(client) => Ok(Async::Ready(client)), - Async::NotReady => Ok(Async::NotReady), - } - } - } - - struct MultiplexConnect(reactor::Handle, PhantomData<(Req, Resp, E)>); - - impl MultiplexConnect { - fn new(handle: reactor::Handle) -> Self { - MultiplexConnect(handle, PhantomData) - } - } - - impl FnOnce<(I,)> for MultiplexConnect - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static, - I: Into - { - type Output = Client; - - extern "rust-call" fn call_once(self, (stream,): (I,)) -> Self::Output { - Client::new(Proto::new().bind_client(&self.0, stream.into())) - } - } - - /// Provides the connection Fn impl for Tls - struct ConnectFn { - #[cfg(feature = "tls")] - tls_ctx: Option, - } - - impl FnOnce<(TcpStream,)> for ConnectFn { - #[cfg(feature = "tls")] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - type Output = future::Either, - futures::Map, - fn(::native_tls::Error) - -> io::Error>, - fn(TlsStream) -> StreamType>>; - #[cfg(not(feature = "tls"))] - type Output = future::FutureResult; - - extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Self::Output { - #[cfg(feature = "tls")] - match self.tls_ctx { - None => future::Either::A(future::ok(StreamType::from(tcp))), - Some(tls_ctx) => { - future::Either::B(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, tcp) - .map_err(native_to_io as fn(_) -> _) - .map(StreamType::from as fn(_) -> _)) - } - } - #[cfg(not(feature = "tls"))] - future::ok(StreamType::from(tcp)) - } - } + pub type ConnectFuture = futures::Flatten< + futures::MapErr>>, + fn(futures::Canceled) -> io::Error>>; impl Connect for Client where Req: Serialize + Sync + Send + 'static, @@ -300,42 +199,39 @@ pub mod future { #[cfg(feature = "tls")] let tls_ctx = options.tls_ctx.take(); + let connect = move |handle: &reactor::Handle| { + let handle2 = handle.clone(); + TcpStream::connect(&addr, handle) + .and_then(move |socket| { + #[cfg(feature = "tls")] + match tls_ctx { + Some(tls_ctx) => { + future::Either::A(tls_ctx.tls_connector + .connect_async(&tls_ctx.domain, socket) + .map(StreamType::Tls) + .map_err(native_to_io)) + } + None => future::Either::B(future::ok(StreamType::Tcp(socket))), + } + #[cfg(not(feature = "tls"))] + future::ok(StreamType::Tcp(socket)) + }) + .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) + }; let setup = move |tx: futures::sync::oneshot::Sender<_>| { move |handle: &reactor::Handle| { - let handle2 = handle.clone(); - TcpStream::connect(&addr, handle) - .and_then(move |socket| { - #[cfg(feature = "tls")] - match tls_ctx { - Some(tls_ctx) => { - future::Either::A(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, socket) - .map(StreamType::Tls) - .map_err(native_to_io)) - } - None => future::Either::B(future::ok(StreamType::Tcp(socket))), - } - #[cfg(not(feature = "tls"))] - future::ok(StreamType::Tcp(socket)) - }) - .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) - .then(move |result| { - tx.complete(result); - Ok(()) - }) + connect(handle).then(move |result| { + tx.complete(result); + Ok(()) + }) } }; let rx = match options.reactor { Some(Reactor::Handle(handle)) => { - #[cfg(feature = "tls")] - let connect_fn = ConnectFn { tls_ctx: options.tls_ctx }; - #[cfg(not(feature = "tls"))] - let connect_fn = ConnectFn {}; - let tcp = TcpStream::connect(&addr, &handle) - .and_then(connect_fn) - .map(MultiplexConnect::new(handle)); - return ConnectFuture { inner: future::Either::A(tcp) }; + let (tx, rx) = futures::oneshot(); + handle.spawn(setup(tx)(&handle)); + rx } Some(Reactor::Remote(remote)) => { let (tx, rx) = futures::oneshot(); @@ -351,39 +247,31 @@ pub mod future { fn panic(canceled: futures::Canceled) -> io::Error { unreachable!(canceled) } - ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) } + rx.map_err(panic as _).flatten() } } + + type ResponseFuture = + futures::AndThen as Service>::Future, + fn(WireResponse) -> Result>>, + fn(io::Error) -> ::Error>, + Result>, + fn(Result>) -> Result>>; + type BindClient = + >> + as ProtoBindClient>::BindClient; } /// Exposes a trait for connecting synchronously to servers. pub mod sync { - use client::future::Connect as FutureConnect; - use futures::{Future, future}; - use serde::{Deserialize, Serialize}; use std::io; use std::net::ToSocketAddrs; - use super::{Client, Options}; - use util::FirstSocketAddr; + use super::Options; /// Types that can connect to a server synchronously. pub trait Connect: Sized { /// Connects to a server located at the given address. fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs; } - - impl Connect for Client - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - fn connect(addr: A, options: Options) -> Result - where A: ToSocketAddrs - { - let addr = addr.try_first_socket_addr()?; - - // Wrapped in a lazy future to ensure execution occurs when a task is present. - future::lazy(move || ::connect(addr, options)).wait() - } - } } diff --git a/src/lib.rs b/src/lib.rs index da44cec..f3d2750 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ //! fn main() { //! let addr = "localhost:10000"; //! let _server = HelloServer.listen(addr, server::Options::default()); -//! let client = SyncClient::connect(addr, client::Options::default()).unwrap(); +//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); //! println!("{}", client.hello("Mom".to_string()).unwrap()); //! } //! ``` @@ -97,7 +97,7 @@ //! let addr = "localhost:10000"; //! let acceptor = get_acceptor(); //! let _server = HelloServer.listen(addr, server::Options::default().tls(acceptor)); -//! let client = SyncClient::connect(addr, +//! let mut client = SyncClient::connect(addr, //! client::Options::default() //! .tls(client::tls::Context::new("foobar.com").unwrap())) //! .unwrap(); @@ -106,8 +106,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, - specialization)] +#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, specialization)] #![plugin(tarpc_plugins)] extern crate byteorder; diff --git a/src/macros.rs b/src/macros.rs index 2eaeffa..0c281a1 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -529,22 +529,33 @@ macro_rules! service { impl SyncServiceExt for S where S: SyncService {} #[allow(unused)] - #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. - pub struct SyncClient(FutureClient); + pub struct SyncClient { + inner: FutureClient, + reactor: $crate::tokio_core::reactor::Core, + } + + impl ::std::fmt::Debug for SyncClient { + fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner) + } + } impl $crate::client::sync::Connect for SyncClient { - fn connect(addr_: A, opts_: $crate::client::Options) + fn connect(addr_: A, options_: $crate::client::Options) -> ::std::result::Result where A: ::std::net::ToSocketAddrs, { let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?; - // Wrapped in a lazy future to ensure execution occurs when a task is present. - let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || { - ::connect(addr_, opts_) - }))?; - let client_ = SyncClient(client_); - ::std::result::Result::Ok(client_) + let mut reactor_ = $crate::tokio_core::reactor::Core::new()?; + let options_ = options_.handle(reactor_.handle()); + let client_ = <$crate::client::future::Client<_, _, _> + as $crate::client::future::Connect>::connect(addr_, options_); + let client_ = reactor_.run(client_)?; + ::std::result::Result::Ok(SyncClient { + inner: FutureClient(client_), + reactor: reactor_, + }) } } @@ -552,20 +563,17 @@ macro_rules! service { $( #[allow(unused)] $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) + pub fn $fn_name(&mut self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { - // Wrapped in a lazy future to ensure execution occurs when a task is present. - $crate::futures::Future::wait($crate::futures::future::lazy(move || { - (self.0).$fn_name($($arg),*) - })) + self.reactor.run(self.inner.$fn_name($($arg),*)) } )* } #[allow(non_camel_case_types)] type tarpc_service_Client__ = - $crate::client::Client; @@ -622,7 +630,7 @@ macro_rules! service { $crate::tokio_service::Service::call(&self.0, tarpc_service_req__); $crate::futures::Future::then(tarpc_service_fut__, move |tarpc_service_msg__| { - match tarpc_service_msg__? { + match tarpc_service_msg__ { ::std::result::Result::Ok(tarpc_service_msg__) => { if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) = tarpc_service_msg__ @@ -877,7 +885,7 @@ mod functional_test { fn simple() { let _ = env_logger::init(); let (_, client) = start_server_with_sync_client::(Server); - let client = unwrap!(client); + let mut client = unwrap!(client); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -887,7 +895,7 @@ mod functional_test { let _ = env_logger::init(); let (_, client) = start_server_with_sync_client::(Server); - let client = client.expect("Could not connect!"); + let mut client = client.expect("Could not connect!"); match client.foo().err().expect("failed unwrap") { ::Error::ServerDeserialize(_) => {} // good bad => panic!("Expected Error::ServerDeserialize but got {}", bad), @@ -954,16 +962,17 @@ mod functional_test { use util::FirstSocketAddr; use server; use super::FutureServiceExt; - + let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()) + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) .wait() .unwrap(); Server.listen(addr, server::Options::default()) .wait() .unwrap(); } - + #[cfg(feature = "tls")] #[test] fn tcp_and_tls() { @@ -1026,7 +1035,7 @@ mod functional_test { .wait() .unwrap(); - let client = get_sync_client::(addr).unwrap(); + let mut client = get_sync_client::(addr).unwrap(); match client.bar().err().unwrap() { ::Error::App(e) => { assert_eq!(e.description(), "lol jk"); From ed90f4ecea627e43b3b9a4d3479098f4373973e1 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 1 Feb 2017 22:48:49 -0800 Subject: [PATCH 2/6] Merge master into sync-reactor --- src/macros.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index df092e9..e5bb867 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -552,11 +552,11 @@ macro_rules! service { let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?; let mut reactor_ = $crate::tokio_core::reactor::Core::new()?; let options_ = options_.handle(reactor_.handle()); - let client_ = <$crate::client::future::Client<_, _, _> - as $crate::client::future::Connect>::connect(addr_, options_); + let client_ = ::connect(addr_, + options_); let client_ = reactor_.run(client_)?; ::std::result::Result::Ok(SyncClient { - inner: FutureClient(client_), + inner: client_, reactor: reactor_, }) } From fa2df184e93ae48ff24786039c378d04a9b46e65 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 7 Feb 2017 20:34:52 -0800 Subject: [PATCH 3/6] Remove unused deps: bytes, take, and scoped-pool --- Cargo.toml | 13 ++++++------- src/lib.rs | 2 -- src/plugins/src/lib.rs | 1 - 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 477290c..19b97d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,21 +14,20 @@ description = "An RPC framework for Rust with a focus on ease of use." bincode = "1.0.0-alpha" byteorder = "1.0" cfg-if = "0.1.0" -bytes = "0.3" futures = "0.1.7" lazy_static = "0.2" log = "0.3" -native-tls = { version = "0.1.1", optional = true } -scoped-pool = "1.0" +net2 = "0.2" serde = "0.9" serde_derive = "0.9" tarpc-plugins = { path = "src/plugins" } -take = "0.1" -tokio-service = "0.1" -tokio-proto = "0.1" tokio-core = "0.1" +tokio-proto = "0.1" +tokio-service = "0.1" + +# Optional dependencies +native-tls = { version = "0.1.1", optional = true } tokio-tls = { version = "0.1", optional = true } -net2 = "0.2" [dev-dependencies] chrono = "0.2" diff --git a/src/lib.rs b/src/lib.rs index da44cec..c72da5e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,7 +111,6 @@ #![plugin(tarpc_plugins)] extern crate byteorder; -extern crate bytes; #[macro_use] extern crate lazy_static; #[macro_use] @@ -119,7 +118,6 @@ extern crate log; extern crate net2; #[macro_use] extern crate serde_derive; -extern crate take; #[macro_use] extern crate cfg_if; diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index 95f22be..99a9eb8 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -1,7 +1,6 @@ #![feature(plugin_registrar, rustc_private)] extern crate itertools; -extern crate rustc; extern crate rustc_plugin; extern crate syntax; From 338c91d393d04f0d0e41e519546366b1c96e9291 Mon Sep 17 00:00:00 2001 From: compressed Date: Sun, 12 Feb 2017 17:52:38 -0500 Subject: [PATCH 4/6] fix(bincode): updates to support bincode 1.0.0-alpha2 (#100) Removed: - `Error::ClientDeserialize` variant - `Error::ServerDeserialize` variant - `WireError::ServerDeserialize` variant --- Cargo.toml | 2 +- src/client.rs | 6 +++--- src/errors.rs | 30 ++++++------------------------ src/macros.rs | 18 ++++++------------ src/protocol.rs | 12 ++++++------ src/server.rs | 6 +++--- 6 files changed, 25 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 477290c..1a79256 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ readme = "README.md" description = "An RPC framework for Rust with a focus on ease of use." [dependencies] -bincode = "1.0.0-alpha" +bincode = "1.0.0-alpha2" byteorder = "1.0" cfg-if = "0.1.0" bytes = "0.3" diff --git a/src/client.rs b/src/client.rs index a1a2b19..f7689c1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use {Reactor, WireError}; -use bincode::serde::DeserializeError; +use bincode; use futures::{self, Future}; use protocol::Proto; #[cfg(feature = "tls")] @@ -18,7 +18,7 @@ use tokio_proto::BindClient as ProtoBindClient; use tokio_proto::multiplex::Multiplex; use tokio_service::Service; -type WireResponse = Result>, DeserializeError>; +type WireResponse = Result>, bincode::Error>; type ResponseFuture = futures::Map< as Service>::Future, fn(WireResponse) -> Result>>; type BindClient = >> as @@ -117,7 +117,7 @@ impl Client fn map_err(resp: WireResponse) -> Result> { resp.map(|r| r.map_err(::Error::from)) - .map_err(::Error::ClientDeserialize) + .map_err(::Error::ClientSerialize) .and_then(|r| r) } } diff --git a/src/errors.rs b/src/errors.rs index 3bb4189..cc172d2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,7 +3,6 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use bincode; use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; @@ -13,23 +12,15 @@ use std::error::Error as StdError; pub enum Error { /// Any IO error. Io(io::Error), - /// Error in deserializing a server response. + /// Error serializing the client request or deserializing the server response. /// /// Typically this indicates a faulty implementation of `serde::Serialize` or /// `serde::Deserialize`. - ClientDeserialize(bincode::serde::DeserializeError), - /// Error in serializing a client request. - /// - /// Typically this indicates a faulty implementation of `serde::Serialize`. - ClientSerialize(bincode::serde::SerializeError), - /// Error in deserializing a client request. + ClientSerialize(::bincode::Error), + /// Error serializing the server response or deserializing the client request. /// /// Typically this indicates a faulty implementation of `serde::Serialize` or /// `serde::Deserialize`. - ServerDeserialize(String), - /// Error in serializing a server response. - /// - /// Typically this indicates a faulty implementation of `serde::Serialize`. ServerSerialize(String), /// The server was unable to reply to the rpc for some reason. /// @@ -41,9 +32,7 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Error::ClientDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ClientSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), - Error::ServerDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ServerSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::App(ref e) => fmt::Display::fmt(e, f), Error::Io(ref e) => fmt::Display::fmt(e, f), @@ -54,10 +43,8 @@ impl fmt::Display for Er impl StdError for Error { fn description(&self) -> &str { match *self { - Error::ClientDeserialize(_) => "The client failed to deserialize the server response.", - Error::ClientSerialize(_) => "The client failed to serialize the request.", - Error::ServerDeserialize(_) => "The server failed to deserialize the request.", - Error::ServerSerialize(_) => "The server failed to serialize the response.", + Error::ClientSerialize(_) => "The client failed to serialize the request or deserialize the response.", + Error::ServerSerialize(_) => "The server failed to serialize the response or deserialize the request.", Error::App(ref e) => e.description(), Error::Io(ref e) => e.description(), } @@ -65,9 +52,7 @@ impl StdError for Error< fn cause(&self) -> Option<&StdError> { match *self { - Error::ClientDeserialize(ref e) => e.cause(), Error::ClientSerialize(ref e) => e.cause(), - Error::ServerDeserialize(_) | Error::ServerSerialize(_) | Error::App(_) => None, Error::Io(ref e) => e.cause(), @@ -84,7 +69,6 @@ impl From for Error { impl From> for Error { fn from(err: WireError) -> Self { match err { - WireError::ServerDeserialize(s) => Error::ServerDeserialize(s), WireError::ServerSerialize(s) => Error::ServerSerialize(s), WireError::App(e) => Error::App(e), } @@ -95,9 +79,7 @@ impl From> for Error { #[doc(hidden)] #[derive(Deserialize, Serialize, Clone, Debug)] pub enum WireError { - /// Error in deserializing a client request. - ServerDeserialize(String), - /// Error in serializing server response. + /// Error in serializing the server response or deserializing the client request. ServerSerialize(String), /// The server was unable to reply to the rpc for some reason. App(E), diff --git a/src/macros.rs b/src/macros.rs index b5e9edd..7233269 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -408,7 +408,7 @@ macro_rules! service { where tarpc_service_S__: FutureService { type Request = ::std::result::Result; + $crate::bincode::Error>; type Response = $crate::server::Response; type Error = ::std::io::Error; @@ -421,7 +421,7 @@ macro_rules! service { return tarpc_service_FutureReply__::DeserializeError( $crate::futures::finished( ::std::result::Result::Err( - $crate::WireError::ServerDeserialize( + $crate::WireError::ServerSerialize( ::std::string::ToString::to_string( &tarpc_service_deserialize_err__))))); } @@ -667,15 +667,9 @@ macro_rules! service { unreachable!() } } - $crate::Error::ServerDeserialize(tarpc_service_err__) => { - $crate::Error::ServerDeserialize(tarpc_service_err__) - } $crate::Error::ServerSerialize(tarpc_service_err__) => { $crate::Error::ServerSerialize(tarpc_service_err__) } - $crate::Error::ClientDeserialize(tarpc_service_err__) => { - $crate::Error::ClientDeserialize(tarpc_service_err__) - } $crate::Error::ClientSerialize(tarpc_service_err__) => { $crate::Error::ClientSerialize(tarpc_service_err__) } @@ -913,8 +907,8 @@ mod functional_test { Server>(Server); let client = client.expect("Could not connect!"); match client.foo().err().expect("failed unwrap") { - ::Error::ServerDeserialize(_) => {} // good - bad => panic!("Expected Error::ServerDeserialize but got {}", bad), + ::Error::ServerSerialize(_) => {} // good + bad => panic!("Expected Error::ServerSerialize but got {}", bad), } } } @@ -968,8 +962,8 @@ mod functional_test { start_server_with_async_client::(Server); match client.foo().wait().err().unwrap() { - ::Error::ServerDeserialize(_) => {} // good - bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), + ::Error::ServerSerialize(_) => {} // good + bad => panic!(r#"Expected Error::ServerSerialize but got "{}""#, bad), } } diff --git a/src/protocol.rs b/src/protocol.rs index b973b38..7d38df8 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use {serde, tokio_core}; -use bincode::{SizeLimit, serde as bincode}; +use bincode::{self, SizeLimit}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{self, Cursor}; use std::marker::PhantomData; @@ -40,7 +40,7 @@ impl tokio_core::io::Codec for Codec Decode: serde::Deserialize { type Out = (RequestId, Encode); - type In = (RequestId, Result); + type In = (RequestId, Result); fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { buf.write_u64::(id).unwrap(); @@ -121,7 +121,7 @@ impl ServerProto for Proto Decode: serde::Deserialize + 'static { type Response = Encode; - type Request = Result; + type Request = Result; type Transport = Framed>; type BindTransport = Result; @@ -135,7 +135,7 @@ impl ClientProto for Proto Encode: serde::Serialize + 'static, Decode: serde::Deserialize + 'static { - type Response = Result; + type Response = Result; type Request = Encode; type Transport = Framed>; type BindTransport = Result; @@ -158,8 +158,8 @@ fn serialize() { let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(); codec.encode(MSG, &mut vec).unwrap(); buf.get_mut().append(&mut vec); - let actual: Result)>, - io::Error> = codec.decode(&mut buf); + let actual: Result)>, io::Error> = + codec.decode(&mut buf); match actual { Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} diff --git a/src/server.rs b/src/server.rs index 2739c46..e8314de 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use {REMOTE, Reactor}; -use bincode::serde::DeserializeError; +use bincode; use errors::WireError; use futures::{self, Async, Future, Stream, future}; use net2; @@ -67,7 +67,7 @@ pub type Response = Result>; #[doc(hidden)] pub fn listen(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture - where S: NewService, + where S: NewService, Response = Response, Error = io::Error> + Send + 'static, Req: Deserialize + 'static, @@ -116,7 +116,7 @@ fn listen_with(new_service: S, handle: Handle, _acceptor: Acceptor) -> io::Result - where S: NewService, + where S: NewService, Response = Response, Error = io::Error> + Send + 'static, Req: Deserialize + 'static, From acdf03c8cadc3861761aab861d595fd4fda7051f Mon Sep 17 00:00:00 2001 From: Cyril Plisko Date: Wed, 15 Feb 2017 19:26:57 +0200 Subject: [PATCH 5/6] Fix compilation error for benches (#101) --- benches/latency.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/latency.rs b/benches/latency.rs index bbd0964..fd877cf 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -42,7 +42,7 @@ fn latency(bencher: &mut Bencher) { server::Options::default()) .wait() .unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); bencher.iter(|| { client.ack().unwrap(); From 2c09a3570548e7acae070c8565c0bd1dfcfeb3af Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 15 Feb 2017 23:47:35 -0800 Subject: [PATCH 6/6] Remove the `Send` bound from `FutureService` (#96) * Make a reactor handle mandatory for server. This removes the Send bound from FutureService. The Send bound is still required for SyncService, since clones are sent to new threads for each request. (This is more fodder for the argument that there should be a distinct Options struct for each combination of async/sync and client/server.) This commit also makes FutureService::listen return an io::Result rather than a Future; the future was never really necessary and had the unintended consequence of making SyncService::listen deadlock when the options specified a handle (because that means the reactor driving the service lives on the same thread that SyncService is waiting on). `SyncClient` is no longer `Clone` because it needs to create a new `reactor::Core` when cloning. Tokio Clients are `Clone` but they don't allow moving the cloned client onto a new reactor. * Change pubsub to use Rc> instead of Arc>. This is possible since services no longer need to be Send. * Remove some unnecessary unstable features. There 3 remaining unstable features. The hardest to remove is plugin, because we rely on compiler plugins to rewrite types from snake case to camel. It's possible this can be removed before the proc macros rewrite lands if impl Trait is extended to work with traits. * Clean up example * Sync servers now spawn a reactor on a thread. It's decided that sync users should not have to know about tokio at all. * Don't allow specifying a reactor::Core on client options. * Fail fast in server::listen if local_addr() returns Err. --- README.md | 34 +-- benches/latency.rs | 16 +- examples/concurrency.rs | 10 +- examples/pubsub.rs | 80 +++---- examples/readme_errors.rs | 8 +- examples/readme_futures.rs | 14 +- examples/readme_sync.rs | 10 +- examples/server_calling_server.rs | 47 ++-- examples/throughput.rs | 8 +- examples/two_clients.rs | 35 ++- src/client.rs | 135 +++++++---- src/errors.rs | 8 +- src/lib.rs | 15 +- src/macros.rs | 364 ++++++++++++++++++------------ src/protocol.rs | 2 - src/server.rs | 58 +---- src/util.rs | 15 -- 17 files changed, 482 insertions(+), 377 deletions(-) diff --git a/README.md b/README.md index da7b06c..0604e41 100644 --- a/README.md +++ b/README.md @@ -50,10 +50,12 @@ tarpc-plugins = { git = "https://github.com/google/tarpc" } extern crate futures; #[macro_use] extern crate tarpc; +extern crate tokio_core; use tarpc::{client, server}; use tarpc::client::sync::Connect; -use tarpc::util::Never; +use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; service! { rpc hello(name: String) -> String; @@ -69,9 +71,11 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr, server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let reactor = reactor::Core::new().unwrap(); + let addr = HelloServer.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } ``` @@ -101,7 +105,7 @@ extern crate tokio_core; use futures::Future; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -121,9 +125,10 @@ impl FutureService for HelloServer { } fn main() { - let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); - HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap(); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + server::Options::from(core.handle())) + .unwrap(); let options = client::Options::default().handle(core.handle()); core.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) @@ -169,7 +174,7 @@ extern crate tokio_core; use futures::Future; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; use tarpc::native_tls::{Pkcs12, TlsAcceptor}; @@ -196,14 +201,15 @@ fn get_acceptor() -> TlsAcceptor { } fn main() { - let addr = "localhost:10000".first_socket_addr(); let mut core = reactor::Core::new().unwrap(); let acceptor = get_acceptor(); - HelloServer.listen(addr, server::Options::default() - .handle(core.handle()) - .tls(acceptor)).wait().unwrap(); - let options = client::Options::default().handle(core.handle() - .tls(client::tls::Context::new("foobar.com").unwrap())); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + server::Options::from(core.handle()) + .tls(acceptor)) + .unwrap(); + let options = client::Options::default() + .handle(core.handle()) + .tls(client::tls::Context::new("foobar.com").unwrap())); core.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) diff --git a/benches/latency.rs b/benches/latency.rs index fd877cf..768bd3d 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -12,13 +12,14 @@ extern crate tarpc; extern crate test; extern crate env_logger; extern crate futures; +extern crate tokio_core; -use futures::Future; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; #[cfg(test)] use test::Bencher; +use tokio_core::reactor; service! { rpc ack(); @@ -38,13 +39,12 @@ impl FutureService for Server { #[bench] fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); + let mut reactor = reactor::Core::new().unwrap(); let addr = Server.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() + &reactor.handle(), + server::Options::default()) .unwrap(); - let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let client = reactor.run(FutureClient::connect(addr, client::Options::default())).unwrap(); - bencher.iter(|| { - client.ack().unwrap(); - }); + bencher.iter(|| reactor.run(client.ack()).unwrap()); } diff --git a/examples/concurrency.rs b/examples/concurrency.rs index 7cfb84f..33b6307 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, mpsc}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -167,10 +167,11 @@ fn main() { .map(Result::unwrap) .unwrap_or(4); + let mut reactor = reactor::Core::new().unwrap(); let addr = Server::new() .listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); info!("Server listening on {}.", addr); @@ -190,8 +191,5 @@ fn main() { info!("Starting..."); - // The driver of the main future. - let mut core = reactor::Core::new().unwrap(); - - core.run(run).unwrap(); + reactor.run(run).unwrap(); } diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 1ad44db..d7dec88 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -10,20 +10,21 @@ extern crate env_logger; extern crate futures; #[macro_use] extern crate tarpc; -extern crate tokio_proto as tokio; +extern crate tokio_core; -use futures::{BoxFuture, Future}; +use futures::{Future, future}; use publisher::FutureServiceExt as PublisherExt; +use std::cell::RefCell; use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::rc::Rc; use std::thread; use std::time::Duration; use subscriber::FutureServiceExt as SubscriberExt; use tarpc::{client, server}; -use tarpc::client::future::Connect as Fc; -use tarpc::client::sync::Connect as Sc; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Message, Never}; +use tokio_core::reactor; pub mod subscriber { service! { @@ -57,39 +58,39 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn listen(id: u32) -> SocketAddr { + fn listen(id: u32, handle: &reactor::Handle, options: server::Options) -> SocketAddr { Subscriber { id: id } - .listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() + .listen("localhost:0".first_socket_addr(), handle, options) .unwrap() } } #[derive(Clone, Debug)] struct Publisher { - clients: Arc>>, + clients: Rc>>, } impl Publisher { fn new() -> Publisher { - Publisher { clients: Arc::new(Mutex::new(HashMap::new())) } + Publisher { clients: Rc::new(RefCell::new(HashMap::new())) } } } impl publisher::FutureService for Publisher { - type BroadcastFut = BoxFuture<(), Never>; + type BroadcastFut = Box>; fn broadcast(&self, message: String) -> Self::BroadcastFut { - futures::collect(self.clients - .lock() - .unwrap() - .values_mut() - // Ignore failing subscribers. - .map(move |client| client.receive(message.clone()).then(|_| Ok(()))) - .collect::>()) - .map(|_| ()) - .boxed() + let acks = self.clients + .borrow() + .values() + .map(move |client| client.receive(message.clone()) + // Ignore failing subscribers. In a real pubsub, + // you'd want to continually retry until subscribers + // ack. + .then(|_| Ok(()))) + // Collect to a vec to end the borrow on `self.clients`. + .collect::>(); + Box::new(future::join_all(acks).map(|_| ())) } type SubscribeFut = Box>; @@ -99,42 +100,45 @@ impl publisher::FutureService for Publisher { Box::new(subscriber::FutureClient::connect(address, client::Options::default()) .map(move |subscriber| { println!("Subscribing {}.", id); - clients.lock().unwrap().insert(id, subscriber); + clients.borrow_mut().insert(id, subscriber); () }) .map_err(|e| e.to_string().into())) } - type UnsubscribeFut = BoxFuture<(), Never>; + type UnsubscribeFut = Box>; fn unsubscribe(&self, id: u32) -> Self::UnsubscribeFut { println!("Unsubscribing {}", id); - self.clients.lock().unwrap().remove(&id).unwrap(); + self.clients.borrow_mut().remove(&id).unwrap(); futures::finished(()).boxed() } } fn main() { let _ = env_logger::init(); + let mut reactor = reactor::Core::new().unwrap(); let publisher_addr = Publisher::new() .listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); - let mut publisher_client = - publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap(); + let subscriber1 = Subscriber::listen(0, &reactor.handle(), server::Options::default()); + let subscriber2 = Subscriber::listen(1, &reactor.handle(), server::Options::default()); - let subscriber1 = Subscriber::listen(0); - publisher_client.subscribe(0, subscriber1).unwrap(); - - let subscriber2 = Subscriber::listen(1); - publisher_client.subscribe(1, subscriber2).unwrap(); - - - println!("Broadcasting..."); - publisher_client.broadcast("hello to all".to_string()).unwrap(); - publisher_client.unsubscribe(1).unwrap(); - publisher_client.broadcast("hello again".to_string()).unwrap(); + let publisher = + reactor.run(publisher::FutureClient::connect(publisher_addr, client::Options::default())) + .unwrap(); + reactor.run(publisher.subscribe(0, subscriber1) + .and_then(|_| publisher.subscribe(1, subscriber2)) + .map_err(|e| panic!(e)) + .and_then(|_| { + println!("Broadcasting..."); + publisher.broadcast("hello to all".to_string()) + }) + .and_then(|_| publisher.unsubscribe(1)) + .and_then(|_| publisher.broadcast("hi again".to_string()))) + .unwrap(); thread::sleep(Duration::from_millis(300)); } diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index 90306ac..60c340a 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -11,11 +11,13 @@ extern crate futures; extern crate tarpc; #[macro_use] extern crate serde_derive; +extern crate tokio_core; use std::error::Error; use std::fmt; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::sync::ClientExt; +use tarpc::util::FirstSocketAddr; service! { rpc hello(name: String) -> String | NoNameGiven; @@ -50,7 +52,9 @@ impl SyncService for HelloServer { } fn main() { - let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + server::Options::default()) + .unwrap(); let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); diff --git a/examples/readme_futures.rs b/examples/readme_futures.rs index 7bf66d2..b5aadb2 100644 --- a/examples/readme_futures.rs +++ b/examples/readme_futures.rs @@ -13,7 +13,7 @@ extern crate tokio_core; use futures::Future; use tarpc::{client, server}; -use tarpc::client::future::Connect; +use tarpc::client::future::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; use tokio_core::reactor; @@ -33,11 +33,13 @@ impl FutureService for HelloServer { } fn main() { - let addr = "localhost:10000".first_socket_addr(); - let mut core = reactor::Core::new().unwrap(); - HelloServer.listen(addr, server::Options::default().handle(core.handle())).wait().unwrap(); - let options = client::Options::default().handle(core.handle()); - core.run(FutureClient::connect(addr, options) + let mut reactor = reactor::Core::new().unwrap(); + let addr = HelloServer.listen("localhost:10000".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + let options = client::Options::default().handle(reactor.handle()); + reactor.run(FutureClient::connect(addr, options) .map_err(tarpc::Error::from) .and_then(|client| client.hello("Mom".to_string())) .map(|resp| println!("{}", resp))) diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 19f0193..e2097f6 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -10,10 +10,11 @@ extern crate futures; #[macro_use] extern crate tarpc; +extern crate tokio_core; use tarpc::{client, server}; -use tarpc::client::sync::Connect; -use tarpc::util::Never; +use tarpc::client::sync::ClientExt; +use tarpc::util::{FirstSocketAddr, Never}; service! { rpc hello(name: String) -> String; @@ -29,8 +30,9 @@ impl SyncService for HelloServer { } fn main() { - let addr = "localhost:10000"; - HelloServer.listen(addr, server::Options::default()).unwrap(); + let addr = HelloServer.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .unwrap(); let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index 90414d8..689503f 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -10,15 +10,15 @@ extern crate env_logger; #[macro_use] extern crate tarpc; extern crate futures; +extern crate tokio_core; use add::{FutureService as AddFutureService, FutureServiceExt as AddExt}; use double::{FutureService as DoubleFutureService, FutureServiceExt as DoubleExt}; -use futures::{BoxFuture, Future}; -use std::sync::{Arc, Mutex}; +use futures::{BoxFuture, Future, Stream}; use tarpc::{client, server}; -use tarpc::client::future::Connect as Fc; -use tarpc::client::sync::Connect as Sc; +use tarpc::client::future::ClientExt as Fc; use tarpc::util::{FirstSocketAddr, Message, Never}; +use tokio_core::reactor; pub mod add { service! { @@ -49,12 +49,12 @@ impl AddFutureService for AddServer { #[derive(Clone)] struct DoubleServer { - client: Arc>, + client: add::FutureClient, } impl DoubleServer { fn new(client: add::FutureClient) -> Self { - DoubleServer { client: Arc::new(Mutex::new(client)) } + DoubleServer { client: client } } } @@ -63,8 +63,6 @@ impl DoubleFutureService for DoubleServer { fn double(&self, x: i32) -> Self::DoubleFut { self.client - .lock() - .unwrap() .add(x, x) .map_err(|e| e.to_string().into()) .boxed() @@ -73,22 +71,29 @@ impl DoubleFutureService for DoubleServer { fn main() { let _ = env_logger::init(); + let mut reactor = reactor::Core::new().unwrap(); let add_addr = AddServer.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() - .unwrap(); - let add_client = - add::FutureClient::connect(add_addr, client::Options::default()).wait().unwrap(); - - let double = DoubleServer::new(add_client); - let double_addr = double.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() .unwrap(); - let mut double_client = double::SyncClient::connect(double_addr, client::Options::default()) + let options = client::Options::default().handle(reactor.handle()); + let add_client = reactor.run(add::FutureClient::connect(add_addr, options)).unwrap(); + + let double_addr = DoubleServer::new(add_client) + .listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + + let double_client = + reactor.run(double::FutureClient::connect(double_addr, client::Options::default())) + .unwrap(); + reactor.run(futures::stream::futures_unordered((0..5).map(|i| double_client.double(i))) + .map_err(|e| println!("{}", e)) + .for_each(|i| { + println!("{:?}", i); + Ok(()) + })) .unwrap(); - for i in 0..5 { - println!("{:?}", double_client.double(i).unwrap()); - } } diff --git a/examples/throughput.rs b/examples/throughput.rs index e9846f4..3846d45 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -12,16 +12,17 @@ extern crate lazy_static; extern crate tarpc; extern crate env_logger; extern crate futures; +extern crate tokio_core; -use futures::Future; use std::io::{Read, Write, stdout}; use std::net; use std::sync::Arc; use std::thread; use std::time; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; lazy_static! { static ref BUF: Arc> = Arc::new(gen_vec(CHUNK_SIZE as usize)); @@ -53,9 +54,10 @@ impl FutureService for Server { const CHUNK_SIZE: u32 = 1 << 19; fn bench_tarpc(target: u64) { + let reactor = reactor::Core::new().unwrap(); let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); let start = time::Instant::now(); diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 2b79538..2f4166f 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -13,13 +13,14 @@ extern crate tarpc; extern crate bincode; extern crate env_logger; extern crate futures; +extern crate tokio_core; use bar::FutureServiceExt as BarExt; use baz::FutureServiceExt as BazExt; -use futures::Future; use tarpc::{client, server}; -use tarpc::client::sync::Connect; +use tarpc::client::sync::ClientExt; use tarpc::util::{FirstSocketAddr, Never}; +use tokio_core::reactor; mod bar { service! { @@ -59,17 +60,27 @@ macro_rules! pos { fn main() { let _ = env_logger::init(); - let bar_addr = Bar.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() - .unwrap(); - let baz_addr = Baz.listen("localhost:0".first_socket_addr(), - server::Options::default()) - .wait() - .unwrap(); + let mut bar_client = { + let reactor = reactor::Core::new().unwrap(); + let addr = Bar.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + // TODO: Need to set up each client with its own reactor. Should it be shareable across + // multiple clients? e.g. Rc> or something similar? + bar::SyncClient::connect(addr, client::Options::default()).unwrap() + }; + + let mut baz_client = { + // Need to set up each client with its own reactor. + let reactor = reactor::Core::new().unwrap(); + let addr = Baz.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server::Options::default()) + .unwrap(); + baz::SyncClient::connect(addr, client::Options::default()).unwrap() + }; - let mut bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); - let mut baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index ecdb66d..00543c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,10 +3,9 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {Reactor, WireError}; -use bincode; #[cfg(feature = "tls")] use self::tls::*; +use {WireError, bincode}; use tokio_core::reactor; type WireResponse = Result>, bincode::Error>; @@ -60,13 +59,13 @@ pub struct Options { } impl Options { - /// Connect using the given reactor handle. + /// Drive using the given reactor handle. Only used by `FutureClient`s. pub fn handle(mut self, handle: reactor::Handle) -> Self { self.reactor = Some(Reactor::Handle(handle)); self } - /// Connect using the given reactor remote. + /// Drive using the given reactor remote. Only used by `FutureClient`s. pub fn remote(mut self, remote: reactor::Remote) -> Self { self.reactor = Some(Reactor::Remote(remote)); self @@ -80,9 +79,17 @@ impl Options { } } +enum Reactor { + Handle(reactor::Handle), + Remote(reactor::Remote), +} + /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use {REMOTE, Reactor, WireError}; + use super::{Options, Reactor, WireResponse}; + use {REMOTE, WireError}; + #[cfg(feature = "tls")] + use errors::native_to_io; use futures::{self, Future, future}; use protocol::Proto; use serde::{Deserialize, Serialize}; @@ -90,7 +97,6 @@ pub mod future { use std::io; use std::net::SocketAddr; use stream_type::StreamType; - use super::{Options, WireResponse}; use tokio_core::net::TcpStream; use tokio_core::reactor; use tokio_proto::BindClient as ProtoBindClient; @@ -98,10 +104,7 @@ pub mod future { use tokio_service::Service; #[cfg(feature = "tls")] use tokio_tls::TlsConnectorExt; - #[cfg(feature = "tls")] - use errors::native_to_io; - /// A client that impls `tokio_service::Service` that writes and reads bytes. #[doc(hidden)] pub struct Client where Req: Serialize + 'static, @@ -128,12 +131,15 @@ pub mod future { { type Request = Req; type Response = Resp; - type Error = ::Error; + type Error = ::Error; type Future = ResponseFuture; fn call(&self, request: Self::Request) -> Self::Future { - fn identity(t: T) -> T { t } - self.inner.call(request) + fn identity(t: T) -> T { + t + } + self.inner + .call(request) .map(Self::map_err as _) .map_err(::Error::from as _) .and_then(identity as _) @@ -170,8 +176,8 @@ pub mod future { } } - /// Types that can connect to a server asynchronously. - pub trait Connect: Sized { + /// Extension methods for clients. + pub trait ClientExt: Sized { /// The type of the future returned when calling `connect`. type ConnectFut: Future; @@ -180,11 +186,11 @@ pub mod future { } /// A future that resolves to a `Client` or an `io::Error`. - pub type ConnectFuture = futures::Flatten< - futures::MapErr>>, - fn(futures::Canceled) -> io::Error>>; + pub type ConnectFuture = + futures::Flatten>>, + fn(futures::Canceled) -> io::Error>>; - impl Connect for Client + impl ClientExt for Client where Req: Serialize + Sync + Send + 'static, Resp: Deserialize + Sync + Send + 'static, E: Deserialize + Sync + Send + 'static @@ -218,32 +224,25 @@ pub mod future { }) .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) }; - let setup = move |tx: futures::sync::oneshot::Sender<_>| { - move |handle: &reactor::Handle| { - connect(handle).then(move |result| { - tx.complete(result); - Ok(()) - }) - } + let (tx, rx) = futures::oneshot(); + let setup = move |handle: &reactor::Handle| { + connect(handle).then(move |result| { + tx.complete(result); + Ok(()) + }) }; - let rx = match options.reactor { + match options.reactor { Some(Reactor::Handle(handle)) => { - let (tx, rx) = futures::oneshot(); - handle.spawn(setup(tx)(&handle)); - rx + handle.spawn(setup(&handle)); } Some(Reactor::Remote(remote)) => { - let (tx, rx) = futures::oneshot(); - remote.spawn(setup(tx)); - rx + remote.spawn(setup); } None => { - let (tx, rx) = futures::oneshot(); - REMOTE.spawn(setup(tx)); - rx + REMOTE.spawn(setup); } - }; + } fn panic(canceled: futures::Canceled) -> io::Error { unreachable!(canceled) } @@ -265,13 +264,69 @@ pub mod future { /// Exposes a trait for connecting synchronously to servers. pub mod sync { + use super::Options; + use super::Reactor; + use super::future::{Client as FutureClient, ClientExt as FutureClientExt}; + use serde::{Deserialize, Serialize}; + use std::fmt; use std::io; use std::net::ToSocketAddrs; - use super::Options; + use tokio_core::reactor; + use tokio_service::Service; + use util::FirstSocketAddr; - /// Types that can connect to a server synchronously. - pub trait Connect: Sized { + #[doc(hidden)] + pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + inner: FutureClient, + reactor: reactor::Core, + } + + impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Client {{ .. }}") + } + } + + impl Client + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + /// Drives an RPC call for the given request. + pub fn call(&mut self, request: Req) -> Result> { + self.reactor.run(self.inner.call(request)) + } + } + + /// Extension methods for Clients. + pub trait ClientExt: Sized { /// Connects to a server located at the given address. - fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs; + fn connect(addr: A, options: Options) -> io::Result where A: ToSocketAddrs; + } + + impl ClientExt for Client + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + fn connect(addr: A, mut options: Options) -> io::Result + where A: ToSocketAddrs + { + let mut reactor = reactor::Core::new()?; + let addr = addr.try_first_socket_addr()?; + options.reactor = Some(Reactor::Handle(reactor.handle())); + Ok(Client { + inner: reactor.run(FutureClient::connect(addr, options))?, + reactor: reactor, + }) + } } } diff --git a/src/errors.rs b/src/errors.rs index cc172d2..1916f74 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -43,8 +43,12 @@ impl fmt::Display for Er impl StdError for Error { fn description(&self) -> &str { match *self { - Error::ClientSerialize(_) => "The client failed to serialize the request or deserialize the response.", - Error::ServerSerialize(_) => "The server failed to serialize the response or deserialize the request.", + Error::ClientSerialize(_) => { + "The client failed to serialize the request or deserialize the response." + } + Error::ServerSerialize(_) => { + "The server failed to serialize the response or deserialize the request." + } Error::App(ref e) => e.description(), Error::Io(ref e) => e.description(), } diff --git a/src/lib.rs b/src/lib.rs index ee42ff7..4c8b479 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,10 +33,12 @@ //! //! #[macro_use] //! extern crate tarpc; +//! extern crate tokio_core; //! //! use tarpc::{client, server}; -//! use tarpc::client::sync::Connect; +//! use tarpc::client::sync::ClientExt; //! use tarpc::util::Never; +//! use tokio_core::reactor; //! //! service! { //! rpc hello(name: String) -> String; @@ -53,6 +55,7 @@ //! //! fn main() { //! let addr = "localhost:10000"; +//! let reactor = reactor::Core::new().unwrap(); //! let _server = HelloServer.listen(addr, server::Options::default()); //! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); //! println!("{}", client.hello("Mom".to_string()).unwrap()); @@ -70,7 +73,7 @@ //! extern crate tarpc; //! //! use tarpc::{client, server}; -//! use tarpc::client::sync::Connect; +//! use tarpc::client::sync::ClientExt; //! use tarpc::util::Never; //! use tarpc::native_tls::{TlsAcceptor, Pkcs12}; //! @@ -106,7 +109,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, specialization)] +#![feature(plugin, never_type, struct_field_attributes)] #![plugin(tarpc_plugins)] extern crate byteorder; @@ -180,12 +183,6 @@ fn spawn_core() -> reactor::Remote { rx.recv().unwrap() } -#[derive(Clone)] -enum Reactor { - Handle(reactor::Handle), - Remote(reactor::Remote), -} - cfg_if! { if #[cfg(feature = "tls")] { extern crate tokio_tls; diff --git a/src/macros.rs b/src/macros.rs index 3a5f8b9..aacedc9 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -109,7 +109,9 @@ macro_rules! impl_deserialize { impl $crate::serde::de::Visitor for Visitor { type Value = $impler; - fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + fn expecting(&self, formatter: &mut ::std::fmt::Formatter) + -> ::std::fmt::Result + { formatter.write_str("an enum variant") } @@ -318,11 +320,10 @@ macro_rules! service { impl_deserialize!(tarpc_service_Error__, NotIrrefutable(()) $($fn_name($error))*); impl_serialize!(tarpc_service_Error__, {}, NotIrrefutable(()) $($fn_name($error))*); -/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, +/// Defines the `Future` RPC service. Implementors must be `Clone` and `'static`, /// as required by `tokio_proto::NewService`. This is required so that the service can be used /// to respond to multiple requests concurrently. pub trait FutureService: - ::std::marker::Send + ::std::clone::Clone + 'static { @@ -345,13 +346,14 @@ macro_rules! service { /// the default tokio `Loop`. fn listen(self, addr: ::std::net::SocketAddr, + handle: &$crate::tokio_core::reactor::Handle, options: $crate::server::Options) - -> $crate::server::ListenFuture + -> ::std::io::Result<::std::net::SocketAddr> { - return $crate::server::listen( - move || Ok(tarpc_service_AsyncServer__(self.clone())), - addr, - options); + return $crate::server::listen(move || Ok(tarpc_service_AsyncServer__(self.clone())), + addr, + handle, + options); #[allow(non_camel_case_types)] #[derive(Clone)] @@ -478,9 +480,9 @@ macro_rules! service { pub trait SyncServiceExt: SyncService { /// Spawns the service, binding to the given address and running on /// the default tokio `Loop`. - fn listen(self, addr: L, options: $crate::server::Options) + fn listen(self, addr: A, options: $crate::server::Options) -> ::std::io::Result<::std::net::SocketAddr> - where L: ::std::net::ToSocketAddrs + where A: ::std::net::ToSocketAddrs { let tarpc_service__ = SyncServer__ { service: self, @@ -489,10 +491,26 @@ macro_rules! service { let tarpc_service_addr__ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - // Wrapped in a lazy future to ensure execution occurs when a task is present. - return $crate::futures::Future::wait($crate::futures::future::lazy(move || { - FutureServiceExt::listen(tarpc_service__, tarpc_service_addr__, options) - })); + let (tx_, rx_) = ::std::sync::mpsc::channel(); + + ::std::thread::spawn(move || { + match $crate::tokio_core::reactor::Core::new() { + ::std::result::Result::Ok(mut reactor_) => { + let addr_ = FutureServiceExt::listen(tarpc_service__, + tarpc_service_addr__, + &reactor_.handle(), + options); + tx_.send(addr_).unwrap(); + loop { + reactor_.turn(::std::option::Option::None); + } + } + ::std::result::Result::Err(error_) => { + tx_.send(Err(error_)).unwrap(); + } + } + }); + return rx_.recv().unwrap(); #[derive(Clone)] struct SyncServer__ { @@ -545,8 +563,7 @@ macro_rules! service { #[allow(unused)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. pub struct SyncClient { - inner: FutureClient, - reactor: $crate::tokio_core::reactor::Core, + inner: tarpc_service_SyncClient__, } impl ::std::fmt::Debug for SyncClient { @@ -555,20 +572,14 @@ macro_rules! service { } } - impl $crate::client::sync::Connect for SyncClient { - fn connect(addr_: A, options_: $crate::client::Options) - -> ::std::result::Result + impl $crate::client::sync::ClientExt for SyncClient { + fn connect(addr_: A, options_: $crate::client::Options) -> ::std::io::Result where A: ::std::net::ToSocketAddrs, { - let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?; - let mut reactor_ = $crate::tokio_core::reactor::Core::new()?; - let options_ = options_.handle(reactor_.handle()); - let client_ = ::connect(addr_, - options_); - let client_ = reactor_.run(client_)?; + let client_ = ::connect(addr_, options_)?; ::std::result::Result::Ok(SyncClient { inner: client_, - reactor: reactor_, }) } } @@ -580,16 +591,62 @@ macro_rules! service { pub fn $fn_name(&mut self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { - self.reactor.run(self.inner.$fn_name($($arg),*)) + return then__(self.inner.call(tarpc_service_Request__::$fn_name(($($arg,)*)))); + + // TODO: this code is duplicated in both FutureClient and SyncClient. + fn then__(tarpc_service_msg__: + ::std::result::Result>) + -> ::std::result::Result<$out, $crate::Error<$error>> { + match tarpc_service_msg__ { + ::std::result::Result::Ok(tarpc_service_msg__) => { + if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) = + tarpc_service_msg__ + { + ::std::result::Result::Ok(tarpc_service_msg__) + } else { + unreachable!() + } + } + ::std::result::Result::Err(tarpc_service_err__) => { + ::std::result::Result::Err(match tarpc_service_err__ { + $crate::Error::App(tarpc_service_err__) => { + if let tarpc_service_Error__::$fn_name( + tarpc_service_err__) = tarpc_service_err__ + { + $crate::Error::App(tarpc_service_err__) + } else { + unreachable!() + } + } + $crate::Error::ServerSerialize(tarpc_service_err__) => { + $crate::Error::ServerSerialize(tarpc_service_err__) + } + $crate::Error::ClientSerialize(tarpc_service_err__) => { + $crate::Error::ClientSerialize(tarpc_service_err__) + } + $crate::Error::Io(tarpc_service_error__) => { + $crate::Error::Io(tarpc_service_error__) + } + }) + } + } + } } )* } #[allow(non_camel_case_types)] - type tarpc_service_Client__ = + type tarpc_service_FutureClient__ = $crate::client::future::Client; + tarpc_service_Response__, + tarpc_service_Error__>; + + #[allow(non_camel_case_types)] + type tarpc_service_SyncClient__ = + $crate::client::sync::Client; #[allow(non_camel_case_types)] /// Implementation detail: Pending connection. @@ -598,7 +655,7 @@ macro_rules! service { tarpc_service_Request__, tarpc_service_Response__, tarpc_service_Error__>, - fn(tarpc_service_Client__) -> T>, + fn(tarpc_service_FutureClient__) -> T>, } impl $crate::futures::Future for tarpc_service_ConnectFuture__ { @@ -613,17 +670,18 @@ macro_rules! service { #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. - pub struct FutureClient(tarpc_service_Client__); + pub struct FutureClient(tarpc_service_FutureClient__); - impl<'a> $crate::client::future::Connect for FutureClient { + impl<'a> $crate::client::future::ClientExt for FutureClient { type ConnectFut = tarpc_service_ConnectFuture__; fn connect(tarpc_service_addr__: ::std::net::SocketAddr, tarpc_service_options__: $crate::client::Options) -> Self::ConnectFut { - let client = ::connect( - tarpc_service_addr__, tarpc_service_options__); + let client = ::connect(tarpc_service_addr__, + tarpc_service_options__); tarpc_service_ConnectFuture__ { inner: $crate::futures::Future::map(client, FutureClient) @@ -637,7 +695,7 @@ macro_rules! service { $(#[$attr])* pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::futures::future::Then< - ::Future, + ::Future, ::std::result::Result<$out, $crate::Error<$error>>, fn(::std::result::Result>) @@ -724,6 +782,7 @@ mod functional_test { use futures::{Future, failed}; use std::io; use std::net::SocketAddr; + use tokio_core::reactor; use util::FirstSocketAddr; extern crate env_logger; @@ -746,14 +805,11 @@ mod functional_test { use client::tls::Context; use native_tls::{Pkcs12, TlsAcceptor, TlsConnector}; - fn tls_context() -> (server::Options, client::Options) { + fn get_tls_server_options() -> server::Options { let buf = include_bytes!("../test/identity.p12"); let pkcs12 = unwrap!(Pkcs12::from_der(buf, "mypass")); let acceptor = unwrap!(unwrap!(TlsAcceptor::builder(pkcs12)).build()); - let server_options = server::Options::default().tls(acceptor); - let client_options = get_tls_client_options(); - - (server_options, client_options) + server::Options::default().tls(acceptor) } // Making the TlsConnector for testing needs to be OS-dependent just like native-tls. @@ -773,10 +829,11 @@ mod functional_test { let mut connector = unwrap!(TlsConnector::builder()); connector.anchor_certificates(&[cert]); - client::Options::default().tls(Context { - domain: DOMAIN.into(), - tls_connector: unwrap!(connector.build()), - }) + client::Options::default() + .tls(Context { + domain: DOMAIN.into(), + tls_connector: unwrap!(connector.build()), + }) } } else if #[cfg(all(not(target_os = "macos"), not(windows)))] { use native_tls_inner::backend::openssl::TlsConnectorBuilderExt; @@ -787,10 +844,11 @@ mod functional_test { .builder_mut() .set_ca_file("test/root-ca.pem")); - client::Options::default().tls(Context { - domain: DOMAIN.into(), - tls_connector: unwrap!(connector.build()), - }) + client::Options::default() + .tls(Context { + domain: DOMAIN.into(), + tls_connector: unwrap!(connector.build()), + }) } // not implemented for windows or other platforms } else { @@ -800,41 +858,42 @@ mod functional_test { } } - fn get_sync_client(addr: SocketAddr) -> io::Result - where C: client::sync::Connect + fn start_server_with_sync_client(server: S) -> io::Result<(SocketAddr, C)> + where C: client::sync::ClientExt, S: SyncServiceExt { - let client_options = get_tls_client_options(); - C::connect(addr, client_options) - } - - fn start_server_with_sync_client(server: S) -> (SocketAddr, io::Result) - where C: client::sync::Connect, S: SyncServiceExt - { - let (server_options, client_options) = tls_context(); + let server_options = get_tls_server_options(); let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), server_options)); - let client = C::connect(addr, client_options); - (addr, client) + let client = unwrap!(C::connect(addr, get_tls_client_options())); + Ok((addr, client)) } - fn start_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: FutureServiceExt + fn start_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: FutureServiceExt { - let (server_options, client_options) = tls_context(); - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - server_options).wait()); - let client = unwrap!(C::connect(addr, client_options).wait()); - (addr, client) + let mut reactor = reactor::Core::new()?; + let server_options = get_tls_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server_options)?; + let client_options = get_tls_client_options().handle(reactor.handle()); + let client = unwrap!(reactor.run(C::connect(addr, client_options))); + Ok((addr, reactor, client)) } - fn start_err_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: error_service::FutureServiceExt + fn start_err_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: error_service::FutureServiceExt { - let (server_options, client_options) = tls_context(); - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - server_options).wait()); - let client = unwrap!(C::connect(addr, client_options).wait()); - (addr, client) + let mut reactor = reactor::Core::new()?; + let server_options = get_tls_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + server_options)?; + let client_options = get_tls_client_options().handle(reactor.handle()); + let client = unwrap!(reactor.run(C::connect(addr, client_options))); + Ok((addr, reactor, client)) } } else { fn get_server_options() -> server::Options { @@ -846,36 +905,45 @@ mod functional_test { } fn get_sync_client(addr: SocketAddr) -> io::Result - where C: client::sync::Connect + where C: client::sync::ClientExt { C::connect(addr, get_client_options()) } - fn start_server_with_sync_client(server: S) -> (SocketAddr, io::Result) - where C: client::sync::Connect, S: SyncServiceExt + fn start_server_with_sync_client(server: S) -> io::Result<(SocketAddr, C)> + where C: client::sync::ClientExt, S: SyncServiceExt { - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - get_server_options())); + let options = get_server_options(); + let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), options)); + let client = unwrap!(get_sync_client(addr)); + Ok((addr, client)) + } + + fn start_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: FutureServiceExt + { + let mut reactor = reactor::Core::new()?; + let options = get_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + options)?; + let client = unwrap!(reactor.run(C::connect(addr, get_client_options()))); + Ok((addr, reactor, client)) + } + + fn start_err_server_with_async_client(server: S) + -> io::Result<(SocketAddr, reactor::Core, C)> + where C: client::future::ClientExt, S: error_service::FutureServiceExt + { + let mut reactor = reactor::Core::new()?; + let options = get_server_options(); + let addr = server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), + options)?; let client = C::connect(addr, get_client_options()); - (addr, client) - } - - fn start_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: FutureServiceExt - { - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - get_server_options()).wait()); - let client = unwrap!(C::connect(addr, get_client_options()).wait()); - (addr, client) - } - - fn start_err_server_with_async_client(server: S) -> (SocketAddr, C) - where C: client::future::Connect, S: error_service::FutureServiceExt - { - let addr = unwrap!(server.listen("localhost:0".first_socket_addr(), - get_server_options()).wait()); - let client = unwrap!(C::connect(addr, get_client_options()).wait()); - (addr, client) + let client = unwrap!(reactor.run(client)); + Ok((addr, reactor, client)) } } } @@ -900,8 +968,8 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let (_, client) = start_server_with_sync_client::(Server); - let mut client = unwrap!(client); + let (_, mut client) = unwrap!(start_server_with_sync_client::(Server)); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -909,9 +977,9 @@ mod functional_test { #[test] fn other_service() { let _ = env_logger::init(); - let (_, client) = start_server_with_sync_client::(Server); - let mut client = client.expect("Could not connect!"); + let (_, mut client) = + unwrap!(start_server_with_sync_client::(Server)); match client.foo().err().expect("failed unwrap") { ::Error::ServerSerialize(_) => {} // good bad => panic!("Expected Error::ServerSerialize but got {}", bad), @@ -920,8 +988,9 @@ mod functional_test { } mod future { - use futures::{Finished, Future, finished}; use super::{FutureClient, FutureService, env_logger, start_server_with_async_client}; + use futures::{Finished, finished}; + use tokio_core::reactor; use util::Never; #[derive(Clone)] @@ -944,30 +1013,33 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let (_, client) = start_server_with_async_client::(Server); - assert_eq!(3, client.add(1, 2).wait().unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); + assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); + assert_eq!("Hey, Tim.", + reactor.run(client.hey("Tim".to_string())).unwrap()); } #[test] fn concurrent() { let _ = env_logger::init(); - let (_, client) = start_server_with_async_client::(Server); + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); let req1 = client.add(1, 2); let req2 = client.add(3, 4); let req3 = client.hey("Tim".to_string()); - assert_eq!(3, req1.wait().unwrap()); - assert_eq!(7, req2.wait().unwrap()); - assert_eq!("Hey, Tim.", req3.wait().unwrap()); + assert_eq!(3, reactor.run(req1).unwrap()); + assert_eq!(7, reactor.run(req2).unwrap()); + assert_eq!("Hey, Tim.", reactor.run(req3).unwrap()); } #[test] fn other_service() { let _ = env_logger::init(); - let (_, client) = - start_server_with_async_client::(Server); - match client.foo().wait().err().unwrap() { + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); + match reactor.run(client.foo()).err().unwrap() { ::Error::ServerSerialize(_) => {} // good bad => panic!(r#"Expected Error::ServerSerialize but got "{}""#, bad), } @@ -980,13 +1052,12 @@ mod functional_test { use super::FutureServiceExt; let _ = env_logger::init(); + let reactor = reactor::Core::new().unwrap(); let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() - .unwrap(); - Server.listen(addr, server::Options::default()) - .wait() .unwrap(); + Server.listen(addr, &reactor.handle(), server::Options::default()).unwrap(); } #[cfg(feature = "tls")] @@ -994,21 +1065,25 @@ mod functional_test { fn tcp_and_tls() { use {client, server}; use util::FirstSocketAddr; - use client::future::Connect; + use client::future::ClientExt; use super::FutureServiceExt; let _ = env_logger::init(); - let (_, client) = start_server_with_async_client::(Server); - assert_eq!(3, client.add(1, 2).wait().unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); + let (_, mut reactor, client) = + unwrap!(start_server_with_async_client::(Server)); + assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); + assert_eq!("Hey, Tim.", + reactor.run(client.hey("Tim".to_string())).unwrap()); let addr = Server.listen("localhost:0".first_socket_addr(), + &reactor.handle(), server::Options::default()) - .wait() .unwrap(); - let client = FutureClient::connect(addr, client::Options::default()).wait().unwrap(); - assert_eq!(3, client.add(1, 2).wait().unwrap()); - assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).wait().unwrap()); + let options = client::Options::default().handle(reactor.handle()); + let client = reactor.run(FutureClient::connect(addr, options)).unwrap(); + assert_eq!(3, reactor.run(client.add(1, 2)).unwrap()); + assert_eq!("Hey, Tim.", + reactor.run(client.hey("Tim".to_string())).unwrap()); } } @@ -1036,28 +1111,19 @@ mod functional_test { use self::error_service::*; let _ = env_logger::init(); - let (addr, client) = start_err_server_with_async_client::(ErrorServer); - client.bar() - .then(move |result| { - match result.err().unwrap() { - ::Error::App(e) => { - assert_eq!(e.description(), "lol jk"); - Ok::<_, ()>(()) - } // good - bad => panic!("Expected Error::App but got {:?}", bad), - } - }) - .wait() + let (_, mut reactor, client) = + start_err_server_with_async_client::(ErrorServer).unwrap(); + reactor.run(client.bar() + .then(move |result| { + match result.err().unwrap() { + ::Error::App(e) => { + assert_eq!(e.description(), "lol jk"); + Ok::<_, ()>(()) + } // good + bad => panic!("Expected Error::App but got {:?}", bad), + } + })) .unwrap(); - - let mut client = get_sync_client::(addr).unwrap(); - match client.bar().err().unwrap() { - ::Error::App(e) => { - assert_eq!(e.description(), "lol jk"); - } // good - bad => panic!("Expected Error::App but got {:?}", bad), - } } pub mod other_service { diff --git a/src/protocol.rs b/src/protocol.rs index 7d38df8..ec9ed96 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -12,7 +12,6 @@ use std::mem; use tokio_core::io::{EasyBuf, Framed, Io}; use tokio_proto::multiplex::{ClientProto, ServerProto}; use tokio_proto::streaming::multiplex::RequestId; -use util::Debugger; // `Encode` is the type that `Codec` encodes. `Decode` is the type it decodes. pub struct Codec { @@ -97,7 +96,6 @@ impl tokio_core::io::Codec for Codec // message. self.state = Id; - trace!("--> Parsed message: {:?}", Debugger(&result)); return Ok(Some((id, result))); } } diff --git a/src/server.rs b/src/server.rs index e8314de..1e1b4e6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,6 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use {REMOTE, Reactor}; use bincode; use errors::WireError; use futures::{self, Async, Future, Stream, future}; @@ -35,24 +34,11 @@ enum Acceptor { /// Additional options to configure how the server operates. #[derive(Default)] pub struct Options { - reactor: Option, #[cfg(feature = "tls")] tls_acceptor: Option, } impl Options { - /// Listen using the given reactor handle. - pub fn handle(mut self, handle: reactor::Handle) -> Self { - self.reactor = Some(Reactor::Handle(handle)); - self - } - - /// Listen using the given reactor remote. - pub fn remote(mut self, remote: reactor::Remote) -> Self { - self.reactor = Some(Reactor::Remote(remote)); - self - } - /// Set the `TlsAcceptor` #[cfg(feature = "tls")] pub fn tls(mut self, tls_acceptor: TlsAcceptor) -> Self { @@ -66,10 +52,14 @@ impl Options { pub type Response = Result>; #[doc(hidden)] -pub fn listen(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture +pub fn listen(new_service: S, + addr: SocketAddr, + handle: &reactor::Handle, + _options: Options) + -> io::Result where S: NewService, Response = Response, - Error = io::Error> + Send + 'static, + Error = io::Error> + 'static, Req: Deserialize + 'static, Resp: Serialize + 'static, E: Serialize + 'static @@ -77,53 +67,30 @@ pub fn listen(new_service: S, addr: SocketAddr, options: Option // Similar to the client, since `Options` is not `Send`, we take the `TlsAcceptor` when it is // available. #[cfg(feature = "tls")] - let acceptor = match options.tls_acceptor { + let acceptor = match _options.tls_acceptor { Some(tls_acceptor) => Acceptor::Tls(tls_acceptor), None => Acceptor::Tcp, }; #[cfg(not(feature = "tls"))] let acceptor = Acceptor::Tcp; - match options.reactor { - None => { - let (tx, rx) = futures::oneshot(); - REMOTE.spawn(move |handle| { - Ok(tx.complete(listen_with(new_service, addr, handle.clone(), acceptor))) - }); - ListenFuture { inner: future::Either::A(rx) } - } - Some(Reactor::Remote(remote)) => { - let (tx, rx) = futures::oneshot(); - remote.spawn(move |handle| { - Ok(tx.complete(listen_with(new_service, addr, handle.clone(), acceptor))) - }); - ListenFuture { inner: future::Either::A(rx) } - } - Some(Reactor::Handle(handle)) => { - ListenFuture { - inner: future::Either::B(future::ok(listen_with(new_service, - addr, - handle, - acceptor))), - } - } - } + listen_with(new_service, addr, handle, acceptor) } /// Spawns a service that binds to the given address using the given handle. fn listen_with(new_service: S, addr: SocketAddr, - handle: Handle, + handle: &Handle, _acceptor: Acceptor) -> io::Result where S: NewService, Response = Response, - Error = io::Error> + Send + 'static, + Error = io::Error> + 'static, Req: Deserialize + 'static, Resp: Serialize + 'static, E: Serialize + 'static { - let listener = listener(&addr, &handle)?; + let listener = listener(&addr, handle)?; let addr = listener.local_addr()?; let handle2 = handle.clone(); @@ -175,8 +142,7 @@ fn listener(addr: &SocketAddr, handle: &Handle) -> io::Result { /// A future that resolves to a `ServerHandle`. #[doc(hidden)] pub struct ListenFuture { - inner: future::Either>, - future::FutureResult, futures::Canceled>>, + inner: future::FutureResult, futures::Canceled>, } impl Future for ListenFuture { diff --git a/src/util.rs b/src/util.rs index 0eb1f75..44f36d2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -111,18 +111,3 @@ pub trait FirstSocketAddr: ToSocketAddrs { } impl FirstSocketAddr for A {} - -/// A struct that will format as the contained type if the type impls Debug. -pub struct Debugger<'a, T: 'a>(pub &'a T); - -impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "{:?}", self.0) - } -} - -impl<'a, T> fmt::Debug for Debugger<'a, T> { - default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "{{not debuggable}}") - } -}