mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-07 03:56:48 +01:00
Make future::Connect take a lifetime param so that connect_with doesn't have to clone Handle.
This commit is contained in:
@@ -5,10 +5,14 @@
|
||||
|
||||
use WireError;
|
||||
use bincode::serde::DeserializeError;
|
||||
use framed::Framed;
|
||||
use futures::{self, Async, Future};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use tokio_proto::easy::{EasyClient, EasyResponse};
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::reactor;
|
||||
use tokio_proto::easy::{EasyClient, EasyResponse, multiplex};
|
||||
use tokio_service::Service;
|
||||
|
||||
/// A client `Service` that writes and reads bytes.
|
||||
@@ -43,6 +47,16 @@ impl<Req, Resp, E> Service for Client<Req, Resp, E>
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Client<Req, Resp, E> {
|
||||
fn new(tcp: TcpStream, handle: &reactor::Handle) -> Self
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
Client {
|
||||
inner: multiplex::connect(Framed::new(tcp), handle),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_err(resp: WireResponse<Resp, E>) -> Result<Resp, ::Error<E>> {
|
||||
resp.map(|r| r.map_err(::Error::from))
|
||||
.map_err(::Error::ClientDeserialize)
|
||||
@@ -59,7 +73,6 @@ impl<Req, Resp, E> fmt::Debug for Client<Req, Resp, E> {
|
||||
/// Exposes a trait for connecting asynchronously to servers.
|
||||
pub mod future {
|
||||
use REMOTE;
|
||||
use framed::Framed;
|
||||
use futures::{self, Async, Future};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io;
|
||||
@@ -68,21 +81,21 @@ pub mod future {
|
||||
use super::Client;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::{self, reactor};
|
||||
use tokio_proto::easy::multiplex;
|
||||
|
||||
/// Types that can connect to a server asynchronously.
|
||||
pub trait Connect: Sized {
|
||||
pub trait Connect<'a>: Sized {
|
||||
/// The type of the future returned when calling `connect`.
|
||||
type ConnectFut: Future<Item = Self, Error = io::Error>;
|
||||
type ConnectFut: Future<Item = Self, Error = io::Error> + 'static;
|
||||
|
||||
/// The type of the future returned when calling `connect_with`.
|
||||
type ConnectWithFut: Future<Item = Self, Error = io::Error>;
|
||||
type ConnectWithFut: Future<Item = Self, Error = io::Error> + 'a;
|
||||
|
||||
/// Connects to a server located at the given address, using a remote to the default
|
||||
/// reactor.
|
||||
fn connect(addr: &SocketAddr) -> Self::ConnectFut;
|
||||
|
||||
/// Connects to a server located at the given address, using the given reactor handle.
|
||||
fn connect_with(addr: &SocketAddr, handle: &reactor::Handle) -> Self::ConnectWithFut;
|
||||
fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut;
|
||||
}
|
||||
|
||||
/// A future that resolves to a `Client` or an `io::Error`.
|
||||
@@ -105,12 +118,12 @@ pub mod future {
|
||||
}
|
||||
|
||||
/// A future that resolves to a `Client` or an `io::Error`.
|
||||
pub struct ConnectWithFuture<Req, Resp, E> {
|
||||
pub struct ConnectWithFuture<'a, Req, Resp, E> {
|
||||
inner: futures::Map<tokio_core::net::TcpStreamNew,
|
||||
MultiplexConnect<Req, Resp, E>>,
|
||||
MultiplexConnect<'a, Req, Resp, E>>,
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Future for ConnectWithFuture<Req, Resp, E>
|
||||
impl<'a, Req, Resp, E> Future for ConnectWithFuture<'a, Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
@@ -126,15 +139,15 @@ pub mod future {
|
||||
}
|
||||
}
|
||||
|
||||
struct MultiplexConnect<Req, Resp, E>(reactor::Handle, PhantomData<(Req, Resp, E)>);
|
||||
struct MultiplexConnect<'a, Req, Resp, E>(&'a reactor::Handle, PhantomData<(Req, Resp, E)>);
|
||||
|
||||
impl<Req, Resp, E> MultiplexConnect<Req, Resp, E> {
|
||||
fn new(handle: reactor::Handle) -> Self {
|
||||
impl<'a, Req, Resp, E> MultiplexConnect<'a, Req, Resp, E> {
|
||||
fn new(handle: &'a reactor::Handle) -> Self {
|
||||
MultiplexConnect(handle, PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<Req, Resp, E>
|
||||
impl<'a, Req, Resp, E> FnOnce<(TcpStream,)> for MultiplexConnect<'a, Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
@@ -142,19 +155,17 @@ pub mod future {
|
||||
type Output = Client<Req, Resp, E>;
|
||||
|
||||
extern "rust-call" fn call_once(self, (tcp,): (TcpStream,)) -> Client<Req, Resp, E> {
|
||||
Client {
|
||||
inner: multiplex::connect(Framed::new(tcp), &self.0),
|
||||
}
|
||||
Client::new(tcp, self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Resp, E> Connect for Client<Req, Resp, E>
|
||||
impl<'a, Req, Resp, E> Connect<'a> for Client<Req, Resp, E>
|
||||
where Req: Serialize + Send + 'static,
|
||||
Resp: Deserialize + Send + 'static,
|
||||
E: Deserialize + Send + 'static
|
||||
{
|
||||
type ConnectFut = ConnectFuture<Req, Resp, E>;
|
||||
type ConnectWithFut = ConnectWithFuture<Req, Resp, E>;
|
||||
type ConnectWithFut = ConnectWithFuture<'a, Req, Resp, E>;
|
||||
|
||||
/// Starts an event loop on a thread and registers a new client
|
||||
/// connected to the given address.
|
||||
@@ -162,18 +173,20 @@ pub mod future {
|
||||
let addr = *addr;
|
||||
let (tx, rx) = futures::oneshot();
|
||||
REMOTE.spawn(move |handle| {
|
||||
Self::connect_with(&addr, handle).then(|client| {
|
||||
tx.complete(client);
|
||||
Ok(())
|
||||
})
|
||||
let handle2 = handle.clone();
|
||||
TcpStream::connect(&addr, handle)
|
||||
.map(move |tcp| Client::new(tcp, &handle2))
|
||||
.then(move |result| {
|
||||
tx.complete(result);
|
||||
Ok(())
|
||||
})
|
||||
});
|
||||
ConnectFuture { inner: rx }
|
||||
}
|
||||
|
||||
fn connect_with(addr: &SocketAddr, handle: &reactor::Handle) -> Self::ConnectWithFut
|
||||
{
|
||||
fn connect_with(addr: &SocketAddr, handle: &'a reactor::Handle) -> Self::ConnectWithFut {
|
||||
ConnectWithFuture {
|
||||
inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle.clone()))
|
||||
inner: TcpStream::connect(addr, handle).map(MultiplexConnect::new(handle))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -659,14 +659,15 @@ macro_rules! service {
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
/// Implementation detail: Pending connection.
|
||||
pub struct __tarpc_service_ConnectWithFuture<T> {
|
||||
inner: $crate::futures::Map<$crate::ConnectWithFuture<__tarpc_service_Request,
|
||||
pub struct __tarpc_service_ConnectWithFuture<'a, T> {
|
||||
inner: $crate::futures::Map<$crate::ConnectWithFuture<'a,
|
||||
__tarpc_service_Request,
|
||||
__tarpc_service_Response,
|
||||
__tarpc_service_Error>,
|
||||
fn(__tarpc_service_Client) -> T>,
|
||||
}
|
||||
|
||||
impl<T> $crate::futures::Future for __tarpc_service_ConnectWithFuture<T> {
|
||||
impl<'a, T> $crate::futures::Future for __tarpc_service_ConnectWithFuture<'a, T> {
|
||||
type Item = T;
|
||||
type Error = ::std::io::Error;
|
||||
|
||||
@@ -680,9 +681,9 @@ macro_rules! service {
|
||||
/// The client stub that makes RPC calls to the server. Exposes a Future interface.
|
||||
pub struct FutureClient(__tarpc_service_Client);
|
||||
|
||||
impl $crate::future::Connect for FutureClient {
|
||||
impl<'a> $crate::future::Connect<'a> for FutureClient {
|
||||
type ConnectFut = __tarpc_service_ConnectFuture<Self>;
|
||||
type ConnectWithFut = __tarpc_service_ConnectWithFuture<Self>;
|
||||
type ConnectWithFut = __tarpc_service_ConnectWithFuture<'a, Self>;
|
||||
|
||||
fn connect(__tarpc_service_addr: &::std::net::SocketAddr) -> Self::ConnectFut {
|
||||
let client = <__tarpc_service_Client as $crate::future::Connect>::connect(
|
||||
@@ -694,7 +695,7 @@ macro_rules! service {
|
||||
}
|
||||
|
||||
fn connect_with(__tarpc_service_addr: &::std::net::SocketAddr,
|
||||
__tarpc_service_handle: &$crate::tokio_core::reactor::Handle)
|
||||
__tarpc_service_handle: &'a $crate::tokio_core::reactor::Handle)
|
||||
-> Self::ConnectWithFut
|
||||
{
|
||||
let client = <__tarpc_service_Client as $crate::future::Connect>::connect_with(
|
||||
|
||||
Reference in New Issue
Block a user