diff --git a/src/client.rs b/src/client.rs index 1d69bfd..0fb0092 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,7 +5,7 @@ use {WireError, tokio_proto as proto}; use bincode::serde::DeserializeError; -use futures::{Async, BoxFuture, Future}; +use futures::{self, Async, BoxFuture, Future}; use futures::stream::Empty; use std::fmt; use std::io; @@ -17,33 +17,39 @@ use util::Never; /// Typically, this would be combined with a serialization pre-processing step /// and a deserialization post-processing step. pub struct Client { - inner: proto::Client>, - DeserializeError>, - Empty, - io::Error>, + inner: proto::Client, Empty, io::Error>, } +type WireResponse = Result>, DeserializeError>; +type ResponseFuture = futures::Map, io::Error>, + fn(WireResponse) -> Result>>; + impl Service for Client where Req: Send + 'static, Resp: Send + 'static, - E: Send + 'static, + E: Send + 'static { type Request = Req; type Response = Result>; type Error = io::Error; - type Future = BoxFuture; + type Future = ResponseFuture; fn poll_ready(&self) -> Async<()> { - Async::Ready(()) + self.inner.poll_ready() } fn call(&self, request: Self::Request) -> Self::Future { - self.inner.call(proto::Message::WithoutBody(request)) - .map(|r| r.map(|r| r.map_err(::Error::from)) - .map_err(::Error::ClientDeserialize) - .and_then(|r| r)) - .boxed() + self.inner + .call(proto::Message::WithoutBody(request)) + .map(Self::map_err) + } +} + +impl Client { + fn map_err(resp: WireResponse) -> Result> { + resp.map(|r| r.map_err(::Error::from)) + .map_err(::Error::ClientDeserialize) + .and_then(|r| r) } } @@ -97,7 +103,7 @@ pub mod future { impl Connect for Client where Req: Serialize + Send + 'static, Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static, + E: Deserialize + Send + 'static { type Fut = ClientFuture; @@ -111,9 +117,11 @@ pub mod future { TcpStream::connect(&addr, handle) .and_then(move |tcp| { let tcp = RefCell::new(Some(tcp)); - let c = multiplex::connect(move || { - Ok(Framed::new(tcp.borrow_mut().take().unwrap())) - }, &handle2); + let c = + multiplex::connect(move || { + Ok(Framed::new(tcp.borrow_mut().take().unwrap())) + }, + &handle2); Ok(Client { inner: c }) }) .then(|client| Ok(tx.complete(client))) @@ -140,7 +148,7 @@ pub mod sync { impl Connect for Client where Req: Serialize + Send + 'static, Resp: Deserialize + Send + 'static, - E: Deserialize + Send + 'static, + E: Deserialize + Send + 'static { fn connect(addr: A) -> Result where A: ToSocketAddrs