This commit is contained in:
Tim Kuehn
2016-02-25 00:04:35 -08:00
parent 9827f75459
commit 6273ebefa7
7 changed files with 50 additions and 51 deletions

View File

@@ -261,7 +261,7 @@ macro_rules! service {
#[doc(hidden)]
#[macro_export]
macro_rules! service_inner {
// Pattern for when the next rpc has an implicit unit return type
// Pattern for when the next rpc has an implicit unit return type
(
{
$(#[$attr:meta])*
@@ -280,7 +280,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> ();
}
};
// Pattern for when the next rpc has an explicit return type
// Pattern for when the next rpc has an explicit return type
(
{
$(#[$attr:meta])*
@@ -299,7 +299,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
}
};
// Pattern when all return types have been expanded
// Pattern when all return types have been expanded
(
{ } // none left to expand
$(
@@ -315,8 +315,11 @@ macro_rules! service_inner {
)*
#[doc="Spawn a running service."]
fn spawn<T>(self, transport: T)
-> $crate::Result<$crate::protocol::ServeHandle<<T::Listener as $crate::transport::Listener>::Dialer>>
fn spawn<T>(self,
transport: T)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static,
{
@@ -324,13 +327,18 @@ macro_rules! service_inner {
}
#[doc="Spawn a running service."]
fn spawn_with_config<T>(self, transport: T, config: $crate::Config)
-> $crate::Result<$crate::protocol::ServeHandle<<T::Listener as $crate::transport::Listener>::Dialer>>
fn spawn_with_config<T>(self,
transport: T,
config: $crate::Config)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static,
{
let server = __Server(self);
let handle = try!($crate::protocol::Serve::spawn_with_config(server, transport, config));
let result = $crate::protocol::Serve::spawn_with_config(server, transport, config);
let handle = try!(result);
::std::result::Result::Ok(handle)
}
}
@@ -386,8 +394,9 @@ macro_rules! service_inner {
#[allow(unused)]
#[doc="The client stub that makes RPC calls to the server."]
pub struct Client<S = ::std::net::TcpStream>($crate::protocol::Client<__Request, __Reply, S>)
where S: $crate::transport::Stream;
pub struct Client<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl<S> Client<S>
where S: $crate::transport::Stream
@@ -428,8 +437,9 @@ macro_rules! service_inner {
#[allow(unused)]
#[doc="The client stub that makes asynchronous RPC calls to the server."]
pub struct AsyncClient<S = ::std::net::TcpStream>($crate::protocol::Client<__Request, __Reply, S>)
where S: $crate::transport::Stream;
pub struct AsyncClient<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl<S> AsyncClient<S>
where S: $crate::transport::Stream {
@@ -583,10 +593,8 @@ mod functional_test {
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
let temp_file = temp_dir.path()
.join("async_try_clone_unix.tmp");
let handle = Server.spawn_with_config(UnixTransport(temp_file),
Config::default()).unwrap();
let client1 = AsyncClient::with_config(handle.dialer(),
Config::default()).unwrap();
let handle = Server.spawn(UnixTransport(temp_file)).unwrap();
let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());

View File

@@ -18,7 +18,7 @@ use transport::{Dialer, Stream};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply, S>
where Request: serde::ser::Serialize,
S: Stream,
S: Stream
{
// The guard is in an option so it can be joined in the drop fn
reader_guard: Arc<Option<thread::JoinHandle<()>>>,
@@ -30,12 +30,12 @@ pub struct Client<Request, Reply, S>
impl<Request, Reply, S> Client<Request, Reply, S>
where Request: serde::ser::Serialize + Send + 'static,
Reply: serde::de::Deserialize + Send + 'static,
S: Stream,
S: Stream
{
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn new<D>(dialer: D) -> io::Result<Self>
where D: Dialer<Stream=S>,
where D: Dialer<Stream = S>
{
Self::with_config(dialer, Config::default())
}
@@ -43,7 +43,7 @@ impl<Request, Reply, S> Client<Request, Reply, S>
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
where D: Dialer<Stream=S>,
where D: Dialer<Stream = S>
{
let stream = try!(dialer.dial());
try!(stream.set_read_timeout(config.timeout));
@@ -105,7 +105,7 @@ impl<Request, Reply, S> Client<Request, Reply, S>
impl<Request, Reply, S> Drop for Client<Request, Reply, S>
where Request: serde::ser::Serialize,
S: Stream,
S: Stream
{
fn drop(&mut self) {
debug!("Dropping Client.");
@@ -193,11 +193,11 @@ impl<Reply> RpcFutures<Reply> {
}
fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: S)
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: S)
where Request: serde::Serialize,
Reply: serde::Deserialize,
S: Stream,
S: Stream
{
let mut next_id = 0;
let mut stream = BufWriter::new(stream);
@@ -248,7 +248,7 @@ fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>
fn read<Reply, S>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: S)
where Reply: serde::Deserialize,
S: Stream,
S: Stream
{
let mut stream = BufReader::new(stream);
loop {

View File

@@ -181,9 +181,7 @@ mod test {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.spawn_with_config(TcpTransport("localhost:0"),
Config {
timeout: Some(Duration::new(0, 10)),
})
Config { timeout: Some(Duration::new(0, 10)) })
.unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
let thread = thread::spawn(move || serve_handle.shutdown());
@@ -196,9 +194,7 @@ mod test {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.spawn_with_config(TcpTransport("localhost:0"),
Config {
timeout: test_timeout(),
})
Config { timeout: test_timeout() })
.unwrap();
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
client.rpc(()).unwrap();

View File

@@ -17,7 +17,7 @@ use transport::tcp::TcpDialer;
struct ConnectionHandler<'a, S, St>
where S: Serve,
St: Stream,
St: Stream
{
read_stream: BufReader<St>,
write_stream: BufWriter<St>,
@@ -27,7 +27,7 @@ struct ConnectionHandler<'a, S, St>
impl<'a, S, St> ConnectionHandler<'a, S, St>
where S: Serve,
St: Stream,
St: Stream
{
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
let ConnectionHandler {
@@ -219,19 +219,20 @@ pub trait Serve: Send + Sync + Sized {
fn serve(&self, request: Self::Request) -> Self::Reply;
/// spawn
fn spawn<T>(self, transport: T)
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
fn spawn<T>(self, transport: T) -> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static,
Self: 'static
{
self.spawn_with_config(transport, Config::default())
}
/// spawn
fn spawn_with_config<T>(self, transport: T, config: Config)
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
fn spawn_with_config<T>(self,
transport: T,
config: Config)
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static,
Self: 'static
{
let listener = try!(transport.bind());
let dialer = try!(listener.dialer());

View File

@@ -21,9 +21,7 @@ pub trait Listener: Send + 'static {
fn dialer(&self) -> io::Result<Self::Dialer>;
/// Iterate over incoming connections.
fn incoming(&self) -> Incoming<Self> {
Incoming {
listener: self,
}
Incoming { listener: self }
}
}
@@ -62,7 +60,7 @@ pub trait Dialer {
}
impl<P, D: ?Sized> Dialer for P
where P: ::std::ops::Deref<Target=D>,
where P: ::std::ops::Deref<Target = D>,
D: Dialer + 'static
{
type Stream = D::Stream;

View File

@@ -45,8 +45,7 @@ impl super::Stream for TcpStream {
}
/// Connects to a socket address.
pub struct TcpDialer<A = SocketAddr>(pub A)
where A: ToSocketAddrs;
pub struct TcpDialer<A = SocketAddr>(pub A) where A: ToSocketAddrs;
impl<A> super::Dialer for TcpDialer<A>
where A: ToSocketAddrs
{
@@ -55,8 +54,7 @@ impl<A> super::Dialer for TcpDialer<A>
TcpStream::connect(&self.0)
}
}
impl super::Dialer for str
{
impl super::Dialer for str {
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(self)

View File

@@ -4,8 +4,7 @@ use std::time::Duration;
use unix_socket::{UnixListener, UnixStream};
/// A transport for unix sockets.
pub struct UnixTransport<P>(pub P)
where P: AsRef<Path>;
pub struct UnixTransport<P>(pub P) where P: AsRef<Path>;
impl<P> super::Transport for UnixTransport<P>
where P: AsRef<Path>
@@ -17,8 +16,7 @@ impl<P> super::Transport for UnixTransport<P>
}
/// Connects to a unix socket address.
pub struct UnixDialer<P>(pub P)
where P: AsRef<Path>;
pub struct UnixDialer<P>(pub P) where P: AsRef<Path>;
impl<P> super::Dialer for UnixDialer<P>
where P: AsRef<Path>