Merge pull request #82 from tikue/master

Make sure synchronous RPCs are wrapped in a lazy future.
This commit is contained in:
Adam Wright
2017-01-23 00:48:10 -08:00
committed by GitHub
7 changed files with 193 additions and 183 deletions

View File

@@ -14,7 +14,8 @@ extern crate env_logger;
extern crate futures;
use futures::Future;
use tarpc::sync::Connect;
use tarpc::{client, server};
use tarpc::client::sync::Connect;
use tarpc::util::{FirstSocketAddr, Never};
#[cfg(test)]
use test::Bencher;
@@ -28,7 +29,7 @@ struct Server;
impl FutureService for Server {
type AckFut = futures::Finished<(), Never>;
fn ack(&mut self) -> Self::AckFut {
fn ack(&self) -> Self::AckFut {
futures::finished(())
}
}
@@ -37,10 +38,8 @@ impl FutureService for Server {
#[bench]
fn latency(bencher: &mut Bencher) {
let _ = env_logger::init();
let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap();
let mut client = SyncClient::connect(addr).unwrap();
let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap();
let client = SyncClient::connect(addr, client::Options::default()).unwrap();
bencher.iter(|| {
client.ack().unwrap();
});
bencher.iter(|| { client.ack().unwrap(); });
}

View File

@@ -58,7 +58,7 @@ impl FutureService for Server {
.spawn(futures::lazy(move || {
let mut vec: Vec<u8> = Vec::with_capacity(size as usize);
for i in 0..size {
vec.push((i % 1 << 8) as u8);
vec.push(((i % 2) << 8) as u8);
}
debug!("Server sending response no. {}", request_number);
futures::finished(vec)

View File

@@ -57,7 +57,7 @@ impl subscriber::FutureService for Subscriber {
}
impl Subscriber {
fn new(id: u32) -> SocketAddr {
fn listen(id: u32) -> SocketAddr {
Subscriber { id: id }
.listen("localhost:0".first_socket_addr(),
server::Options::default())
@@ -125,10 +125,10 @@ fn main() {
let publisher_client =
publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap();
let subscriber1 = Subscriber::new(0);
let subscriber1 = Subscriber::listen(0);
publisher_client.subscribe(0, subscriber1).unwrap();
let subscriber2 = Subscriber::new(1);
let subscriber2 = Subscriber::listen(1);
publisher_client.subscribe(1, subscriber2).unwrap();

View File

@@ -30,7 +30,7 @@ lazy_static! {
fn gen_vec(size: usize) -> Vec<u8> {
let mut vec: Vec<u8> = Vec::with_capacity(size);
for i in 0..size {
vec.push((i % 1 << 8) as u8);
vec.push(((i % 2) << 8) as u8);
}
vec
}
@@ -77,8 +77,7 @@ fn bench_tcp(target: u64) {
let addr = l.local_addr().unwrap();
thread::spawn(move || {
let (mut stream, _) = l.accept().unwrap();
while let Ok(_) = stream.write_all(&*BUF) {
}
while let Ok(_) = stream.write_all(&*BUF) {}
});
let mut stream = net::TcpStream::connect(&addr).unwrap();
let mut buf = vec![0; CHUNK_SIZE as usize];
@@ -99,7 +98,7 @@ fn bench_tcp(target: u64) {
fn main() {
let _ = env_logger::init();
&*BUF; // to non-lazily initialize it.
let _ = *BUF; // To non-lazily initialize it.
bench_tcp(256 << 20);
bench_tarpc(256 << 20);
}

View File

@@ -112,6 +112,7 @@ impl Options {
/// Exposes a trait for connecting asynchronously to servers.
pub mod future {
use super::{Client, Options};
use {REMOTE, Reactor};
use futures::{self, Async, Future, future};
use protocol::Proto;
@@ -119,7 +120,6 @@ pub mod future {
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use super::{Client, Options};
use tokio_core::{self, reactor};
use tokio_core::net::TcpStream;
use tokio_proto::BindClient;
@@ -140,6 +140,7 @@ pub mod future {
Resp: Deserialize + 'static,
E: Deserialize + 'static,
{
#[allow(unknown_lints, type_complexity)]
inner:
future::Either<
futures::Map<tokio_core::net::TcpStreamNew, MultiplexConnect<Req, Resp, E>>,
@@ -231,12 +232,12 @@ pub mod future {
/// Exposes a trait for connecting synchronously to servers.
pub mod sync {
use client::future;
use futures::Future;
use super::{Client, Options};
use client::future::Connect as FutureConnect;
use futures::{Future, future};
use serde::{Deserialize, Serialize};
use std::io;
use std::net::ToSocketAddrs;
use super::{Client, Options};
use util::FirstSocketAddr;
/// Types that can connect to a server synchronously.
@@ -253,7 +254,10 @@ pub mod sync {
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error>
where A: ToSocketAddrs
{
<Self as future::Connect>::connect(addr.try_first_socket_addr()?, options).wait()
let addr = addr.try_first_socket_addr()?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
future::lazy(move || <Self as FutureConnect>::connect(addr, options)).wait()
}
}
}

View File

@@ -12,22 +12,22 @@ macro_rules! as_item {
#[doc(hidden)]
#[macro_export]
macro_rules! impl_serialize {
($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($_n:expr) ) => {
($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($n_:expr) ) => {
as_item! {
impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* {
fn serialize<S>(&self, __impl_serialize_serializer: &mut S)
fn serialize<S>(&self, impl_serialize_serializer__: &mut S)
-> ::std::result::Result<(), S::Error>
where S: $crate::serde::Serializer
{
match *self {
$(
$impler::$name(ref __impl_serialize_field) =>
$impler::$name(ref impl_serialize_field__) =>
$crate::serde::Serializer::serialize_newtype_variant(
__impl_serialize_serializer,
impl_serialize_serializer__,
stringify!($impler),
$n,
stringify!($name),
__impl_serialize_field,
impl_serialize_field__,
)
),*
}
@@ -55,47 +55,47 @@ macro_rules! impl_serialize {
#[doc(hidden)]
#[macro_export]
macro_rules! impl_deserialize {
($impler:ident, $(@($name:ident $n:expr))* -- #($_n:expr) ) => (
($impler:ident, $(@($name:ident $n:expr))* -- #($n_:expr) ) => (
impl $crate::serde::Deserialize for $impler {
#[allow(non_camel_case_types)]
fn deserialize<__impl_deserialize_D>(
__impl_deserialize_deserializer: &mut __impl_deserialize_D)
-> ::std::result::Result<$impler, __impl_deserialize_D::Error>
where __impl_deserialize_D: $crate::serde::Deserializer
fn deserialize<impl_deserialize_D__>(
impl_deserialize_deserializer__: &mut impl_deserialize_D__)
-> ::std::result::Result<$impler, impl_deserialize_D__::Error>
where impl_deserialize_D__: $crate::serde::Deserializer
{
#[allow(non_camel_case_types, unused)]
enum __impl_deserialize_Field {
enum impl_deserialize_Field__ {
$($name),*
}
impl $crate::serde::Deserialize for __impl_deserialize_Field {
fn deserialize<D>(__impl_deserialize_deserializer: &mut D)
-> ::std::result::Result<__impl_deserialize_Field, D::Error>
impl $crate::serde::Deserialize for impl_deserialize_Field__ {
fn deserialize<D>(impl_deserialize_deserializer__: &mut D)
-> ::std::result::Result<impl_deserialize_Field__, D::Error>
where D: $crate::serde::Deserializer
{
struct __impl_deserialize_FieldVisitor;
impl $crate::serde::de::Visitor for __impl_deserialize_FieldVisitor {
type Value = __impl_deserialize_Field;
struct impl_deserialize_FieldVisitor__;
impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ {
type Value = impl_deserialize_Field__;
fn visit_usize<E>(&mut self, __impl_deserialize_value: usize)
-> ::std::result::Result<__impl_deserialize_Field, E>
fn visit_usize<E>(&mut self, impl_deserialize_value__: usize)
-> ::std::result::Result<impl_deserialize_Field__, E>
where E: $crate::serde::de::Error,
{
$(
if __impl_deserialize_value == $n {
if impl_deserialize_value__ == $n {
return ::std::result::Result::Ok(
__impl_deserialize_Field::$name);
impl_deserialize_Field__::$name);
}
)*
::std::result::Result::Err(
$crate::serde::de::Error::custom(
format!("No variants have a value of {}!",
__impl_deserialize_value))
impl_deserialize_value__))
)
}
}
__impl_deserialize_deserializer.deserialize_struct_field(
__impl_deserialize_FieldVisitor)
impl_deserialize_deserializer__.deserialize_struct_field(
impl_deserialize_FieldVisitor__)
}
}
@@ -103,27 +103,27 @@ macro_rules! impl_deserialize {
impl $crate::serde::de::EnumVisitor for Visitor {
type Value = $impler;
fn visit<V>(&mut self, mut __tarpc_enum_visitor: V)
fn visit<V>(&mut self, mut tarpc_enum_visitor__: V)
-> ::std::result::Result<$impler, V::Error>
where V: $crate::serde::de::VariantVisitor
{
match __tarpc_enum_visitor.visit_variant()? {
match tarpc_enum_visitor__.visit_variant()? {
$(
__impl_deserialize_Field::$name => {
impl_deserialize_Field__::$name => {
::std::result::Result::Ok(
$impler::$name(__tarpc_enum_visitor.visit_newtype()?))
$impler::$name(tarpc_enum_visitor__.visit_newtype()?))
}
),*
}
}
}
const __TARPC_VARIANTS: &'static [&'static str] = &[
const TARPC_VARIANTS__: &'static [&'static str] = &[
$(
stringify!($name)
),*
];
__impl_deserialize_deserializer.deserialize_enum(
stringify!($impler), __TARPC_VARIANTS, Visitor)
impl_deserialize_deserializer__.deserialize_enum(
stringify!($impler), TARPC_VARIANTS__, Visitor)
}
}
);
@@ -270,39 +270,39 @@ macro_rules! service {
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_Request {
enum tarpc_service_Request__ {
NotIrrefutable(()),
$(
$fn_name(( $($in_,)* ))
),*
}
impl_deserialize!(__tarpc_service_Request, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
impl_serialize!(__tarpc_service_Request, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
impl_deserialize!(tarpc_service_Request__, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
impl_serialize!(tarpc_service_Request__, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*);
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_Response {
enum tarpc_service_Response__ {
NotIrrefutable(()),
$(
$fn_name($out)
),*
}
impl_deserialize!(__tarpc_service_Response, NotIrrefutable(()) $($fn_name($out))*);
impl_serialize!(__tarpc_service_Response, {}, NotIrrefutable(()) $($fn_name($out))*);
impl_deserialize!(tarpc_service_Response__, NotIrrefutable(()) $($fn_name($out))*);
impl_serialize!(tarpc_service_Response__, {}, NotIrrefutable(()) $($fn_name($out))*);
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __tarpc_service_Error {
enum tarpc_service_Error__ {
NotIrrefutable(()),
$(
$fn_name($error)
),*
}
impl_deserialize!(__tarpc_service_Error, NotIrrefutable(()) $($fn_name($error))*);
impl_serialize!(__tarpc_service_Error, {}, NotIrrefutable(()) $($fn_name($error))*);
impl_deserialize!(tarpc_service_Error__, NotIrrefutable(()) $($fn_name($error))*);
impl_serialize!(tarpc_service_Error__, {}, NotIrrefutable(()) $($fn_name($error))*);
/// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`,
/// as required by `tokio_proto::NewService`. This is required so that the service can be used
@@ -335,54 +335,54 @@ macro_rules! service {
-> $crate::server::ListenFuture
{
return $crate::server::listen(
move || Ok(__tarpc_service_AsyncServer(self.clone())),
move || Ok(tarpc_service_AsyncServer__(self.clone())),
addr,
options);
#[allow(non_camel_case_types)]
#[derive(Clone)]
struct __tarpc_service_AsyncServer<S>(S);
struct tarpc_service_AsyncServer__<S>(S);
impl<S> ::std::fmt::Debug for __tarpc_service_AsyncServer<S> {
impl<S> ::std::fmt::Debug for tarpc_service_AsyncServer__<S> {
fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(fmt, "__tarpc_service_AsyncServer {{ .. }}")
write!(fmt, "tarpc_service_AsyncServer__ {{ .. }}")
}
}
#[allow(non_camel_case_types)]
type __tarpc_service_Future =
$crate::futures::Finished<$crate::server::Response<__tarpc_service_Response,
__tarpc_service_Error>,
type tarpc_service_Future__ =
$crate::futures::Finished<$crate::server::Response<tarpc_service_Response__,
tarpc_service_Error__>,
::std::io::Error>;
#[allow(non_camel_case_types)]
enum __tarpc_service_FutureReply<__tarpc_service_S: FutureService> {
DeserializeError(__tarpc_service_Future),
enum tarpc_service_FutureReply__<tarpc_service_S__: FutureService> {
DeserializeError(tarpc_service_Future__),
$($fn_name(
$crate::futures::Then<ty_snake_to_camel!(__tarpc_service_S::$fn_name),
__tarpc_service_Future,
$crate::futures::Then<ty_snake_to_camel!(tarpc_service_S__::$fn_name),
tarpc_service_Future__,
fn(::std::result::Result<$out, $error>)
-> __tarpc_service_Future>)),*
-> tarpc_service_Future__>)),*
}
impl<S: FutureService> $crate::futures::Future for __tarpc_service_FutureReply<S> {
type Item = $crate::server::Response<__tarpc_service_Response,
__tarpc_service_Error>;
impl<S: FutureService> $crate::futures::Future for tarpc_service_FutureReply__<S> {
type Item = $crate::server::Response<tarpc_service_Response__,
tarpc_service_Error__>;
type Error = ::std::io::Error;
fn poll(&mut self) -> $crate::futures::Poll<Self::Item, Self::Error> {
match *self {
__tarpc_service_FutureReply::DeserializeError(
ref mut __tarpc_service_future) =>
tarpc_service_FutureReply__::DeserializeError(
ref mut tarpc_service_future__) =>
{
$crate::futures::Future::poll(__tarpc_service_future)
$crate::futures::Future::poll(tarpc_service_future__)
}
$(
__tarpc_service_FutureReply::$fn_name(
ref mut __tarpc_service_future) =>
tarpc_service_FutureReply__::$fn_name(
ref mut tarpc_service_future__) =>
{
$crate::futures::Future::poll(__tarpc_service_future)
$crate::futures::Future::poll(tarpc_service_future__)
}
),*
}
@@ -391,52 +391,52 @@ macro_rules! service {
#[allow(non_camel_case_types)]
impl<__tarpc_service_S> $crate::tokio_service::Service
for __tarpc_service_AsyncServer<__tarpc_service_S>
where __tarpc_service_S: FutureService
impl<tarpc_service_S__> $crate::tokio_service::Service
for tarpc_service_AsyncServer__<tarpc_service_S__>
where tarpc_service_S__: FutureService
{
type Request = ::std::result::Result<__tarpc_service_Request,
type Request = ::std::result::Result<tarpc_service_Request__,
$crate::bincode::serde::DeserializeError>;
type Response = $crate::server::Response<__tarpc_service_Response,
__tarpc_service_Error>;
type Response = $crate::server::Response<tarpc_service_Response__,
tarpc_service_Error__>;
type Error = ::std::io::Error;
type Future = __tarpc_service_FutureReply<__tarpc_service_S>;
type Future = tarpc_service_FutureReply__<tarpc_service_S__>;
fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future {
let __tarpc_service_request = match __tarpc_service_request {
Ok(__tarpc_service_request) => __tarpc_service_request,
Err(__tarpc_service_deserialize_err) => {
return __tarpc_service_FutureReply::DeserializeError(
fn call(&self, tarpc_service_request__: Self::Request) -> Self::Future {
let tarpc_service_request__ = match tarpc_service_request__ {
Ok(tarpc_service_request__) => tarpc_service_request__,
Err(tarpc_service_deserialize_err__) => {
return tarpc_service_FutureReply__::DeserializeError(
$crate::futures::finished(
::std::result::Result::Err(
$crate::WireError::ServerDeserialize(
::std::string::ToString::to_string(
&__tarpc_service_deserialize_err)))));
&tarpc_service_deserialize_err__)))));
}
};
match __tarpc_service_request {
__tarpc_service_Request::NotIrrefutable(()) => unreachable!(),
match tarpc_service_request__ {
tarpc_service_Request__::NotIrrefutable(()) => unreachable!(),
$(
__tarpc_service_Request::$fn_name(( $($arg,)* )) => {
fn __tarpc_service_wrap(
__tarpc_service_response:
tarpc_service_Request__::$fn_name(( $($arg,)* )) => {
fn tarpc_service_wrap__(
tarpc_service_response__:
::std::result::Result<$out, $error>)
-> __tarpc_service_Future
-> tarpc_service_Future__
{
$crate::futures::finished(
__tarpc_service_response
.map(__tarpc_service_Response::$fn_name)
.map_err(|__tarpc_service_error| {
tarpc_service_response__
.map(tarpc_service_Response__::$fn_name)
.map_err(|tarpc_service_error__| {
$crate::WireError::App(
__tarpc_service_Error::$fn_name(
__tarpc_service_error))
tarpc_service_Error__::$fn_name(
tarpc_service_error__))
})
)
}
return __tarpc_service_FutureReply::$fn_name(
return tarpc_service_FutureReply__::$fn_name(
$crate::futures::Future::then(
FutureService::$fn_name(&self.0, $($arg),*),
__tarpc_service_wrap));
tarpc_service_wrap__));
}
)*
}
@@ -468,22 +468,26 @@ macro_rules! service {
-> ::std::io::Result<::std::net::SocketAddr>
where L: ::std::net::ToSocketAddrs
{
let __tarpc_service_service = __SyncServer {
let tarpc_service__ = SyncServer__ {
service: self,
};
return $crate::futures::Future::wait(FutureServiceExt::listen(
__tarpc_service_service,
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?,
options));
let tarpc_service_addr__ =
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
return $crate::futures::Future::wait($crate::futures::future::lazy(move || {
FutureServiceExt::listen(tarpc_service__, tarpc_service_addr__, options)
}));
#[derive(Clone)]
struct __SyncServer<S> {
struct SyncServer__<S> {
service: S,
}
#[allow(non_camel_case_types)]
impl<__tarpc_service_S> FutureService for __SyncServer<__tarpc_service_S>
where __tarpc_service_S: SyncService
impl<tarpc_service_S__> FutureService for SyncServer__<tarpc_service_S__>
where tarpc_service_S__: SyncService
{
$(
impl_snake_to_camel! {
@@ -499,22 +503,22 @@ macro_rules! service {
// TODO(tikue): what do do if SyncService panics?
unimplemented!()
}
let (__tarpc_service_complete, __tarpc_service_promise) =
let (tarpc_service_complete__, tarpc_service_promise__) =
$crate::futures::oneshot();
let mut __tarpc_service_service = self.clone();
let tarpc_service__ = self.clone();
const UNIMPLEMENTED: fn($crate::futures::Canceled) -> $error =
unimplemented;
::std::thread::spawn(move || {
let __tarpc_service_reply = SyncService::$fn_name(
&mut __tarpc_service_service.service, $($arg),*);
__tarpc_service_complete.complete(
let tarpc_service_reply__ = SyncService::$fn_name(
&tarpc_service__.service, $($arg),*);
tarpc_service_complete__.complete(
$crate::futures::IntoFuture::into_future(
__tarpc_service_reply));
tarpc_service_reply__));
});
let __tarpc_service_promise =
let tarpc_service_promise__ =
$crate::futures::Future::map_err(
__tarpc_service_promise, UNIMPLEMENTED);
$crate::futures::Future::flatten(__tarpc_service_promise)
tarpc_service_promise__, UNIMPLEMENTED);
$crate::futures::Future::flatten(tarpc_service_promise__)
}
)*
}
@@ -530,16 +534,17 @@ macro_rules! service {
pub struct SyncClient(FutureClient);
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr: A, options: $crate::client::Options)
fn connect<A>(addr_: A, opts_: $crate::client::Options)
-> ::std::result::Result<Self, ::std::io::Error>
where A: ::std::net::ToSocketAddrs,
{
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
let client = <FutureClient as $crate::client::future::Connect>::connect(
addr, options);
let client = $crate::futures::Future::wait(client)?;
let client = SyncClient(client);
::std::result::Result::Ok(client)
let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || {
<FutureClient as $crate::client::future::Connect>::connect(addr_, opts_)
}))?;
let client_ = SyncClient(client_);
::std::result::Result::Ok(client_)
}
}
@@ -550,28 +555,31 @@ macro_rules! service {
pub fn $fn_name(&self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
let rpc = (self.0).$fn_name($($arg),*);
$crate::futures::Future::wait(rpc)
// Wrapped in a lazy future to ensure execution occurs when a task is present.
$crate::futures::Future::wait($crate::futures::future::lazy(move || {
(self.0).$fn_name($($arg),*)
}))
}
)*
}
#[allow(non_camel_case_types)]
type __tarpc_service_Client =
$crate::client::Client<__tarpc_service_Request,
__tarpc_service_Response,
__tarpc_service_Error>;
type tarpc_service_Client__ =
$crate::client::Client<tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>;
#[allow(non_camel_case_types)]
/// Implementation detail: Pending connection.
pub struct __tarpc_service_ConnectFuture<T> {
inner: $crate::futures::Map<$crate::client::future::ConnectFuture<__tarpc_service_Request,
__tarpc_service_Response,
__tarpc_service_Error>,
fn(__tarpc_service_Client) -> T>,
pub struct tarpc_service_ConnectFuture__<T> {
inner: $crate::futures::Map<$crate::client::future::ConnectFuture<
tarpc_service_Request__,
tarpc_service_Response__,
tarpc_service_Error__>,
fn(tarpc_service_Client__) -> T>,
}
impl<T> $crate::futures::Future for __tarpc_service_ConnectFuture<T> {
impl<T> $crate::futures::Future for tarpc_service_ConnectFuture__<T> {
type Item = T;
type Error = ::std::io::Error;
@@ -583,19 +591,19 @@ macro_rules! service {
#[allow(unused)]
#[derive(Clone, Debug)]
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
pub struct FutureClient(__tarpc_service_Client);
pub struct FutureClient(tarpc_service_Client__);
impl<'a> $crate::client::future::Connect for FutureClient {
type ConnectFut = __tarpc_service_ConnectFuture<Self>;
type ConnectFut = tarpc_service_ConnectFuture__<Self>;
fn connect(__tarpc_service_addr: ::std::net::SocketAddr,
__tarpc_service_options: $crate::client::Options)
fn connect(tarpc_service_addr__: ::std::net::SocketAddr,
tarpc_service_options__: $crate::client::Options)
-> Self::ConnectFut
{
let client = <__tarpc_service_Client as $crate::client::future::Connect>::connect(
__tarpc_service_addr, __tarpc_service_options);
let client = <tarpc_service_Client__ as $crate::client::future::Connect>::connect(
tarpc_service_addr__, tarpc_service_options__);
__tarpc_service_ConnectFuture {
tarpc_service_ConnectFuture__ {
inner: $crate::futures::Future::map(client, FutureClient)
}
}
@@ -609,46 +617,46 @@ macro_rules! service {
-> impl $crate::futures::Future<Item=$out, Error=$crate::Error<$error>>
+ 'static
{
let __tarpc_service_req = __tarpc_service_Request::$fn_name(($($arg,)*));
let __tarpc_service_fut =
$crate::tokio_service::Service::call(&self.0, __tarpc_service_req);
$crate::futures::Future::then(__tarpc_service_fut,
move |__tarpc_service_msg| {
match __tarpc_service_msg? {
::std::result::Result::Ok(__tarpc_service_msg) => {
if let __tarpc_service_Response::$fn_name(__tarpc_service_msg) =
__tarpc_service_msg
let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*));
let tarpc_service_fut__ =
$crate::tokio_service::Service::call(&self.0, tarpc_service_req__);
$crate::futures::Future::then(tarpc_service_fut__,
move |tarpc_service_msg__| {
match tarpc_service_msg__? {
::std::result::Result::Ok(tarpc_service_msg__) => {
if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) =
tarpc_service_msg__
{
::std::result::Result::Ok(__tarpc_service_msg)
::std::result::Result::Ok(tarpc_service_msg__)
} else {
unreachable!()
}
}
::std::result::Result::Err(__tarpc_service_err) => {
::std::result::Result::Err(match __tarpc_service_err {
$crate::Error::App(__tarpc_service_err) => {
if let __tarpc_service_Error::$fn_name(
__tarpc_service_err) = __tarpc_service_err
::std::result::Result::Err(tarpc_service_err__) => {
::std::result::Result::Err(match tarpc_service_err__ {
$crate::Error::App(tarpc_service_err__) => {
if let tarpc_service_Error__::$fn_name(
tarpc_service_err__) = tarpc_service_err__
{
$crate::Error::App(__tarpc_service_err)
$crate::Error::App(tarpc_service_err__)
} else {
unreachable!()
}
}
$crate::Error::ServerDeserialize(__tarpc_service_err) => {
$crate::Error::ServerDeserialize(__tarpc_service_err)
$crate::Error::ServerDeserialize(tarpc_service_err__) => {
$crate::Error::ServerDeserialize(tarpc_service_err__)
}
$crate::Error::ServerSerialize(__tarpc_service_err) => {
$crate::Error::ServerSerialize(__tarpc_service_err)
$crate::Error::ServerSerialize(tarpc_service_err__) => {
$crate::Error::ServerSerialize(tarpc_service_err__)
}
$crate::Error::ClientDeserialize(__tarpc_service_err) => {
$crate::Error::ClientDeserialize(__tarpc_service_err)
$crate::Error::ClientDeserialize(tarpc_service_err__) => {
$crate::Error::ClientDeserialize(tarpc_service_err__)
}
$crate::Error::ClientSerialize(__tarpc_service_err) => {
$crate::Error::ClientSerialize(__tarpc_service_err)
$crate::Error::ClientSerialize(tarpc_service_err__) => {
$crate::Error::ClientSerialize(tarpc_service_err__)
}
$crate::Error::Io(__tarpc_service_error) => {
$crate::Error::Io(__tarpc_service_error)
$crate::Error::Io(tarpc_service_error__) => {
$crate::Error::Io(tarpc_service_error__)
}
})
}
@@ -698,10 +706,10 @@ mod functional_test {
}
mod sync {
use {client, server};
use client::sync::Connect;
use super::{SyncClient, SyncService, SyncServiceExt};
use super::env_logger;
use {client, server};
use client::sync::Connect;
use util::FirstSocketAddr;
use util::Never;
@@ -745,11 +753,11 @@ mod functional_test {
}
mod future {
use super::{FutureClient, FutureService, FutureServiceExt};
use super::env_logger;
use {client, server};
use client::future::Connect;
use futures::{Finished, Future, finished};
use super::{FutureClient, FutureService, FutureServiceExt};
use super::env_logger;
use util::FirstSocketAddr;
use util::Never;

View File

@@ -75,9 +75,9 @@ pub fn listen<S, Req, Resp, E>(new_service: S, addr: SocketAddr, options: Option
/// Spawns a service that binds to the given address using the given handle.
fn listen_with<S, Req, Resp, E>(new_service: S,
addr: SocketAddr,
handle: Handle)
-> io::Result<SocketAddr>
addr: SocketAddr,
handle: Handle)
-> io::Result<SocketAddr>
where S: NewService<Request = Result<Req, DeserializeError>,
Response = Response<Resp, E>,
Error = io::Error> + Send + 'static,