Merge branch 'framed' of github.com:tikue/tarpc into framed

This commit is contained in:
Tim Kuehn
2016-09-30 15:19:16 -04:00

View File

@@ -5,7 +5,7 @@
use {WireError, tokio_proto as proto};
use bincode::serde::DeserializeError;
use futures::{Async, BoxFuture, Future};
use futures::{self, Async, BoxFuture, Future};
use futures::stream::Empty;
use std::fmt;
use std::io;
@@ -17,33 +17,39 @@ use util::Never;
/// Typically, this would be combined with a serialization pre-processing step
/// and a deserialization post-processing step.
pub struct Client<Req, Resp, E> {
inner: proto::Client<Req,
Result<Result<Resp, WireError<E>>,
DeserializeError>,
Empty<Never, io::Error>,
io::Error>,
inner: proto::Client<Req, WireResponse<Resp, E>, Empty<Never, io::Error>, io::Error>,
}
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError>;
type ResponseFuture<Resp, E> = futures::Map<BoxFuture<WireResponse<Resp, E>, io::Error>,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Send + 'static,
Resp: Send + 'static,
E: Send + 'static,
E: Send + 'static
{
type Request = Req;
type Response = Result<Resp, ::Error<E>>;
type Error = io::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
type Future = ResponseFuture<Resp, E>;
fn poll_ready(&self) -> Async<()> {
Async::Ready(())
self.inner.poll_ready()
}
fn call(&self, request: Self::Request) -> Self::Future {
self.inner.call(proto::Message::WithoutBody(request))
.map(|r| r.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientDeserialize)
.and_then(|r| r))
.boxed()
self.inner
.call(proto::Message::WithoutBody(request))
.map(Self::map_err)
}
}
impl<Req, Resp, E> Client<Req, Resp, E> {
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientDeserialize)
.and_then(|r| r)
}
}
@@ -97,7 +103,7 @@ pub mod future {
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Send + 'static,
Resp: Deserialize + Send + 'static,
E: Deserialize + Send + 'static,
E: Deserialize + Send + 'static
{
type Fut = ClientFuture<Req, Resp, E>;
@@ -111,9 +117,11 @@ pub mod future {
TcpStream::connect(&addr, handle)
.and_then(move |tcp| {
let tcp = RefCell::new(Some(tcp));
let c = multiplex::connect(move || {
Ok(Framed::new(tcp.borrow_mut().take().unwrap()))
}, &handle2);
let c =
multiplex::connect(move || {
Ok(Framed::new(tcp.borrow_mut().take().unwrap()))
},
&handle2);
Ok(Client { inner: c })
})
.then(|client| Ok(tx.complete(client)))
@@ -140,7 +148,7 @@ pub mod sync {
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Send + 'static,
Resp: Deserialize + Send + 'static,
E: Deserialize + Send + 'static,
E: Deserialize + Send + 'static
{
fn connect<A>(addr: A) -> Result<Self, io::Error>
where A: ToSocketAddrs