Merge branch 'master' into deps-cleanup

This commit is contained in:
Adam Wright
2017-02-15 20:49:53 -08:00
committed by GitHub
14 changed files with 201 additions and 326 deletions

View File

@@ -11,7 +11,7 @@ readme = "README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[dependencies]
bincode = "1.0.0-alpha"
bincode = "1.0.0-alpha2"
byteorder = "1.0"
cfg-if = "0.1.0"
futures = "0.1.7"

View File

@@ -38,8 +38,13 @@ impl FutureService for Server {
#[bench]
fn latency(bencher: &mut Bencher) {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
bencher.iter(|| { client.ack().unwrap(); });
bencher.iter(|| {
client.ack().unwrap();
});
}

View File

@@ -122,7 +122,7 @@ fn main() {
.wait()
.unwrap();
let publisher_client =
let mut publisher_client =
publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap();
let subscriber1 = Subscriber::listen(0);

View File

@@ -51,7 +51,7 @@ impl SyncService for HelloServer {
fn main() {
let addr = HelloServer.listen("localhost:10000", server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
println!("{}", client.hello("".to_string()).unwrap_err());
}

View File

@@ -31,6 +31,6 @@ impl SyncService for HelloServer {
fn main() {
let addr = "localhost:10000";
HelloServer.listen(addr, server::Options::default()).unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());
}

View File

@@ -86,7 +86,7 @@ fn main() {
.wait()
.unwrap();
let double_client = double::SyncClient::connect(double_addr, client::Options::default())
let mut double_client = double::SyncClient::connect(double_addr, client::Options::default())
.unwrap();
for i in 0..5 {
println!("{:?}", double_client.double(i).unwrap());

View File

@@ -57,7 +57,7 @@ fn bench_tarpc(target: u64) {
server::Options::default())
.wait()
.unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
let start = time::Instant::now();
let mut nread = 0;
while nread < target {

View File

@@ -68,8 +68,8 @@ fn main() {
.wait()
.unwrap();
let bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap();
let baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap();
let mut bar_client = bar::SyncClient::connect(bar_addr, client::Options::default()).unwrap();
let mut baz_client = baz::SyncClient::connect(baz_addr, client::Options::default()).unwrap();
info!("Result: {:?}", bar_client.bar(17));

View File

@@ -4,25 +4,12 @@
// This file may not be copied, modified, or distributed except according to those terms.
use {Reactor, WireError};
use bincode::serde::DeserializeError;
use futures::{self, Future};
use protocol::Proto;
use bincode;
#[cfg(feature = "tls")]
use self::tls::*;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use stream_type::StreamType;
use tokio_core::reactor;
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, DeserializeError>;
type ResponseFuture<Req, Resp, E> = futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>;
type BindClient<Req, Resp, E> = <Proto<Req, Result<Resp, WireError<E>>> as
ProtoBindClient<Multiplex, StreamType>>::BindClient;
type WireResponse<Resp, E> = Result<Result<Resp, WireError<E>>, bincode::Error>;
/// TLS-specific functionality
#[cfg(feature = "tls")]
@@ -64,74 +51,6 @@ pub mod tls {
}
}
/// A client that impls `tokio_service::Service` that writes and reads bytes.
///
/// Typically, this would be combined with a serialization pre-processing step
/// and a deserialization post-processing step.
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Request = Req;
type Response = Result<Resp, ::Error<E>>;
type Error = io::Error;
type Future = ResponseFuture<Req, Resp, E>;
fn call(&self, request: Self::Request) -> Self::Future {
self.inner.call(request).map(Self::map_err)
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientDeserialize)
.and_then(|r| r)
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
/// Additional options to configure how the client connects and operates.
#[derive(Default)]
pub struct Options {
@@ -163,24 +82,92 @@ impl Options {
/// Exposes a trait for connecting asynchronously to servers.
pub mod future {
use {REMOTE, Reactor};
use futures::{self, Async, Future, future};
use {REMOTE, Reactor, WireError};
use futures::{self, Future, future};
use protocol::Proto;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use stream_type::StreamType;
use super::{Client, Options};
use tokio_core::net::{TcpStream, TcpStreamNew};
use super::{Options, WireResponse};
use tokio_core::net::TcpStream;
use tokio_core::reactor;
use tokio_proto::BindClient;
cfg_if! {
if #[cfg(feature = "tls")] {
use tokio_tls::{ConnectAsync, TlsStream, TlsConnectorExt};
use super::tls::Context;
use errors::native_to_io;
} else {}
use tokio_proto::BindClient as ProtoBindClient;
use tokio_proto::multiplex::Multiplex;
use tokio_service::Service;
#[cfg(feature = "tls")]
use tokio_tls::TlsConnectorExt;
#[cfg(feature = "tls")]
use errors::native_to_io;
/// A client that impls `tokio_service::Service` that writes and reads bytes.
#[doc(hidden)]
pub struct Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
inner: BindClient<Req, Resp, E>,
}
impl<Req, Resp, E> Clone for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn clone(&self) -> Self {
Client { inner: self.inner.clone() }
}
}
impl<Req, Resp, E> Service for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Request = Req;
type Response = Resp;
type Error = ::Error<E>;
type Future = ResponseFuture<Req, Resp, E>;
fn call(&self, request: Self::Request) -> Self::Future {
fn identity<T>(t: T) -> T { t }
self.inner.call(request)
.map(Self::map_err as _)
.map_err(::Error::from as _)
.and_then(identity as _)
}
}
impl<Req, Resp, E> Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn new(inner: BindClient<Req, Resp, E>) -> Self
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
Client { inner: inner }
}
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
resp.map(|r| r.map_err(::Error::from))
.map_err(::Error::ClientSerialize)
.and_then(|r| r)
}
}
impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Client {{ .. }}")
}
}
/// Types that can connect to a server asynchronously.
@@ -192,98 +179,10 @@ pub mod future {
fn connect(addr: SocketAddr, options: Options) -> Self::ConnectFut;
}
type ConnectFutureInner<Req, Resp, E, T> = future::Either<futures::Map<futures::AndThen<
TcpStreamNew, T, ConnectFn>, MultiplexConnect<Req, Resp, E>>, futures::Flatten<
futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>>;
/// A future that resolves to a `Client` or an `io::Error`.
#[doc(hidden)]
pub struct ConnectFuture<Req, Resp, E>
where Req: Serialize + 'static,
Resp: Deserialize + 'static,
E: Deserialize + 'static
{
#[cfg(not(feature = "tls"))]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
inner: ConnectFutureInner<Req, Resp, E, future::FutureResult<StreamType, io::Error>>,
#[cfg(feature = "tls")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
inner: ConnectFutureInner<Req, Resp, E, future::Either<future::FutureResult<
StreamType, io::Error>, futures::Map<futures::MapErr<ConnectAsync<TcpStream>,
fn(::native_tls::Error) -> io::Error>, fn(TlsStream<TcpStream>) -> StreamType>>>,
}
impl<Req, Resp, E> Future for ConnectFuture<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
type Item = Client<Req, Resp, E>;
type Error = io::Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
// Ok to unwrap because we ensure the oneshot is always completed.
match Future::poll(&mut self.inner)? {
Async::Ready(client) => Ok(Async::Ready(client)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
struct MultiplexConnect<Req, Resp, E>(reactor::Handle, PhantomData<(Req, Resp, E)>);
impl<Req, Resp, E> MultiplexConnect<Req, Resp, E> {
fn new(handle: reactor::Handle) -> Self {
MultiplexConnect(handle, PhantomData)
}
}
impl<Req, Resp, E, I> FnOnce<(I,)> for MultiplexConnect<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static,
I: Into<StreamType>
{
type Output = Client<Req, Resp, E>;
extern "rust-call" fn call_once(self, (stream,): (I,)) -> Self::Output {
Client::new(Proto::new().bind_client(&self.0, stream.into()))
}
}
/// Provides the connection Fn impl for Tls
struct ConnectFn {
#[cfg(feature = "tls")]
tls_ctx: Option<Context>,
}
impl FnOnce<(TcpStream,)> for ConnectFn {
#[cfg(feature = "tls")]
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
type Output = future::Either<future::FutureResult<StreamType, io::Error>,
futures::Map<futures::MapErr<ConnectAsync<TcpStream>,
fn(::native_tls::Error)
-> io::Error>,
fn(TlsStream<TcpStream>) -> StreamType>>;
#[cfg(not(feature = "tls"))]
type Output = future::FutureResult<StreamType, io::Error>;
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Self::Output {
#[cfg(feature = "tls")]
match self.tls_ctx {
None => future::Either::A(future::ok(StreamType::from(tcp))),
Some(tls_ctx) => {
future::Either::B(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, tcp)
.map_err(native_to_io as fn(_) -> _)
.map(StreamType::from as fn(_) -> _))
}
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::from(tcp))
}
}
pub type ConnectFuture<Req, Resp, E> = futures::Flatten<
futures::MapErr<futures::Oneshot<io::Result<Client<Req, Resp, E>>>,
fn(futures::Canceled) -> io::Error>>;
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
@@ -300,42 +199,39 @@ pub mod future {
#[cfg(feature = "tls")]
let tls_ctx = options.tls_ctx.take();
let connect = move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.and_then(move |socket| {
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::Tcp(socket))
})
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
};
let setup = move |tx: futures::sync::oneshot::Sender<_>| {
move |handle: &reactor::Handle| {
let handle2 = handle.clone();
TcpStream::connect(&addr, handle)
.and_then(move |socket| {
#[cfg(feature = "tls")]
match tls_ctx {
Some(tls_ctx) => {
future::Either::A(tls_ctx.tls_connector
.connect_async(&tls_ctx.domain, socket)
.map(StreamType::Tls)
.map_err(native_to_io))
}
None => future::Either::B(future::ok(StreamType::Tcp(socket))),
}
#[cfg(not(feature = "tls"))]
future::ok(StreamType::Tcp(socket))
})
.map(move |tcp| Client::new(Proto::new().bind_client(&handle2, tcp)))
.then(move |result| {
tx.complete(result);
Ok(())
})
connect(handle).then(move |result| {
tx.complete(result);
Ok(())
})
}
};
let rx = match options.reactor {
Some(Reactor::Handle(handle)) => {
#[cfg(feature = "tls")]
let connect_fn = ConnectFn { tls_ctx: options.tls_ctx };
#[cfg(not(feature = "tls"))]
let connect_fn = ConnectFn {};
let tcp = TcpStream::connect(&addr, &handle)
.and_then(connect_fn)
.map(MultiplexConnect::new(handle));
return ConnectFuture { inner: future::Either::A(tcp) };
let (tx, rx) = futures::oneshot();
handle.spawn(setup(tx)(&handle));
rx
}
Some(Reactor::Remote(remote)) => {
let (tx, rx) = futures::oneshot();
@@ -351,39 +247,31 @@ pub mod future {
fn panic(canceled: futures::Canceled) -> io::Error {
unreachable!(canceled)
}
ConnectFuture { inner: future::Either::B(rx.map_err(panic as fn(_) -> _).flatten()) }
rx.map_err(panic as _).flatten()
}
}
type ResponseFuture<Req, Resp, E> =
futures::AndThen<futures::MapErr<
futures::Map<<BindClient<Req, Resp, E> as Service>::Future,
fn(WireResponse<Resp, E>) -> Result<Resp, ::Error<E>>>,
fn(io::Error) -> ::Error<E>>,
Result<Resp, ::Error<E>>,
fn(Result<Resp, ::Error<E>>) -> Result<Resp, ::Error<E>>>;
type BindClient<Req, Resp, E> =
<Proto<Req, Result<Resp, WireError<E>>>
as ProtoBindClient<Multiplex, StreamType>>::BindClient;
}
/// Exposes a trait for connecting synchronously to servers.
pub mod sync {
use client::future::Connect as FutureConnect;
use futures::{Future, future};
use serde::{Deserialize, Serialize};
use std::io;
use std::net::ToSocketAddrs;
use super::{Client, Options};
use util::FirstSocketAddr;
use super::Options;
/// Types that can connect to a server synchronously.
pub trait Connect: Sized {
/// Connects to a server located at the given address.
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error> where A: ToSocketAddrs;
}
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
where Req: Serialize + Sync + Send + 'static,
Resp: Deserialize + Sync + Send + 'static,
E: Deserialize + Sync + Send + 'static
{
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error>
where A: ToSocketAddrs
{
let addr = addr.try_first_socket_addr()?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
future::lazy(move || <Self as FutureConnect>::connect(addr, options)).wait()
}
}
}

View File

@@ -3,7 +3,6 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use bincode;
use serde::{Deserialize, Serialize};
use std::{fmt, io};
use std::error::Error as StdError;
@@ -13,23 +12,15 @@ use std::error::Error as StdError;
pub enum Error<E> {
/// Any IO error.
Io(io::Error),
/// Error in deserializing a server response.
/// Error serializing the client request or deserializing the server response.
///
/// Typically this indicates a faulty implementation of `serde::Serialize` or
/// `serde::Deserialize`.
ClientDeserialize(bincode::serde::DeserializeError),
/// Error in serializing a client request.
///
/// Typically this indicates a faulty implementation of `serde::Serialize`.
ClientSerialize(bincode::serde::SerializeError),
/// Error in deserializing a client request.
ClientSerialize(::bincode::Error),
/// Error serializing the server response or deserializing the client request.
///
/// Typically this indicates a faulty implementation of `serde::Serialize` or
/// `serde::Deserialize`.
ServerDeserialize(String),
/// Error in serializing a server response.
///
/// Typically this indicates a faulty implementation of `serde::Serialize`.
ServerSerialize(String),
/// The server was unable to reply to the rpc for some reason.
///
@@ -41,9 +32,7 @@ pub enum Error<E> {
impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Error<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::ClientDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::ClientSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::ServerDeserialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::ServerSerialize(ref e) => write!(f, r#"{}: "{}""#, self.description(), e),
Error::App(ref e) => fmt::Display::fmt(e, f),
Error::Io(ref e) => fmt::Display::fmt(e, f),
@@ -54,10 +43,8 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> fmt::Display for Er
impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<E> {
fn description(&self) -> &str {
match *self {
Error::ClientDeserialize(_) => "The client failed to deserialize the server response.",
Error::ClientSerialize(_) => "The client failed to serialize the request.",
Error::ServerDeserialize(_) => "The server failed to deserialize the request.",
Error::ServerSerialize(_) => "The server failed to serialize the response.",
Error::ClientSerialize(_) => "The client failed to serialize the request or deserialize the response.",
Error::ServerSerialize(_) => "The server failed to serialize the response or deserialize the request.",
Error::App(ref e) => e.description(),
Error::Io(ref e) => e.description(),
}
@@ -65,9 +52,7 @@ impl<E: StdError + Deserialize + Serialize + Send + 'static> StdError for Error<
fn cause(&self) -> Option<&StdError> {
match *self {
Error::ClientDeserialize(ref e) => e.cause(),
Error::ClientSerialize(ref e) => e.cause(),
Error::ServerDeserialize(_) |
Error::ServerSerialize(_) |
Error::App(_) => None,
Error::Io(ref e) => e.cause(),
@@ -84,7 +69,6 @@ impl<E> From<io::Error> for Error<E> {
impl<E> From<WireError<E>> for Error<E> {
fn from(err: WireError<E>) -> Self {
match err {
WireError::ServerDeserialize(s) => Error::ServerDeserialize(s),
WireError::ServerSerialize(s) => Error::ServerSerialize(s),
WireError::App(e) => Error::App(e),
}
@@ -95,9 +79,7 @@ impl<E> From<WireError<E>> for Error<E> {
#[doc(hidden)]
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum WireError<E> {
/// Error in deserializing a client request.
ServerDeserialize(String),
/// Error in serializing server response.
/// Error in serializing the server response or deserializing the client request.
ServerSerialize(String),
/// The server was unable to reply to the rpc for some reason.
App(E),

View File

@@ -54,7 +54,7 @@
//! fn main() {
//! let addr = "localhost:10000";
//! let _server = HelloServer.listen(addr, server::Options::default());
//! let client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! let mut client = SyncClient::connect(addr, client::Options::default()).unwrap();
//! println!("{}", client.hello("Mom".to_string()).unwrap());
//! }
//! ```
@@ -97,7 +97,7 @@
//! let addr = "localhost:10000";
//! let acceptor = get_acceptor();
//! let _server = HelloServer.listen(addr, server::Options::default().tls(acceptor));
//! let client = SyncClient::connect(addr,
//! let mut client = SyncClient::connect(addr,
//! client::Options::default()
//! .tls(client::tls::Context::new("foobar.com").unwrap()))
//! .unwrap();
@@ -106,8 +106,7 @@
//! ```
//!
#![deny(missing_docs)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, fn_traits,
specialization)]
#![feature(plugin, conservative_impl_trait, never_type, unboxed_closures, specialization)]
#![plugin(tarpc_plugins)]
extern crate byteorder;

View File

@@ -77,7 +77,9 @@ macro_rules! impl_deserialize {
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
type Value = impl_deserialize_Field__;
fn expecting(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
fn expecting(&self, formatter: &mut ::std::fmt::Formatter)
-> ::std::fmt::Result
{
formatter.write_str("an unsigned integer")
}
@@ -408,7 +410,7 @@ macro_rules! service {
where tarpc_service_S__: FutureService
{
type Request = ::std::result::Result<tarpc_service_Request__,
$crate::bincode::serde::DeserializeError>;
$crate::bincode::Error>;
type Response = $crate::server::Response<tarpc_service_Response__,
tarpc_service_Error__>;
type Error = ::std::io::Error;
@@ -421,7 +423,7 @@ macro_rules! service {
return tarpc_service_FutureReply__::DeserializeError(
$crate::futures::finished(
::std::result::Result::Err(
$crate::WireError::ServerDeserialize(
$crate::WireError::ServerSerialize(
::std::string::ToString::to_string(
&tarpc_service_deserialize_err__)))));
}
@@ -541,22 +543,33 @@ macro_rules! service {
impl<S> SyncServiceExt for S where S: SyncService {}
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a blocking interface.
pub struct SyncClient(FutureClient);
pub struct SyncClient {
inner: FutureClient,
reactor: $crate::tokio_core::reactor::Core,
}
impl ::std::fmt::Debug for SyncClient {
fn fmt(&self, formatter: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(formatter, "SyncClient {{ inner: {:?}, .. }}", self.inner)
}
}
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr_: A, opts_: $crate::client::Options)
fn connect<A>(addr_: A, options_: $crate::client::Options)
-> ::std::result::Result<Self, ::std::io::Error>
where A: ::std::net::ToSocketAddrs,
{
let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || {
<FutureClient as $crate::client::future::Connect>::connect(addr_, opts_)
}))?;
let client_ = SyncClient(client_);
::std::result::Result::Ok(client_)
let mut reactor_ = $crate::tokio_core::reactor::Core::new()?;
let options_ = options_.handle(reactor_.handle());
let client_ = <FutureClient as $crate::client::future::Connect>::connect(addr_,
options_);
let client_ = reactor_.run(client_)?;
::std::result::Result::Ok(SyncClient {
inner: client_,
reactor: reactor_,
})
}
}
@@ -564,20 +577,17 @@ macro_rules! service {
$(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*)
pub fn $fn_name(&mut self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
// Wrapped in a lazy future to ensure execution occurs when a task is present.
$crate::futures::Future::wait($crate::futures::future::lazy(move || {
(self.0).$fn_name($($arg),*)
}))
self.reactor.run(self.inner.$fn_name($($arg),*))
}
)*
}
#[allow(non_camel_case_types)]
type tarpc_service_Client__ =
$crate::client::Client<tarpc_service_Request__,
$crate::client::future::Client<tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>;
@@ -629,10 +639,8 @@ macro_rules! service {
-> $crate::futures::future::Then<
<tarpc_service_Client__ as $crate::tokio_service::Service>::Future,
::std::result::Result<$out, $crate::Error<$error>>,
fn(::std::result::Result<
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>,
::std::io::Error>)
fn(::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>>> {
let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*));
@@ -641,12 +649,10 @@ macro_rules! service {
return $crate::futures::Future::then(tarpc_service_fut__, then__);
fn then__(tarpc_service_msg__:
::std::result::Result<
::std::result::Result<tarpc_service_Response__,
$crate::Error<tarpc_service_Error__>>,
::std::io::Error>)
$crate::Error<tarpc_service_Error__>>)
-> ::std::result::Result<$out, $crate::Error<$error>> {
match tarpc_service_msg__? {
match tarpc_service_msg__ {
::std::result::Result::Ok(tarpc_service_msg__) => {
if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) =
tarpc_service_msg__
@@ -667,15 +673,9 @@ macro_rules! service {
unreachable!()
}
}
$crate::Error::ServerDeserialize(tarpc_service_err__) => {
$crate::Error::ServerDeserialize(tarpc_service_err__)
}
$crate::Error::ServerSerialize(tarpc_service_err__) => {
$crate::Error::ServerSerialize(tarpc_service_err__)
}
$crate::Error::ClientDeserialize(tarpc_service_err__) => {
$crate::Error::ClientDeserialize(tarpc_service_err__)
}
$crate::Error::ClientSerialize(tarpc_service_err__) => {
$crate::Error::ClientSerialize(tarpc_service_err__)
}
@@ -901,7 +901,7 @@ mod functional_test {
fn simple() {
let _ = env_logger::init();
let (_, client) = start_server_with_sync_client::<SyncClient, Server>(Server);
let client = unwrap!(client);
let mut client = unwrap!(client);
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".to_string()).unwrap());
}
@@ -911,10 +911,10 @@ mod functional_test {
let _ = env_logger::init();
let (_, client) = start_server_with_sync_client::<super::other_service::SyncClient,
Server>(Server);
let client = client.expect("Could not connect!");
let mut client = client.expect("Could not connect!");
match client.foo().err().expect("failed unwrap") {
::Error::ServerDeserialize(_) => {} // good
bad => panic!("Expected Error::ServerDeserialize but got {}", bad),
::Error::ServerSerialize(_) => {} // good
bad => panic!("Expected Error::ServerSerialize but got {}", bad),
}
}
}
@@ -968,8 +968,8 @@ mod functional_test {
start_server_with_async_client::<super::other_service::FutureClient,
Server>(Server);
match client.foo().wait().err().unwrap() {
::Error::ServerDeserialize(_) => {} // good
bad => panic!(r#"Expected Error::ServerDeserialize but got "{}""#, bad),
::Error::ServerSerialize(_) => {} // good
bad => panic!(r#"Expected Error::ServerSerialize but got "{}""#, bad),
}
}
@@ -978,16 +978,17 @@ mod functional_test {
use util::FirstSocketAddr;
use server;
use super::FutureServiceExt;
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default())
let addr = Server.listen("localhost:0".first_socket_addr(),
server::Options::default())
.wait()
.unwrap();
Server.listen(addr, server::Options::default())
.wait()
.unwrap();
}
#[cfg(feature = "tls")]
#[test]
fn tcp_and_tls() {
@@ -1050,7 +1051,7 @@ mod functional_test {
.wait()
.unwrap();
let client = get_sync_client::<SyncClient>(addr).unwrap();
let mut client = get_sync_client::<SyncClient>(addr).unwrap();
match client.bar().err().unwrap() {
::Error::App(e) => {
assert_eq!(e.description(), "lol jk");

View File

@@ -4,7 +4,7 @@
// This file may not be copied, modified, or distributed except according to those terms.
use {serde, tokio_core};
use bincode::{SizeLimit, serde as bincode};
use bincode::{self, SizeLimit};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Cursor};
use std::marker::PhantomData;
@@ -40,7 +40,7 @@ impl<Encode, Decode> tokio_core::io::Codec for Codec<Encode, Decode>
Decode: serde::Deserialize
{
type Out = (RequestId, Encode);
type In = (RequestId, Result<Decode, bincode::DeserializeError>);
type In = (RequestId, Result<Decode, bincode::Error>);
fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
buf.write_u64::<BigEndian>(id).unwrap();
@@ -121,7 +121,7 @@ impl<T, Encode, Decode> ServerProto<T> for Proto<Encode, Decode>
Decode: serde::Deserialize + 'static
{
type Response = Encode;
type Request = Result<Decode, bincode::DeserializeError>;
type Request = Result<Decode, bincode::Error>;
type Transport = Framed<T, Codec<Encode, Decode>>;
type BindTransport = Result<Self::Transport, io::Error>;
@@ -135,7 +135,7 @@ impl<T, Encode, Decode> ClientProto<T> for Proto<Encode, Decode>
Encode: serde::Serialize + 'static,
Decode: serde::Deserialize + 'static
{
type Response = Result<Decode, bincode::DeserializeError>;
type Response = Result<Decode, bincode::Error>;
type Request = Encode;
type Transport = Framed<T, Codec<Encode, Decode>>;
type BindTransport = Result<Self::Transport, io::Error>;
@@ -158,8 +158,8 @@ fn serialize() {
let mut codec: Codec<(char, char, char), (char, char, char)> = Codec::new();
codec.encode(MSG, &mut vec).unwrap();
buf.get_mut().append(&mut vec);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::DeserializeError>)>,
io::Error> = codec.decode(&mut buf);
let actual: Result<Option<(u64, Result<(char, char, char), bincode::Error>)>, io::Error> =
codec.decode(&mut buf);
match actual {
Ok(Some((id, ref v))) if id == MSG.0 && *v.as_ref().unwrap() == MSG.1 => {}

View File

@@ -4,7 +4,7 @@
// This file may not be copied, modified, or distributed except according to those terms.
use {REMOTE, Reactor};
use bincode::serde::DeserializeError;
use bincode;
use errors::WireError;
use futures::{self, Async, Future, Stream, future};
use net2;
@@ -67,7 +67,7 @@ pub type Response<T, E> = Result<T, WireError<E>>;
#[doc(hidden)]
pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Options) -> ListenFuture
where S: NewService<Request = Result<Req, DeserializeError>,
where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
Req: Deserialize + 'static,
@@ -116,7 +116,7 @@ fn listen_with<S, Req, Resp, E>(new_service: S,
handle: Handle,
_acceptor: Acceptor)
-> io::Result<SocketAddr>
where S: NewService<Request = Result<Req, DeserializeError>,
where S: NewService<Request = Result<Req, bincode::Error>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,
Req: Deserialize + 'static,