diff --git a/benches/latency.rs b/benches/latency.rs index a203af1..3c4e49b 100644 --- a/benches/latency.rs +++ b/benches/latency.rs @@ -14,7 +14,8 @@ extern crate env_logger; extern crate futures; use futures::Future; -use tarpc::sync::Connect; +use tarpc::{client, server}; +use tarpc::client::sync::Connect; use tarpc::util::{FirstSocketAddr, Never}; #[cfg(test)] use test::Bencher; @@ -28,7 +29,7 @@ struct Server; impl FutureService for Server { type AckFut = futures::Finished<(), Never>; - fn ack(&mut self) -> Self::AckFut { + fn ack(&self) -> Self::AckFut { futures::finished(()) } } @@ -37,10 +38,8 @@ impl FutureService for Server { #[bench] fn latency(bencher: &mut Bencher) { let _ = env_logger::init(); - let addr = Server.listen("localhost:0".first_socket_addr()).wait().unwrap(); - let mut client = SyncClient::connect(addr).unwrap(); + let addr = Server.listen("localhost:0".first_socket_addr(), server::Options::default()).wait().unwrap(); + let client = SyncClient::connect(addr, client::Options::default()).unwrap(); - bencher.iter(|| { - client.ack().unwrap(); - }); + bencher.iter(|| { client.ack().unwrap(); }); } diff --git a/examples/concurrency.rs b/examples/concurrency.rs index c721fe8..7cfb84f 100644 --- a/examples/concurrency.rs +++ b/examples/concurrency.rs @@ -58,7 +58,7 @@ impl FutureService for Server { .spawn(futures::lazy(move || { let mut vec: Vec = Vec::with_capacity(size as usize); for i in 0..size { - vec.push((i % 1 << 8) as u8); + vec.push(((i % 2) << 8) as u8); } debug!("Server sending response no. {}", request_number); futures::finished(vec) diff --git a/examples/pubsub.rs b/examples/pubsub.rs index 394793b..e2f9e1e 100644 --- a/examples/pubsub.rs +++ b/examples/pubsub.rs @@ -57,7 +57,7 @@ impl subscriber::FutureService for Subscriber { } impl Subscriber { - fn new(id: u32) -> SocketAddr { + fn listen(id: u32) -> SocketAddr { Subscriber { id: id } .listen("localhost:0".first_socket_addr(), server::Options::default()) @@ -125,10 +125,10 @@ fn main() { let publisher_client = publisher::SyncClient::connect(publisher_addr, client::Options::default()).unwrap(); - let subscriber1 = Subscriber::new(0); + let subscriber1 = Subscriber::listen(0); publisher_client.subscribe(0, subscriber1).unwrap(); - let subscriber2 = Subscriber::new(1); + let subscriber2 = Subscriber::listen(1); publisher_client.subscribe(1, subscriber2).unwrap(); diff --git a/examples/throughput.rs b/examples/throughput.rs index f24b8d7..0c99c52 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -30,7 +30,7 @@ lazy_static! { fn gen_vec(size: usize) -> Vec { let mut vec: Vec = Vec::with_capacity(size); for i in 0..size { - vec.push((i % 1 << 8) as u8); + vec.push(((i % 2) << 8) as u8); } vec } @@ -77,8 +77,7 @@ fn bench_tcp(target: u64) { let addr = l.local_addr().unwrap(); thread::spawn(move || { let (mut stream, _) = l.accept().unwrap(); - while let Ok(_) = stream.write_all(&*BUF) { - } + while let Ok(_) = stream.write_all(&*BUF) {} }); let mut stream = net::TcpStream::connect(&addr).unwrap(); let mut buf = vec![0; CHUNK_SIZE as usize]; @@ -99,7 +98,7 @@ fn bench_tcp(target: u64) { fn main() { let _ = env_logger::init(); - &*BUF; // to non-lazily initialize it. + let _ = *BUF; // To non-lazily initialize it. bench_tcp(256 << 20); bench_tarpc(256 << 20); } diff --git a/src/client.rs b/src/client.rs index d648831..ac4768c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -112,6 +112,7 @@ impl Options { /// Exposes a trait for connecting asynchronously to servers. pub mod future { + use super::{Client, Options}; use {REMOTE, Reactor}; use futures::{self, Async, Future, future}; use protocol::Proto; @@ -119,7 +120,6 @@ pub mod future { use std::io; use std::marker::PhantomData; use std::net::SocketAddr; - use super::{Client, Options}; use tokio_core::{self, reactor}; use tokio_core::net::TcpStream; use tokio_proto::BindClient; @@ -140,6 +140,7 @@ pub mod future { Resp: Deserialize + 'static, E: Deserialize + 'static, { + #[allow(unknown_lints, type_complexity)] inner: future::Either< futures::Map>, @@ -231,12 +232,12 @@ pub mod future { /// Exposes a trait for connecting synchronously to servers. pub mod sync { - use client::future; - use futures::Future; + use super::{Client, Options}; + use client::future::Connect as FutureConnect; + use futures::{Future, future}; use serde::{Deserialize, Serialize}; use std::io; use std::net::ToSocketAddrs; - use super::{Client, Options}; use util::FirstSocketAddr; /// Types that can connect to a server synchronously. @@ -253,7 +254,10 @@ pub mod sync { fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs { - ::connect(addr.try_first_socket_addr()?, options).wait() + let addr = addr.try_first_socket_addr()?; + + // Wrapped in a lazy future to ensure execution occurs when a task is present. + future::lazy(move || ::connect(addr, options)).wait() } } } diff --git a/src/macros.rs b/src/macros.rs index 768e345..d832001 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -12,22 +12,22 @@ macro_rules! as_item { #[doc(hidden)] #[macro_export] macro_rules! impl_serialize { - ($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($_n:expr) ) => { + ($impler:ident, { $($lifetime:tt)* }, $(@($name:ident $n:expr))* -- #($n_:expr) ) => { as_item! { impl$($lifetime)* $crate::serde::Serialize for $impler$($lifetime)* { - fn serialize(&self, __impl_serialize_serializer: &mut S) + fn serialize(&self, impl_serialize_serializer__: &mut S) -> ::std::result::Result<(), S::Error> where S: $crate::serde::Serializer { match *self { $( - $impler::$name(ref __impl_serialize_field) => + $impler::$name(ref impl_serialize_field__) => $crate::serde::Serializer::serialize_newtype_variant( - __impl_serialize_serializer, + impl_serialize_serializer__, stringify!($impler), $n, stringify!($name), - __impl_serialize_field, + impl_serialize_field__, ) ),* } @@ -55,47 +55,47 @@ macro_rules! impl_serialize { #[doc(hidden)] #[macro_export] macro_rules! impl_deserialize { - ($impler:ident, $(@($name:ident $n:expr))* -- #($_n:expr) ) => ( + ($impler:ident, $(@($name:ident $n:expr))* -- #($n_:expr) ) => ( impl $crate::serde::Deserialize for $impler { #[allow(non_camel_case_types)] - fn deserialize<__impl_deserialize_D>( - __impl_deserialize_deserializer: &mut __impl_deserialize_D) - -> ::std::result::Result<$impler, __impl_deserialize_D::Error> - where __impl_deserialize_D: $crate::serde::Deserializer + fn deserialize( + impl_deserialize_deserializer__: &mut impl_deserialize_D__) + -> ::std::result::Result<$impler, impl_deserialize_D__::Error> + where impl_deserialize_D__: $crate::serde::Deserializer { #[allow(non_camel_case_types, unused)] - enum __impl_deserialize_Field { + enum impl_deserialize_Field__ { $($name),* } - impl $crate::serde::Deserialize for __impl_deserialize_Field { - fn deserialize(__impl_deserialize_deserializer: &mut D) - -> ::std::result::Result<__impl_deserialize_Field, D::Error> + impl $crate::serde::Deserialize for impl_deserialize_Field__ { + fn deserialize(impl_deserialize_deserializer__: &mut D) + -> ::std::result::Result where D: $crate::serde::Deserializer { - struct __impl_deserialize_FieldVisitor; - impl $crate::serde::de::Visitor for __impl_deserialize_FieldVisitor { - type Value = __impl_deserialize_Field; + struct impl_deserialize_FieldVisitor__; + impl $crate::serde::de::Visitor for impl_deserialize_FieldVisitor__ { + type Value = impl_deserialize_Field__; - fn visit_usize(&mut self, __impl_deserialize_value: usize) - -> ::std::result::Result<__impl_deserialize_Field, E> + fn visit_usize(&mut self, impl_deserialize_value__: usize) + -> ::std::result::Result where E: $crate::serde::de::Error, { $( - if __impl_deserialize_value == $n { + if impl_deserialize_value__ == $n { return ::std::result::Result::Ok( - __impl_deserialize_Field::$name); + impl_deserialize_Field__::$name); } )* ::std::result::Result::Err( $crate::serde::de::Error::custom( format!("No variants have a value of {}!", - __impl_deserialize_value)) + impl_deserialize_value__)) ) } } - __impl_deserialize_deserializer.deserialize_struct_field( - __impl_deserialize_FieldVisitor) + impl_deserialize_deserializer__.deserialize_struct_field( + impl_deserialize_FieldVisitor__) } } @@ -103,27 +103,27 @@ macro_rules! impl_deserialize { impl $crate::serde::de::EnumVisitor for Visitor { type Value = $impler; - fn visit(&mut self, mut __tarpc_enum_visitor: V) + fn visit(&mut self, mut tarpc_enum_visitor__: V) -> ::std::result::Result<$impler, V::Error> where V: $crate::serde::de::VariantVisitor { - match __tarpc_enum_visitor.visit_variant()? { + match tarpc_enum_visitor__.visit_variant()? { $( - __impl_deserialize_Field::$name => { + impl_deserialize_Field__::$name => { ::std::result::Result::Ok( - $impler::$name(__tarpc_enum_visitor.visit_newtype()?)) + $impler::$name(tarpc_enum_visitor__.visit_newtype()?)) } ),* } } } - const __TARPC_VARIANTS: &'static [&'static str] = &[ + const TARPC_VARIANTS__: &'static [&'static str] = &[ $( stringify!($name) ),* ]; - __impl_deserialize_deserializer.deserialize_enum( - stringify!($impler), __TARPC_VARIANTS, Visitor) + impl_deserialize_deserializer__.deserialize_enum( + stringify!($impler), TARPC_VARIANTS__, Visitor) } } ); @@ -270,39 +270,39 @@ macro_rules! service { #[allow(non_camel_case_types, unused)] #[derive(Debug)] - enum __tarpc_service_Request { + enum tarpc_service_Request__ { NotIrrefutable(()), $( $fn_name(( $($in_,)* )) ),* } - impl_deserialize!(__tarpc_service_Request, NotIrrefutable(()) $($fn_name(($($in_),*)))*); - impl_serialize!(__tarpc_service_Request, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*); + impl_deserialize!(tarpc_service_Request__, NotIrrefutable(()) $($fn_name(($($in_),*)))*); + impl_serialize!(tarpc_service_Request__, {}, NotIrrefutable(()) $($fn_name(($($in_),*)))*); #[allow(non_camel_case_types, unused)] #[derive(Debug)] - enum __tarpc_service_Response { + enum tarpc_service_Response__ { NotIrrefutable(()), $( $fn_name($out) ),* } - impl_deserialize!(__tarpc_service_Response, NotIrrefutable(()) $($fn_name($out))*); - impl_serialize!(__tarpc_service_Response, {}, NotIrrefutable(()) $($fn_name($out))*); + impl_deserialize!(tarpc_service_Response__, NotIrrefutable(()) $($fn_name($out))*); + impl_serialize!(tarpc_service_Response__, {}, NotIrrefutable(()) $($fn_name($out))*); #[allow(non_camel_case_types, unused)] #[derive(Debug)] - enum __tarpc_service_Error { + enum tarpc_service_Error__ { NotIrrefutable(()), $( $fn_name($error) ),* } - impl_deserialize!(__tarpc_service_Error, NotIrrefutable(()) $($fn_name($error))*); - impl_serialize!(__tarpc_service_Error, {}, NotIrrefutable(()) $($fn_name($error))*); + impl_deserialize!(tarpc_service_Error__, NotIrrefutable(()) $($fn_name($error))*); + impl_serialize!(tarpc_service_Error__, {}, NotIrrefutable(()) $($fn_name($error))*); /// Defines the `Future` RPC service. Implementors must be `Clone`, `Send`, and `'static`, /// as required by `tokio_proto::NewService`. This is required so that the service can be used @@ -335,54 +335,54 @@ macro_rules! service { -> $crate::server::ListenFuture { return $crate::server::listen( - move || Ok(__tarpc_service_AsyncServer(self.clone())), + move || Ok(tarpc_service_AsyncServer__(self.clone())), addr, options); #[allow(non_camel_case_types)] #[derive(Clone)] - struct __tarpc_service_AsyncServer(S); + struct tarpc_service_AsyncServer__(S); - impl ::std::fmt::Debug for __tarpc_service_AsyncServer { + impl ::std::fmt::Debug for tarpc_service_AsyncServer__ { fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(fmt, "__tarpc_service_AsyncServer {{ .. }}") + write!(fmt, "tarpc_service_AsyncServer__ {{ .. }}") } } #[allow(non_camel_case_types)] - type __tarpc_service_Future = - $crate::futures::Finished<$crate::server::Response<__tarpc_service_Response, - __tarpc_service_Error>, + type tarpc_service_Future__ = + $crate::futures::Finished<$crate::server::Response, ::std::io::Error>; #[allow(non_camel_case_types)] - enum __tarpc_service_FutureReply<__tarpc_service_S: FutureService> { - DeserializeError(__tarpc_service_Future), + enum tarpc_service_FutureReply__ { + DeserializeError(tarpc_service_Future__), $($fn_name( - $crate::futures::Then) - -> __tarpc_service_Future>)),* + -> tarpc_service_Future__>)),* } - impl $crate::futures::Future for __tarpc_service_FutureReply { - type Item = $crate::server::Response<__tarpc_service_Response, - __tarpc_service_Error>; + impl $crate::futures::Future for tarpc_service_FutureReply__ { + type Item = $crate::server::Response; type Error = ::std::io::Error; fn poll(&mut self) -> $crate::futures::Poll { match *self { - __tarpc_service_FutureReply::DeserializeError( - ref mut __tarpc_service_future) => + tarpc_service_FutureReply__::DeserializeError( + ref mut tarpc_service_future__) => { - $crate::futures::Future::poll(__tarpc_service_future) + $crate::futures::Future::poll(tarpc_service_future__) } $( - __tarpc_service_FutureReply::$fn_name( - ref mut __tarpc_service_future) => + tarpc_service_FutureReply__::$fn_name( + ref mut tarpc_service_future__) => { - $crate::futures::Future::poll(__tarpc_service_future) + $crate::futures::Future::poll(tarpc_service_future__) } ),* } @@ -391,52 +391,52 @@ macro_rules! service { #[allow(non_camel_case_types)] - impl<__tarpc_service_S> $crate::tokio_service::Service - for __tarpc_service_AsyncServer<__tarpc_service_S> - where __tarpc_service_S: FutureService + impl $crate::tokio_service::Service + for tarpc_service_AsyncServer__ + where tarpc_service_S__: FutureService { - type Request = ::std::result::Result<__tarpc_service_Request, + type Request = ::std::result::Result; - type Response = $crate::server::Response<__tarpc_service_Response, - __tarpc_service_Error>; + type Response = $crate::server::Response; type Error = ::std::io::Error; - type Future = __tarpc_service_FutureReply<__tarpc_service_S>; + type Future = tarpc_service_FutureReply__; - fn call(&self, __tarpc_service_request: Self::Request) -> Self::Future { - let __tarpc_service_request = match __tarpc_service_request { - Ok(__tarpc_service_request) => __tarpc_service_request, - Err(__tarpc_service_deserialize_err) => { - return __tarpc_service_FutureReply::DeserializeError( + fn call(&self, tarpc_service_request__: Self::Request) -> Self::Future { + let tarpc_service_request__ = match tarpc_service_request__ { + Ok(tarpc_service_request__) => tarpc_service_request__, + Err(tarpc_service_deserialize_err__) => { + return tarpc_service_FutureReply__::DeserializeError( $crate::futures::finished( ::std::result::Result::Err( $crate::WireError::ServerDeserialize( ::std::string::ToString::to_string( - &__tarpc_service_deserialize_err))))); + &tarpc_service_deserialize_err__))))); } }; - match __tarpc_service_request { - __tarpc_service_Request::NotIrrefutable(()) => unreachable!(), + match tarpc_service_request__ { + tarpc_service_Request__::NotIrrefutable(()) => unreachable!(), $( - __tarpc_service_Request::$fn_name(( $($arg,)* )) => { - fn __tarpc_service_wrap( - __tarpc_service_response: + tarpc_service_Request__::$fn_name(( $($arg,)* )) => { + fn tarpc_service_wrap__( + tarpc_service_response__: ::std::result::Result<$out, $error>) - -> __tarpc_service_Future + -> tarpc_service_Future__ { $crate::futures::finished( - __tarpc_service_response - .map(__tarpc_service_Response::$fn_name) - .map_err(|__tarpc_service_error| { + tarpc_service_response__ + .map(tarpc_service_Response__::$fn_name) + .map_err(|tarpc_service_error__| { $crate::WireError::App( - __tarpc_service_Error::$fn_name( - __tarpc_service_error)) + tarpc_service_Error__::$fn_name( + tarpc_service_error__)) }) ) } - return __tarpc_service_FutureReply::$fn_name( + return tarpc_service_FutureReply__::$fn_name( $crate::futures::Future::then( FutureService::$fn_name(&self.0, $($arg),*), - __tarpc_service_wrap)); + tarpc_service_wrap__)); } )* } @@ -468,22 +468,26 @@ macro_rules! service { -> ::std::io::Result<::std::net::SocketAddr> where L: ::std::net::ToSocketAddrs { - let __tarpc_service_service = __SyncServer { + let tarpc_service__ = SyncServer__ { service: self, }; - return $crate::futures::Future::wait(FutureServiceExt::listen( - __tarpc_service_service, - $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?, - options)); + + let tarpc_service_addr__ = + $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; + + // Wrapped in a lazy future to ensure execution occurs when a task is present. + return $crate::futures::Future::wait($crate::futures::future::lazy(move || { + FutureServiceExt::listen(tarpc_service__, tarpc_service_addr__, options) + })); #[derive(Clone)] - struct __SyncServer { + struct SyncServer__ { service: S, } #[allow(non_camel_case_types)] - impl<__tarpc_service_S> FutureService for __SyncServer<__tarpc_service_S> - where __tarpc_service_S: SyncService + impl FutureService for SyncServer__ + where tarpc_service_S__: SyncService { $( impl_snake_to_camel! { @@ -499,22 +503,22 @@ macro_rules! service { // TODO(tikue): what do do if SyncService panics? unimplemented!() } - let (__tarpc_service_complete, __tarpc_service_promise) = + let (tarpc_service_complete__, tarpc_service_promise__) = $crate::futures::oneshot(); - let mut __tarpc_service_service = self.clone(); + let tarpc_service__ = self.clone(); const UNIMPLEMENTED: fn($crate::futures::Canceled) -> $error = unimplemented; ::std::thread::spawn(move || { - let __tarpc_service_reply = SyncService::$fn_name( - &mut __tarpc_service_service.service, $($arg),*); - __tarpc_service_complete.complete( + let tarpc_service_reply__ = SyncService::$fn_name( + &tarpc_service__.service, $($arg),*); + tarpc_service_complete__.complete( $crate::futures::IntoFuture::into_future( - __tarpc_service_reply)); + tarpc_service_reply__)); }); - let __tarpc_service_promise = + let tarpc_service_promise__ = $crate::futures::Future::map_err( - __tarpc_service_promise, UNIMPLEMENTED); - $crate::futures::Future::flatten(__tarpc_service_promise) + tarpc_service_promise__, UNIMPLEMENTED); + $crate::futures::Future::flatten(tarpc_service_promise__) } )* } @@ -530,16 +534,17 @@ macro_rules! service { pub struct SyncClient(FutureClient); impl $crate::client::sync::Connect for SyncClient { - fn connect(addr: A, options: $crate::client::Options) + fn connect(addr_: A, opts_: $crate::client::Options) -> ::std::result::Result where A: ::std::net::ToSocketAddrs, { - let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - let client = ::connect( - addr, options); - let client = $crate::futures::Future::wait(client)?; - let client = SyncClient(client); - ::std::result::Result::Ok(client) + let addr_ = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr_)?; + // Wrapped in a lazy future to ensure execution occurs when a task is present. + let client_ = $crate::futures::Future::wait($crate::futures::future::lazy(move || { + ::connect(addr_, opts_) + }))?; + let client_ = SyncClient(client_); + ::std::result::Result::Ok(client_) } } @@ -550,28 +555,31 @@ macro_rules! service { pub fn $fn_name(&self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { - let rpc = (self.0).$fn_name($($arg),*); - $crate::futures::Future::wait(rpc) + // Wrapped in a lazy future to ensure execution occurs when a task is present. + $crate::futures::Future::wait($crate::futures::future::lazy(move || { + (self.0).$fn_name($($arg),*) + })) } )* } #[allow(non_camel_case_types)] - type __tarpc_service_Client = - $crate::client::Client<__tarpc_service_Request, - __tarpc_service_Response, - __tarpc_service_Error>; + type tarpc_service_Client__ = + $crate::client::Client; #[allow(non_camel_case_types)] /// Implementation detail: Pending connection. - pub struct __tarpc_service_ConnectFuture { - inner: $crate::futures::Map<$crate::client::future::ConnectFuture<__tarpc_service_Request, - __tarpc_service_Response, - __tarpc_service_Error>, - fn(__tarpc_service_Client) -> T>, + pub struct tarpc_service_ConnectFuture__ { + inner: $crate::futures::Map<$crate::client::future::ConnectFuture< + tarpc_service_Request__, + tarpc_service_Response__, + tarpc_service_Error__>, + fn(tarpc_service_Client__) -> T>, } - impl $crate::futures::Future for __tarpc_service_ConnectFuture { + impl $crate::futures::Future for tarpc_service_ConnectFuture__ { type Item = T; type Error = ::std::io::Error; @@ -583,19 +591,19 @@ macro_rules! service { #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. - pub struct FutureClient(__tarpc_service_Client); + pub struct FutureClient(tarpc_service_Client__); impl<'a> $crate::client::future::Connect for FutureClient { - type ConnectFut = __tarpc_service_ConnectFuture; + type ConnectFut = tarpc_service_ConnectFuture__; - fn connect(__tarpc_service_addr: ::std::net::SocketAddr, - __tarpc_service_options: $crate::client::Options) + fn connect(tarpc_service_addr__: ::std::net::SocketAddr, + tarpc_service_options__: $crate::client::Options) -> Self::ConnectFut { - let client = <__tarpc_service_Client as $crate::client::future::Connect>::connect( - __tarpc_service_addr, __tarpc_service_options); + let client = ::connect( + tarpc_service_addr__, tarpc_service_options__); - __tarpc_service_ConnectFuture { + tarpc_service_ConnectFuture__ { inner: $crate::futures::Future::map(client, FutureClient) } } @@ -609,46 +617,46 @@ macro_rules! service { -> impl $crate::futures::Future> + 'static { - let __tarpc_service_req = __tarpc_service_Request::$fn_name(($($arg,)*)); - let __tarpc_service_fut = - $crate::tokio_service::Service::call(&self.0, __tarpc_service_req); - $crate::futures::Future::then(__tarpc_service_fut, - move |__tarpc_service_msg| { - match __tarpc_service_msg? { - ::std::result::Result::Ok(__tarpc_service_msg) => { - if let __tarpc_service_Response::$fn_name(__tarpc_service_msg) = - __tarpc_service_msg + let tarpc_service_req__ = tarpc_service_Request__::$fn_name(($($arg,)*)); + let tarpc_service_fut__ = + $crate::tokio_service::Service::call(&self.0, tarpc_service_req__); + $crate::futures::Future::then(tarpc_service_fut__, + move |tarpc_service_msg__| { + match tarpc_service_msg__? { + ::std::result::Result::Ok(tarpc_service_msg__) => { + if let tarpc_service_Response__::$fn_name(tarpc_service_msg__) = + tarpc_service_msg__ { - ::std::result::Result::Ok(__tarpc_service_msg) + ::std::result::Result::Ok(tarpc_service_msg__) } else { unreachable!() } } - ::std::result::Result::Err(__tarpc_service_err) => { - ::std::result::Result::Err(match __tarpc_service_err { - $crate::Error::App(__tarpc_service_err) => { - if let __tarpc_service_Error::$fn_name( - __tarpc_service_err) = __tarpc_service_err + ::std::result::Result::Err(tarpc_service_err__) => { + ::std::result::Result::Err(match tarpc_service_err__ { + $crate::Error::App(tarpc_service_err__) => { + if let tarpc_service_Error__::$fn_name( + tarpc_service_err__) = tarpc_service_err__ { - $crate::Error::App(__tarpc_service_err) + $crate::Error::App(tarpc_service_err__) } else { unreachable!() } } - $crate::Error::ServerDeserialize(__tarpc_service_err) => { - $crate::Error::ServerDeserialize(__tarpc_service_err) + $crate::Error::ServerDeserialize(tarpc_service_err__) => { + $crate::Error::ServerDeserialize(tarpc_service_err__) } - $crate::Error::ServerSerialize(__tarpc_service_err) => { - $crate::Error::ServerSerialize(__tarpc_service_err) + $crate::Error::ServerSerialize(tarpc_service_err__) => { + $crate::Error::ServerSerialize(tarpc_service_err__) } - $crate::Error::ClientDeserialize(__tarpc_service_err) => { - $crate::Error::ClientDeserialize(__tarpc_service_err) + $crate::Error::ClientDeserialize(tarpc_service_err__) => { + $crate::Error::ClientDeserialize(tarpc_service_err__) } - $crate::Error::ClientSerialize(__tarpc_service_err) => { - $crate::Error::ClientSerialize(__tarpc_service_err) + $crate::Error::ClientSerialize(tarpc_service_err__) => { + $crate::Error::ClientSerialize(tarpc_service_err__) } - $crate::Error::Io(__tarpc_service_error) => { - $crate::Error::Io(__tarpc_service_error) + $crate::Error::Io(tarpc_service_error__) => { + $crate::Error::Io(tarpc_service_error__) } }) } @@ -698,10 +706,10 @@ mod functional_test { } mod sync { - use {client, server}; - use client::sync::Connect; use super::{SyncClient, SyncService, SyncServiceExt}; use super::env_logger; + use {client, server}; + use client::sync::Connect; use util::FirstSocketAddr; use util::Never; @@ -745,11 +753,11 @@ mod functional_test { } mod future { + use super::{FutureClient, FutureService, FutureServiceExt}; + use super::env_logger; use {client, server}; use client::future::Connect; use futures::{Finished, Future, finished}; - use super::{FutureClient, FutureService, FutureServiceExt}; - use super::env_logger; use util::FirstSocketAddr; use util::Never; diff --git a/src/server.rs b/src/server.rs index 29e34a7..8f1248d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -75,9 +75,9 @@ pub fn listen(new_service: S, addr: SocketAddr, options: Option /// Spawns a service that binds to the given address using the given handle. fn listen_with(new_service: S, - addr: SocketAddr, - handle: Handle) - -> io::Result + addr: SocketAddr, + handle: Handle) + -> io::Result where S: NewService, Response = Response, Error = io::Error> + Send + 'static,