Make sure synchronous RPCs are wrapped in a lazy future.

Some future-returning fns implicitly require the presence of an execution task. Wrapping in a lazy future ensures that by the time the future is polled, there is a task present.
This commit is contained in:
Tim Kuehn
2017-01-22 19:38:25 -08:00
parent c7c18cbaaa
commit 45fa4c7bf1
2 changed files with 33 additions and 22 deletions

View File

@@ -231,8 +231,8 @@ pub mod future {
/// Exposes a trait for connecting synchronously to servers.
pub mod sync {
use client::future;
use futures::Future;
use client::future::Connect as FutureConnect;
use futures::{Future, future};
use serde::{Deserialize, Serialize};
use std::io;
use std::net::ToSocketAddrs;
@@ -253,7 +253,10 @@ pub mod sync {
fn connect<A>(addr: A, options: Options) -> Result<Self, io::Error>
where A: ToSocketAddrs
{
<Self as future::Connect>::connect(addr.try_first_socket_addr()?, options).wait()
let addr = addr.try_first_socket_addr()?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
future::lazy(move || <Self as FutureConnect>::connect(addr, options)).wait()
}
}
}

View File

@@ -468,13 +468,17 @@ macro_rules! service {
-> ::std::io::Result<::std::net::SocketAddr>
where L: ::std::net::ToSocketAddrs
{
let __tarpc_service_service = __SyncServer {
let __tarpc_service = __SyncServer {
service: self,
};
return $crate::futures::Future::wait(FutureServiceExt::listen(
__tarpc_service_service,
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?,
options));
let __tarpc_service_addr =
$crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
return $crate::futures::Future::wait($crate::futures::future::lazy(move || {
FutureServiceExt::listen(__tarpc_service, __tarpc_service_addr, options)
}));
#[derive(Clone)]
struct __SyncServer<S> {
@@ -501,12 +505,12 @@ macro_rules! service {
}
let (__tarpc_service_complete, __tarpc_service_promise) =
$crate::futures::oneshot();
let mut __tarpc_service_service = self.clone();
let mut __tarpc_service = self.clone();
const UNIMPLEMENTED: fn($crate::futures::Canceled) -> $error =
unimplemented;
::std::thread::spawn(move || {
let __tarpc_service_reply = SyncService::$fn_name(
&mut __tarpc_service_service.service, $($arg),*);
&mut __tarpc_service.service, $($arg),*);
__tarpc_service_complete.complete(
$crate::futures::IntoFuture::into_future(
__tarpc_service_reply));
@@ -530,16 +534,17 @@ macro_rules! service {
pub struct SyncClient(FutureClient);
impl $crate::client::sync::Connect for SyncClient {
fn connect<A>(addr: A, options: $crate::client::Options)
fn connect<A>(_addr: A, _opts: $crate::client::Options)
-> ::std::result::Result<Self, ::std::io::Error>
where A: ::std::net::ToSocketAddrs,
{
let addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&addr)?;
let client = <FutureClient as $crate::client::future::Connect>::connect(
addr, options);
let client = $crate::futures::Future::wait(client)?;
let client = SyncClient(client);
::std::result::Result::Ok(client)
let _addr = $crate::util::FirstSocketAddr::try_first_socket_addr(&_addr)?;
// Wrapped in a lazy future to ensure execution occurs when a task is present.
let _client = $crate::futures::Future::wait($crate::futures::future::lazy(move || {
<FutureClient as $crate::client::future::Connect>::connect(_addr, _opts)
}))?;
let _client = SyncClient(_client);
::std::result::Result::Ok(_client)
}
}
@@ -550,8 +555,10 @@ macro_rules! service {
pub fn $fn_name(&self, $($arg: $in_),*)
-> ::std::result::Result<$out, $crate::Error<$error>>
{
let rpc = (self.0).$fn_name($($arg),*);
$crate::futures::Future::wait(rpc)
// Wrapped in a lazy future to ensure execution occurs when a task is present.
$crate::futures::Future::wait($crate::futures::future::lazy(move || {
(self.0).$fn_name($($arg),*)
}))
}
)*
}
@@ -565,9 +572,10 @@ macro_rules! service {
#[allow(non_camel_case_types)]
/// Implementation detail: Pending connection.
pub struct __tarpc_service_ConnectFuture<T> {
inner: $crate::futures::Map<$crate::client::future::ConnectFuture<__tarpc_service_Request,
__tarpc_service_Response,
__tarpc_service_Error>,
inner: $crate::futures::Map<$crate::client::future::ConnectFuture<
__tarpc_service_Request,
__tarpc_service_Response,
__tarpc_service_Error>,
fn(__tarpc_service_Client) -> T>,
}