diff --git a/Cargo.toml b/Cargo.toml index 19b97d6..5c6e698 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ readme = "README.md" description = "An RPC framework for Rust with a focus on ease of use." [dependencies] -bincode = "1.0.0-alpha" +bincode = "1.0.0-alpha2" byteorder = "1.0" cfg-if = "0.1.0" futures = "0.1.7" diff --git a/benches/latency.rs b/benches/latency.rs index 3c4e49b..fd877cf 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -38,8 +38,13 @@ impl FutureService for Server { #[bench] fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) + .wait() + .unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); - bencher.iter(|| { client.ack().unwrap(); }); + bencher.iter(|| { + client.ack().unwrap(); + }); } diff --git a/examples/pubsub.rs b/examples/pubsub.rs index e2f9e1e..1ad44db 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -122,7 +122,7 @@ fn main() { .wait() .unwrap(); - let publisher_client = + let mut publisher_client = publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap(); let subscriber1 = Subscriber::listen(0); diff --git a/examples/readme_errors.rs b/examples/readme_errors.rs index c3f0a9c..90306ac 100644 --- a/examples/readme_errors.rs +++ b/examples/readme_errors.rs @@ -51,7 +51,7 @@ impl SyncService for HelloServer { fn main() { let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); println!("{}", client.hello("".to_string()).unwrap_err()); } diff --git a/examples/readme_sync.rs b/examples/readme_sync.rs index 80055d4..19f0193 100644 --- a/examples/readme_sync.rs +++ b/examples/readme_sync.rs @@ -31,6 +31,6 @@ impl SyncService for HelloServer { fn main() { let addr = "localhost:10000"; HelloServer.listen(addr, server::Options::default()).unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); println!("{}", client.hello("Mom".to_string()).unwrap()); } diff --git a/examples/server_calling_server.rs b/examples/server_calling_server.rs index a1ac41c..90414d8 100644 --- a/examples/server_calling_server.rs +++ b/examples/server_calling_server.rs @@ -86,7 +86,7 @@ fn main() { .wait() .unwrap(); - let double_client = double::SyncClient::connect(double_addr, client::Options::default()) + let mut double_client = double::SyncClient::connect(double_addr, client::Options::default()) .unwrap(); for i in 0..5 { println!("{:?}", double_client.double(i).unwrap()); diff --git a/examples/throughput.rs b/examples/throughput.rs index 0c99c52..e9846f4 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -57,7 +57,7 @@ fn bench_tarpc(target: u64) { server::Options::default()) .wait() .unwrap(); - let client = SyncClient::connect(addr, client::Options::default()).unwrap(); + let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); let start = time::Instant::now(); let mut nread = 0; while nread < target { diff --git a/examples/two_clients.rs b/examples/two_clients.rs index 0e56ce4..2b79538 100644 --- a/examples/two_clients.rs +++ b/examples/two_clients.rs @@ -68,8 +68,8 @@ fn main() { .wait() .unwrap(); - let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); - let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); + let mut bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap(); + let mut baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap(); info!("Result: {:?}", bar_client.bar(17)); diff --git a/src/client.rs b/src/client.rs index a1a2b19..ecdb66d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,25 +4,12 @@ // This file may not be copied, modified, or distributed except according to those terms. use {Reactor, WireError}; -use bincode::serde::DeserializeError; -use futures::{self, Future}; -use protocol::Proto; +use bincode; #[cfg(feature = "tls")] use self::tls::*; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::io; -use stream_type::StreamType; use tokio_core::reactor; -use tokio_proto::BindClient as ProtoBindClient; -use tokio_proto::multiplex::Multiplex; -use tokio_service::Service; -type WireResponse = Result>, DeserializeError>; -type ResponseFuture = futures::Map< as Service>::Future, - fn(WireResponse) -> Result>>; -type BindClient = >> as - ProtoBindClient>::BindClient; +type WireResponse = Result>, bincode::Error>; /// TLS-specific functionality #[cfg(feature = "tls")] @@ -64,74 +51,6 @@ pub mod tls { } } -/// A client that impls `tokio_service::Service` that writes and reads bytes. -/// -/// Typically, this would be combined with a serialization pre-processing step -/// and a deserialization post-processing step. -#[doc(hidden)] -pub struct Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - inner: BindClient, -} - -impl Clone for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn clone(&self) -> Self { - Client { inner: self.inner.clone() } - } -} - -impl Service for Client - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static -{ - type Request = Req; - type Response = Result>; - type Error = io::Error; - type Future = ResponseFuture; - - fn call(&self, request: Self::Request) -> Self::Future { - self.inner.call(request).map(Self::map_err) - } -} - -impl Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn new(inner: BindClient) -> Self - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - Client { inner: inner } - } - - fn map_err(resp: WireResponse) -> Result> { - resp.map(|r| r.map_err(::Error::from)) - .map_err(::Error::ClientDeserialize) - .and_then(|r| r) - } -} - -impl fmt::Debug for Client - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static -{ - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "Client {{ .. }}") - } -} - /// Additional options to configure how the client connects and operates. #[derive(Default)] pub struct Options { @@ -163,24 +82,92 @@ impl Options { /// Exposes a trait for connecting asynchronously to servers. pub mod future { - use {REMOTE, Reactor}; - use futures::{self, Async, Future, future}; + use {REMOTE, Reactor, WireError}; + use futures::{self, Future, future}; use protocol::Proto; use serde::{Deserialize, Serialize}; + use std::fmt; use std::io; - use std::marker::PhantomData; use std::net::SocketAddr; use stream_type::StreamType; - use super::{Client, Options}; - use tokio_core::net::{TcpStream, TcpStreamNew}; + use super::{Options, WireResponse}; + use tokio_core::net::TcpStream; use tokio_core::reactor; - use tokio_proto::BindClient; - cfg_if! { - if #[cfg(feature = "tls")] { - use tokio_tls::{ConnectAsync, TlsStream, TlsConnectorExt}; - use super::tls::Context; - use errors::native_to_io; - } else {} + use tokio_proto::BindClient as ProtoBindClient; + use tokio_proto::multiplex::Multiplex; + use tokio_service::Service; + #[cfg(feature = "tls")] + use tokio_tls::TlsConnectorExt; + #[cfg(feature = "tls")] + use errors::native_to_io; + + /// A client that impls `tokio_service::Service` that writes and reads bytes. + #[doc(hidden)] + pub struct Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + inner: BindClient, + } + + impl Clone for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn clone(&self) -> Self { + Client { inner: self.inner.clone() } + } + } + + impl Service for Client + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + type Request = Req; + type Response = Resp; + type Error = ::Error; + type Future = ResponseFuture; + + fn call(&self, request: Self::Request) -> Self::Future { + fn identity(t: T) -> T { t } + self.inner.call(request) + .map(Self::map_err as _) + .map_err(::Error::from as _) + .and_then(identity as _) + } + } + + impl Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn new(inner: BindClient) -> Self + where Req: Serialize + Sync + Send + 'static, + Resp: Deserialize + Sync + Send + 'static, + E: Deserialize + Sync + Send + 'static + { + Client { inner: inner } + } + + fn map_err(resp: WireResponse) -> Result> { + resp.map(|r| r.map_err(::Error::from)) + .map_err(::Error::ClientSerialize) + .and_then(|r| r) + } + } + + impl fmt::Debug for Client + where Req: Serialize + 'static, + Resp: Deserialize + 'static, + E: Deserialize + 'static + { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "Client {{ .. }}") + } } /// Types that can connect to a server asynchronously. @@ -192,98 +179,10 @@ pub mod future { fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut; } - type ConnectFutureInner = future::Either, MultiplexConnect>, futures::Flatten< - futures::MapErr>>, - fn(futures::Canceled) -> io::Error>>>; - /// A future that resolves to a `Client` or an `io::Error`. - #[doc(hidden)] - pub struct ConnectFuture - where Req: Serialize + 'static, - Resp: Deserialize + 'static, - E: Deserialize + 'static - { - #[cfg(not(feature = "tls"))] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - inner: ConnectFutureInner>, - #[cfg(feature = "tls")] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - inner: ConnectFutureInner, futures::Map, - fn(::native_tls::Error) -> io::Error>, fn(TlsStream) -> StreamType>>>, - } - - impl Future for ConnectFuture - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - type Item = Client; - type Error = io::Error; - - fn poll(&mut self) -> futures::Poll { - // Ok to unwrap because we ensure the oneshot is always completed. - match Future::poll(&mut self.inner)? { - Async::Ready(client) => Ok(Async::Ready(client)), - Async::NotReady => Ok(Async::NotReady), - } - } - } - - struct MultiplexConnect(reactor::Handle, PhantomData<(Req, Resp, E)>); - - impl MultiplexConnect { - fn new(handle: reactor::Handle) -> Self { - MultiplexConnect(handle, PhantomData) - } - } - - impl FnOnce<(I,)> for MultiplexConnect - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static, - I: Into - { - type Output = Client; - - extern "rust-call" fn call_once(self, (stream,): (I,)) -> Self::Output { - Client::new(Proto::new().bind_client(&self.0, stream.into())) - } - } - - /// Provides the connection Fn impl for Tls - struct ConnectFn { - #[cfg(feature = "tls")] - tls_ctx: Option, - } - - impl FnOnce<(TcpStream,)> for ConnectFn { - #[cfg(feature = "tls")] - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - type Output = future::Either, - futures::Map, - fn(::native_tls::Error) - -> io::Error>, - fn(TlsStream) -> StreamType>>; - #[cfg(not(feature = "tls"))] - type Output = future::FutureResult; - - extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Self::Output { - #[cfg(feature = "tls")] - match self.tls_ctx { - None => future::Either::A(future::ok(StreamType::from(tcp))), - Some(tls_ctx) => { - future::Either::B(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, tcp) - .map_err(native_to_io as fn(_) -> _) - .map(StreamType::from as fn(_) -> _)) - } - } - #[cfg(not(feature = "tls"))] - future::ok(StreamType::from(tcp)) - } - } + pub type ConnectFuture = futures::Flatten< + futures::MapErr>>, + fn(futures::Canceled) -> io::Error>>; impl Connect for Client where Req: Serialize + Sync + Send + 'static, @@ -300,42 +199,39 @@ pub mod future { #[cfg(feature = "tls")] let tls_ctx = options.tls_ctx.take(); + let connect = move |handle: &reactor::Handle| { + let handle2 = handle.clone(); + TcpStream::connect(&addr, handle) + .and_then(move |socket| { + #[cfg(feature = "tls")] + match tls_ctx { + Some(tls_ctx) => { + future::Either::A(tls_ctx.tls_connector + .connect_async(&tls_ctx.domain, socket) + .map(StreamType::Tls) + .map_err(native_to_io)) + } + None => future::Either::B(future::ok(StreamType::Tcp(socket))), + } + #[cfg(not(feature = "tls"))] + future::ok(StreamType::Tcp(socket)) + }) + .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) + }; let setup = move |tx: futures::sync::oneshot::Sender<_>| { move |handle: &reactor::Handle| { - let handle2 = handle.clone(); - TcpStream::connect(&addr, handle) - .and_then(move |socket| { - #[cfg(feature = "tls")] - match tls_ctx { - Some(tls_ctx) => { - future::Either::A(tls_ctx.tls_connector - .connect_async(&tls_ctx.domain, socket) - .map(StreamType::Tls) - .map_err(native_to_io)) - } - None => future::Either::B(future::ok(StreamType::Tcp(socket))), - } - #[cfg(not(feature = "tls"))] - future::ok(StreamType::Tcp(socket)) - }) - .map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp))) - .then(move |result| { - tx.complete(result); - Ok(()) - }) + connect(handle).then(move |result| { + tx.complete(result); + Ok(()) + }) } }; let rx = match options.reactor { Some(Reactor::Handle(handle)) => { - #[cfg(feature = "tls")] - let connect_fn = ConnectFn { tls_ctx: options.tls_ctx }; - #[cfg(not(feature = "tls"))] - let connect_fn = ConnectFn {}; - let tcp = TcpStream::connect(&addr, &handle) - .and_then(connect_fn) - .map(MultiplexConnect::new(handle)); - return ConnectFuture { inner: future::Either::A(tcp) }; + let (tx, rx) = futures::oneshot(); + handle.spawn(setup(tx)(&handle)); + rx } Some(Reactor::Remote(remote)) => { let (tx, rx) = futures::oneshot(); @@ -351,39 +247,31 @@ pub mod future { fn panic(canceled: futures::Canceled) -> io::Error { unreachable!(canceled) } - ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) } + rx.map_err(panic as _).flatten() } } + + type ResponseFuture = + futures::AndThen as Service>::Future, + fn(WireResponse) -> Result>>, + fn(io::Error) -> ::Error>, + Result>, + fn(Result>) -> Result>>; + type BindClient = + >> + as ProtoBindClient>::BindClient; } /// Exposes a trait for connecting synchronously to servers. pub mod sync { - use client::future::Connect as FutureConnect; - use futures::{Future, future}; - use serde::{Deserialize, Serialize}; use std::io; use std::net::ToSocketAddrs; - use super::{Client, Options}; - use util::FirstSocketAddr; + use super::Options; /// Types that can connect to a server synchronously. pub trait Connect: Sized { /// Connects to a server located at the given address. fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs; } - - impl Connect for Client - where Req: Serialize + Sync + Send + 'static, - Resp: Deserialize + Sync + Send + 'static, - E: Deserialize + Sync + Send + 'static - { - fn connect(addr: A, options: Options) -> Result - where A: ToSocketAddrs - { - let addr = addr.try_first_socket_addr()?; - - // Wrapped in a lazy future to ensure execution occurs when a task is present. - future::lazy(move || ::connect(addr, options)).wait() - } - } } diff --git a/src/errors.rs b/src/errors.rs index 3bb4189..cc172d2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -3,7 +3,6 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use bincode; use serde::{Deserialize, Serialize}; use std::{fmt, io}; use std::error::Error as StdError; @@ -13,23 +12,15 @@ use std::error::Error as StdError; pub enum Error { /// Any IO error. Io(io::Error), - /// Error in deserializing a server response. + /// Error serializing the client request or deserializing the server response. /// /// Typically this indicates a faulty implementation of `serde::Serialize` or /// `serde::Deserialize`. - ClientDeserialize(bincode::serde::DeserializeError), - /// Error in serializing a client request. - /// - /// Typically this indicates a faulty implementation of `serde::Serialize`. - ClientSerialize(bincode::serde::SerializeError), - /// Error in deserializing a client request. + ClientSerialize(::bincode::Error), + /// Error serializing the server response or deserializing the client request. /// /// Typically this indicates a faulty implementation of `serde::Serialize` or /// `serde::Deserialize`. - ServerDeserialize(String), - /// Error in serializing a server response. - /// - /// Typically this indicates a faulty implementation of `serde::Serialize`. ServerSerialize(String), /// The server was unable to reply to the rpc for some reason. /// @@ -41,9 +32,7 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Error::ClientDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ClientSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), - Error::ServerDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::ServerSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e), Error::App(ref e) => fmt::Display::fmt(e, f), Error::Io(ref e) => fmt::Display::fmt(e, f), @@ -54,10 +43,8 @@ impl fmt::Display for Er impl StdError for Error { fn description(&self) -> &str { match *self { - Error::ClientDeserialize(_) => "The client failed to deserialize the server response.", - Error::ClientSerialize(_) => "The client failed to serialize the request.", - Error::ServerDeserialize(_) => "The server failed to deserialize the request.", - Error::ServerSerialize(_) => "The server failed to serialize the response.", + Error::ClientSerialize(_) => "The client failed to serialize the request or deserialize the response.", + Error::ServerSerialize(_) => "The server failed to serialize the response or deserialize the request.", Error::App(ref e) => e.description(), Error::Io(ref e) => e.description(), } @@ -65,9 +52,7 @@ impl StdError for Error< fn cause(&self) -> Option<&StdError> { match *self { - Error::ClientDeserialize(ref e) => e.cause(), Error::ClientSerialize(ref e) => e.cause(), - Error::ServerDeserialize(_) | Error::ServerSerialize(_) | Error::App(_) => None, Error::Io(ref e) => e.cause(), @@ -84,7 +69,6 @@ impl From for Error { impl From> for Error { fn from(err: WireError) -> Self { match err { - WireError::ServerDeserialize(s) => Error::ServerDeserialize(s), WireError::ServerSerialize(s) => Error::ServerSerialize(s), WireError::App(e) => Error::App(e), } @@ -95,9 +79,7 @@ impl From> for Error { #[doc(hidden)] #[derive(Deserialize, Serialize, Clone, Debug)] pub enum WireError { - /// Error in deserializing a client request. - ServerDeserialize(String), - /// Error in serializing server response. + /// Error in serializing the server response or deserializing the client request. ServerSerialize(String), /// The server was unable to reply to the rpc for some reason. App(E), diff --git a/src/lib.rs b/src/lib.rs index c72da5e..ee42ff7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ //! fn main() { //! let addr = "localhost:10000"; //! let _server = HelloServer.listen(addr, server::Options::default()); -//! let client = SyncClient::connect(addr, client::Options::default()).unwrap(); +//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap(); //! println!("{}", client.hello("Mom".to_string()).unwrap()); //! } //! ``` @@ -97,7 +97,7 @@ //! let addr = "localhost:10000"; //! let acceptor = get_acceptor(); //! let _server = HelloServer.listen(addr, server::Options::default().tls(acceptor)); -//! let client = SyncClient::connect(addr, +//! let mut client = SyncClient::connect(addr, //! client::Options::default() //! .tls(client::tls::Context::new("foobar.com").unwrap())) //! .unwrap(); @@ -106,8 +106,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits, - specialization)] +#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, specialization)] #![plugin(tarpc_plugins)] extern crate byteorder; diff --git a/src/macros.rs b/src/macros.rs index b5e9edd..3a5f8b9 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -77,7 +77,9 @@ macro_rules! impl_deserialize { impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ { type Value = impl_deserialize_Field__; - fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + fn expecting(&self, formatter: &mut ::std::fmt::Formatter) + -> ::std::fmt::Result + { formatter.write_str("an unsigned integer") } @@ -408,7 +410,7 @@ macro_rules! service { where tarpc_service_S__: FutureService { type Request = ::std::result::Result; + $crate::bincode::Error>; type Response = $crate::server::Response; type Error = ::std::io::Error; @@ -421,7 +423,7 @@ macro_rules! service { return tarpc_service_FutureReply__::DeserializeError( $crate::futures::finished( ::std::result::Result::Err( - $crate::WireError::ServerDeserialize( + $crate::WireError::ServerSerialize( ::std::string::ToString::to_string( &tarpc_service_deserialize_err__))))); } @@ -541,22 +543,33 @@ macro_rules! service { impl SyncServiceExt for S where S: SyncService {} #[allow(unused)] - #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a blocking interface. - pub struct SyncClient(FutureClient); + pub struct SyncClient { + inner: FutureClient, + reactor: $crate::tokio_core::reactor::Core, + } + + impl ::std::fmt::Debug for SyncClient { + fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner) + } + } impl $crate::client::sync::Connect for SyncClient { - fn connect(addr_: A, opts_: $crate::client::Options) + fn connect(addr_: A, options_: $crate::client::Options) -> ::std::result::Result where A: ::std::net::ToSocketAddrs, { let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?; - // Wrapped in a lazy future to ensure execution occurs when a task is present. - let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || { - ::connect(addr_, opts_) - }))?; - let client_ = SyncClient(client_); - ::std::result::Result::Ok(client_) + let mut reactor_ = $crate::tokio_core::reactor::Core::new()?; + let options_ = options_.handle(reactor_.handle()); + let client_ = ::connect(addr_, + options_); + let client_ = reactor_.run(client_)?; + ::std::result::Result::Ok(SyncClient { + inner: client_, + reactor: reactor_, + }) } } @@ -564,20 +577,17 @@ macro_rules! service { $( #[allow(unused)] $(#[$attr])* - pub fn $fn_name(&self, $($arg: $in_),*) + pub fn $fn_name(&mut self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { - // Wrapped in a lazy future to ensure execution occurs when a task is present. - $crate::futures::Future::wait($crate::futures::future::lazy(move || { - (self.0).$fn_name($($arg),*) - })) + self.reactor.run(self.inner.$fn_name($($arg),*)) } )* } #[allow(non_camel_case_types)] type tarpc_service_Client__ = - $crate::client::Client; @@ -629,10 +639,8 @@ macro_rules! service { -> $crate::futures::future::Then< ::Future, ::std::result::Result<$out, $crate::Error<$error>>, - fn(::std::result::Result< - ::std::result::Result>, - ::std::io::Error>) + fn(::std::result::Result>) -> ::std::result::Result<$out, $crate::Error<$error>>> { let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*)); @@ -641,12 +649,10 @@ macro_rules! service { return $crate::futures::Future::then(tarpc_service_fut__, then__); fn then__(tarpc_service_msg__: - ::std::result::Result< ::std::result::Result>, - ::std::io::Error>) + $crate::Error>) -> ::std::result::Result<$out, $crate::Error<$error>> { - match tarpc_service_msg__? { + match tarpc_service_msg__ { ::std::result::Result::Ok(tarpc_service_msg__) => { if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) = tarpc_service_msg__ @@ -667,15 +673,9 @@ macro_rules! service { unreachable!() } } - $crate::Error::ServerDeserialize(tarpc_service_err__) => { - $crate::Error::ServerDeserialize(tarpc_service_err__) - } $crate::Error::ServerSerialize(tarpc_service_err__) => { $crate::Error::ServerSerialize(tarpc_service_err__) } - $crate::Error::ClientDeserialize(tarpc_service_err__) => { - $crate::Error::ClientDeserialize(tarpc_service_err__) - } $crate::Error::ClientSerialize(tarpc_service_err__) => { $crate::Error::ClientSerialize(tarpc_service_err__) } @@ -901,7 +901,7 @@ mod functional_test { fn simple() { let _ = env_logger::init(); let (_, client) = start_server_with_sync_client::(Server); - let client = unwrap!(client); + let mut client = unwrap!(client); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap()); } @@ -911,10 +911,10 @@ mod functional_test { let _ = env_logger::init(); let (_, client) = start_server_with_sync_client::(Server); - let client = client.expect("Could not connect!"); + let mut client = client.expect("Could not connect!"); match client.foo().err().expect("failed unwrap") { - ::Error::ServerDeserialize(_) => {} // good - bad => panic!("Expected Error::ServerDeserialize but got {}", bad), + ::Error::ServerSerialize(_) => {} // good + bad => panic!("Expected Error::ServerSerialize but got {}", bad), } } } @@ -968,8 +968,8 @@ mod functional_test { start_server_with_async_client::(Server); match client.foo().wait().err().unwrap() { - ::Error::ServerDeserialize(_) => {} // good - bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad), + ::Error::ServerSerialize(_) => {} // good + bad => panic!(r#"Expected Error::ServerSerialize but got "{}""#, bad), } } @@ -978,16 +978,17 @@ mod functional_test { use util::FirstSocketAddr; use server; use super::FutureServiceExt; - + let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()) + let addr = Server.listen("localhost:0".first_socket_addr(), + server::Options::default()) .wait() .unwrap(); Server.listen(addr, server::Options::default()) .wait() .unwrap(); } - + #[cfg(feature = "tls")] #[test] fn tcp_and_tls() { @@ -1050,7 +1051,7 @@ mod functional_test { .wait() .unwrap(); - let client = get_sync_client::(addr).unwrap(); + let mut client = get_sync_client::(addr).unwrap(); match client.bar().err().unwrap() { ::Error::App(e) => { assert_eq!(e.description(), "lol jk"); diff --git a/src/protocol.rs b/src/protocol.rs index b973b38..7d38df8 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use {serde, tokio_core}; -use bincode::{SizeLimit, serde as bincode}; +use bincode::{self, SizeLimit}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{self, Cursor}; use std::marker::PhantomData; @@ -40,7 +40,7 @@ impl tokio_core::io::Codec for Codec Decode: serde::Deserialize { type Out = (RequestId, Encode); - type In = (RequestId, Result); + type In = (RequestId, Result); fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { buf.write_u64::(id).unwrap(); @@ -121,7 +121,7 @@ impl ServerProto for Proto Decode: serde::Deserialize + 'static { type Response = Encode; - type Request = Result; + type Request = Result; type Transport = Framed>; type BindTransport = Result; @@ -135,7 +135,7 @@ impl ClientProto for Proto Encode: serde::Serialize + 'static, Decode: serde::Deserialize + 'static { - type Response = Result; + type Response = Result; type Request = Encode; type Transport = Framed>; type BindTransport = Result; @@ -158,8 +158,8 @@ fn serialize() { let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new(); codec.encode(MSG, &mut vec).unwrap(); buf.get_mut().append(&mut vec); - let actual: Result)>, - io::Error> = codec.decode(&mut buf); + let actual: Result)>, io::Error> = + codec.decode(&mut buf); match actual { Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {} diff --git a/src/server.rs b/src/server.rs index 2739c46..e8314de 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,7 +4,7 @@ // This file may not be copied, modified, or distributed except according to those terms. use {REMOTE, Reactor}; -use bincode::serde::DeserializeError; +use bincode; use errors::WireError; use futures::{self, Async, Future, Stream, future}; use net2; @@ -67,7 +67,7 @@ pub type Response = Result>; #[doc(hidden)] pub fn listen(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture - where S: NewService, + where S: NewService, Response = Response, Error = io::Error> + Send + 'static, Req: Deserialize + 'static, @@ -116,7 +116,7 @@ fn listen_with(new_service: S, handle: Handle, _acceptor: Acceptor) -> io::Result - where S: NewService, + where S: NewService, Response = Response, Error = io::Error> + Send + 'static, Req: Deserialize + 'static,