diff --git a/src/client.rs b/src/client.rs index b5c98a5..112c80c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,10 +5,14 @@ use WireError; use bincode::serde::DeserializeError; +use framed::Framed; use futures::{self, Async, Future}; +use serde::{Deserialize, Serialize}; use std::fmt; use std::io; -use tokio_proto::easy::{EasyClient, EasyResponse}; +use tokio_core::net::TcpStream; +use tokio_core::reactor; +use tokio_proto::easy::{EasyClient, EasyResponse, multiplex}; use tokio_service::Service; /// A client `Service` that writes and reads bytes. @@ -43,6 +47,16 @@ impl Service for Client } impl Client { + fn new(tcp: TcpStream, handle: &reactor::Handle) -> Self + where Req: Serialize + Send + 'static, + Resp: Deserialize + Send + 'static, + E: Deserialize + Send + 'static + { + Client { + inner: multiplex::connect(Framed::new(tcp), handle), + } + } + fn map_err(resp: WireResponse) -> Result> { resp.map(|r| r.map_err(::Error::from)) .map_err(::Error::ClientDeserialize) @@ -59,7 +73,6 @@ impl fmt::Debug for Client { /// Exposes a trait for connecting asynchronously to servers. pub mod future { use REMOTE; - use framed::Framed; use futures::{self, Async, Future}; use serde::{Deserialize, Serialize}; use std::io; @@ -68,21 +81,21 @@ pub mod future { use super::Client; use tokio_core::net::TcpStream; use tokio_core::{self, reactor}; - use tokio_proto::easy::multiplex; /// Types that can connect to a server asynchronously. - pub trait Connect: Sized { + pub trait Connect<'a>: Sized { /// The type of the future returned when calling `connect`. - type ConnectFut: Future; + type ConnectFut: Future + 'static; + /// The type of the future returned when calling `connect_with`. - type ConnectWithFut: Future; + type ConnectWithFut: Future + 'a; /// Connects to a server located at the given address, using a remote to the default /// reactor. fn connect(addr: &SocketAddr) -> Self::ConnectFut; /// Connects to a server located at the given address, using the given reactor handle. - fn connect_with(addr: &SocketAddr, handle: &reactor::Handle) -> Self::ConnectWithFut; + fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut; } /// A future that resolves to a `Client` or an `io::Error`. @@ -105,12 +118,12 @@ pub mod future { } /// A future that resolves to a `Client` or an `io::Error`. - pub struct ConnectWithFuture { + pub struct ConnectWithFuture<'a, Req, Resp, E> { inner: futures::Map>, + MultiplexConnect<'a, Req, Resp, E>>, } - impl Future for ConnectWithFuture + impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E> where Req: Serialize + Send + 'static, Resp: Deserialize + Send + 'static, E: Deserialize + Send + 'static @@ -126,15 +139,15 @@ pub mod future { } } - struct MultiplexConnect(reactor::Handle, PhantomData<(Req, Resp, E)>); + struct MultiplexConnect<'a, Req, Resp, E>(&'a reactor::Handle, PhantomData<(Req, Resp, E)>); - impl MultiplexConnect { - fn new(handle: reactor::Handle) -> Self { + impl<'a, Req, Resp, E> MultiplexConnect<'a, Req, Resp, E> { + fn new(handle: &'a reactor::Handle) -> Self { MultiplexConnect(handle, PhantomData) } } - impl FnOnce<(TcpStream,)> for MultiplexConnect + impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E> where Req: Serialize + Send + 'static, Resp: Deserialize + Send + 'static, E: Deserialize + Send + 'static @@ -142,19 +155,17 @@ pub mod future { type Output = Client; extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client { - Client { - inner: multiplex::connect(Framed::new(tcp), &self.0), - } + Client::new(tcp, self.0) } } - impl Connect for Client + impl<'a, Req, Resp, E> Connect<'a> for Client where Req: Serialize + Send + 'static, Resp: Deserialize + Send + 'static, E: Deserialize + Send + 'static { type ConnectFut = ConnectFuture; - type ConnectWithFut = ConnectWithFuture; + type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>; /// Starts an event loop on a thread and registers a new client /// connected to the given address. @@ -162,18 +173,20 @@ pub mod future { let addr = *addr; let (tx, rx) = futures::oneshot(); REMOTE.spawn(move |handle| { - Self::connect_with(&addr, handle).then(|client| { - tx.complete(client); - Ok(()) - }) + let handle2 = handle.clone(); + TcpStream::connect(&addr, handle) + .map(move |tcp| Client::new(tcp, &handle2)) + .then(move |result| { + tx.complete(result); + Ok(()) + }) }); ConnectFuture { inner: rx } } - fn connect_with(addr: &SocketAddr, handle: &reactor::Handle) -> Self::ConnectWithFut - { + fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut { ConnectWithFuture { - inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle.clone())) + inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle)) } } } diff --git a/src/macros.rs b/src/macros.rs index aabbb4e..cb83ba5 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -659,14 +659,15 @@ macro_rules! service { #[allow(non_camel_case_types)] /// Implementation detail: Pending connection. - pub struct __tarpc_service_ConnectWithFuture { - inner: $crate::futures::Map<$crate::ConnectWithFuture<__tarpc_service_Request, + pub struct __tarpc_service_ConnectWithFuture<'a, T> { + inner: $crate::futures::Map<$crate::ConnectWithFuture<'a, + __tarpc_service_Request, __tarpc_service_Response, __tarpc_service_Error>, fn(__tarpc_service_Client) -> T>, } - impl $crate::futures::Future for __tarpc_service_ConnectWithFuture { + impl<'a, T> $crate::futures::Future for __tarpc_service_ConnectWithFuture<'a, T> { type Item = T; type Error = ::std::io::Error; @@ -680,9 +681,9 @@ macro_rules! service { /// The client stub that makes RPC calls to the server. Exposes a Future interface. pub struct FutureClient(__tarpc_service_Client); - impl $crate::future::Connect for FutureClient { + impl<'a> $crate::future::Connect<'a> for FutureClient { type ConnectFut = __tarpc_service_ConnectFuture; - type ConnectWithFut = __tarpc_service_ConnectWithFuture; + type ConnectWithFut = __tarpc_service_ConnectWithFuture<'a, Self>; fn connect(__tarpc_service_addr: &::std::net::SocketAddr) -> Self::ConnectFut { let client = <__tarpc_service_Client as $crate::future::Connect>::connect( @@ -694,7 +695,7 @@ macro_rules! service { } fn connect_with(__tarpc_service_addr: &::std::net::SocketAddr, - __tarpc_service_handle: &$crate::tokio_core::reactor::Handle) + __tarpc_service_handle: &'a $crate::tokio_core::reactor::Handle) -> Self::ConnectWithFut { let client = <__tarpc_service_Client as $crate::future::Connect>::connect_with(