From 45fa4c7bf1aba3e3158aa9361f08e89858424b3d Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 22 Jan 2017 19:38:25 -0800 Subject: [PATCH] Make sure synchronous RPCs are wrapped in a lazy future. Some future-returning fns implicitly require the presence of an execution task. Wrapping in a lazy future ensures that by the time the future is polled, there is a task present. --- src/client.rs | 9 ++++++--- src/macros.rs | 46 +++++++++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/client.rs b/src/client.rs index d648831..80fcbd6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -231,8 +231,8 @@ pub mod future { /// Exposes a trait for connecting synchronously to servers. pub mod sync { - use client::future; - use futures::Future; + use client::future::Connect as FutureConnect; + use futures::{Future, future}; use serde::{Deserialize, Serialize}; use std::io; use std::net::ToSocketAddrs; @@ -253,7 +253,10 @@ pub mod sync { fn connect(addr: A, options: Options) -> Result where A: ToSocketAddrs { - ::connect(addr.try_first_socket_addr()?, options).wait() + let addr = addr.try_first_socket_addr()?; + + // Wrapped in a lazy future to ensure execution occurs when a task is present. + future::lazy(move || ::connect(addr, options)).wait() } } } diff --git a/src/macros.rs b/src/macros.rs index 768e345..74bf919 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -468,13 +468,17 @@ macro_rules! service { -> ::std::io::Result<::std::net::SocketAddr> where L: ::std::net::ToSocketAddrs { - let __tarpc_service_service = __SyncServer { + let __tarpc_service = __SyncServer { service: self, }; - return $crate::futures::Future::wait(FutureServiceExt::listen( - __tarpc_service_service, - $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?, - options)); + + let __tarpc_service_addr = + $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; + + // Wrapped in a lazy future to ensure execution occurs when a task is present. + return $crate::futures::Future::wait($crate::futures::future::lazy(move || { + FutureServiceExt::listen(__tarpc_service, __tarpc_service_addr, options) + })); #[derive(Clone)] struct __SyncServer { @@ -501,12 +505,12 @@ macro_rules! service { } let (__tarpc_service_complete, __tarpc_service_promise) = $crate::futures::oneshot(); - let mut __tarpc_service_service = self.clone(); + let mut __tarpc_service = self.clone(); const UNIMPLEMENTED: fn($crate::futures::Canceled) -> $error = unimplemented; ::std::thread::spawn(move || { let __tarpc_service_reply = SyncService::$fn_name( - &mut __tarpc_service_service.service, $($arg),*); + &mut __tarpc_service.service, $($arg),*); __tarpc_service_complete.complete( $crate::futures::IntoFuture::into_future( __tarpc_service_reply)); @@ -530,16 +534,17 @@ macro_rules! service { pub struct SyncClient(FutureClient); impl $crate::client::sync::Connect for SyncClient { - fn connect(addr: A, options: $crate::client::Options) + fn connect(_addr: A, _opts: $crate::client::Options) -> ::std::result::Result where A: ::std::net::ToSocketAddrs, { - let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?; - let client = ::connect( - addr, options); - let client = $crate::futures::Future::wait(client)?; - let client = SyncClient(client); - ::std::result::Result::Ok(client) + let _addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&_addr)?; + // Wrapped in a lazy future to ensure execution occurs when a task is present. + let _client = $crate::futures::Future::wait($crate::futures::future::lazy(move || { + ::connect(_addr, _opts) + }))?; + let _client = SyncClient(_client); + ::std::result::Result::Ok(_client) } } @@ -550,8 +555,10 @@ macro_rules! service { pub fn $fn_name(&self, $($arg: $in_),*) -> ::std::result::Result<$out, $crate::Error<$error>> { - let rpc = (self.0).$fn_name($($arg),*); - $crate::futures::Future::wait(rpc) + // Wrapped in a lazy future to ensure execution occurs when a task is present. + $crate::futures::Future::wait($crate::futures::future::lazy(move || { + (self.0).$fn_name($($arg),*) + })) } )* } @@ -565,9 +572,10 @@ macro_rules! service { #[allow(non_camel_case_types)] /// Implementation detail: Pending connection. pub struct __tarpc_service_ConnectFuture { - inner: $crate::futures::Map<$crate::client::future::ConnectFuture<__tarpc_service_Request, - __tarpc_service_Response, - __tarpc_service_Error>, + inner: $crate::futures::Map<$crate::client::future::ConnectFuture< + __tarpc_service_Request, + __tarpc_service_Response, + __tarpc_service_Error>, fn(__tarpc_service_Client) -> T>, }