diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 5edbd80..3169564 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -31,13 +31,13 @@ //! //! fn main() { //! let addr = "127.0.0.1:9000"; -//! let shutdown = Server.spawn(addr).unwrap(); -//! let client = Client::new(addr).unwrap(); +//! let serve_handle = Server.spawn(addr).unwrap(); +//! let client = Client::new(serve_handle.dialer()).unwrap(); //! assert_eq!(3, client.add(1, 2).unwrap()); //! assert_eq!("Hello, Mom!".to_string(), //! client.hello("Mom".to_string()).unwrap()); //! drop(client); -//! shutdown.shutdown(); +//! serve_handle.shutdown(); //! } //! ``` diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 11a9b45..1e28084 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -315,22 +315,22 @@ macro_rules! service_inner { )* #[doc="Spawn a running service."] - fn spawn(self, addr: A) - -> $crate::Result<$crate::protocol::ServeHandle<$crate::transport::tcp::TcpDialer<::std::net::SocketAddr>>> - where A: ::std::net::ToSocketAddrs, + fn spawn(self, transport: T) + -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> + where T: $crate::transport::Transport, Self: 'static, { - self.spawn_with_config($crate::transport::tcp::TcpTransport(addr), $crate::Config::default()) + self.spawn_with_config(transport, $crate::Config::default()) } #[doc="Spawn a running service."] - fn spawn_with_config(self, addr: T, config: $crate::Config) + fn spawn_with_config(self, transport: T, config: $crate::Config) -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> where T: $crate::transport::Transport, Self: 'static, { let server = ::std::sync::Arc::new(__Server(self)); - let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config)); + let handle = try!($crate::protocol::Serve::spawn_with_config(server, transport, config)); ::std::result::Result::Ok(handle) } } @@ -389,20 +389,18 @@ macro_rules! service_inner { pub struct Client($crate::protocol::Client<__Request, __Reply, S>) where S: $crate::transport::Stream; - impl Client<::std::net::TcpStream> { - pub fn new(addr: A) -> $crate::Result - where A: ::std::net::ToSocketAddrs, - { - Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default()) - } - } - impl Client where S: $crate::transport::Stream { #[allow(unused)] #[doc="Create a new client with default configuration that connects to the given \ address."] + pub fn new(dialer: D) -> $crate::Result + where D: $crate::transport::Dialer, + { + Self::with_config(dialer, $crate::Config::default()) + } + #[allow(unused)] #[doc="Create a new client with the specified configuration that connects to the \ given address."] @@ -433,23 +431,21 @@ macro_rules! service_inner { pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>) where S: $crate::transport::Stream; - impl AsyncClient<::std::net::TcpStream> { - #[allow(unused)] - #[doc="Create a new asynchronous client with default configuration that connects to \ - the given address."] - pub fn new(addr: A) -> $crate::Result> - where A: ::std::net::ToSocketAddrs, - { - Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default()) - } - } - impl AsyncClient where S: $crate::transport::Stream { + #[allow(unused)] + #[doc="Create a new asynchronous client with default configuration that connects to \ + the given address."] + pub fn new(dialer: D) -> $crate::Result + where D: $crate::transport::Dialer, + { + Self::with_config(dialer, $crate::Config::default()) + } + #[allow(unused)] #[doc="Create a new asynchronous client that connects to the given address."] pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result - where D: $crate::transport::Dialer + where D: $crate::transport::Dialer, { let inner = try!($crate::protocol::Client::with_config(dialer, config)); ::std::result::Result::Ok(AsyncClient(inner)) @@ -546,7 +542,7 @@ mod functional_test { fn simple() { let _ = env_logger::init(); let handle = Server.spawn("localhost:0").unwrap(); - let client = Client::new(handle.local_addr()).unwrap(); + let client = Client::new(handle.dialer()).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap()); drop(client); @@ -557,7 +553,7 @@ mod functional_test { fn simple_async() { let _ = env_logger::init(); let handle = Server.spawn("localhost:0").unwrap(); - let client = AsyncClient::new(handle.local_addr()).unwrap(); + let client = AsyncClient::new(handle.dialer()).unwrap(); assert_eq!(3, client.add(1, 2).get().unwrap()); assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap()); drop(client); @@ -567,7 +563,7 @@ mod functional_test { #[test] fn try_clone() { let handle = Server.spawn("localhost:0").unwrap(); - let client1 = Client::new(handle.local_addr()).unwrap(); + let client1 = Client::new(handle.dialer()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).unwrap()); assert_eq!(3, client2.add(1, 2).unwrap()); @@ -576,7 +572,7 @@ mod functional_test { #[test] fn async_try_clone() { let handle = Server.spawn("localhost:0").unwrap(); - let client1 = AsyncClient::new(handle.local_addr()).unwrap(); + let client1 = AsyncClient::new(handle.dialer()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).get().unwrap()); assert_eq!(3, client2.add(1, 2).get().unwrap()); @@ -584,8 +580,9 @@ mod functional_test { #[test] fn async_try_clone_unix() { - let temp_dir = tempdir::TempDir::new(module_path!()).unwrap(); - let temp_file = temp_dir.path().join("async_try_clone_unix.tmp"); + let temp_dir = tempdir::TempDir::new("tarpc").unwrap(); + let temp_file = temp_dir.path() + .join("async_try_clone_unix.tmp"); let handle = Server.spawn_with_config(UnixTransport(temp_file), Config::default()).unwrap(); let client1 = AsyncClient::with_config(handle.dialer(), @@ -601,6 +598,12 @@ mod functional_test { let _ = ::std::sync::Arc::new(Server).spawn("localhost:0"); } + // Tests that a tcp client can be created from &str + #[allow(dead_code)] + fn test_client_str() { + let _ = Client::new("localhost:0"); + } + #[test] fn serde() { use bincode; diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index 5f86e61..b0bea70 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -8,14 +8,12 @@ use std::fmt; use std::io::{self, BufReader, BufWriter, Read}; use std::collections::HashMap; use std::mem; -use std::net::{TcpStream, ToSocketAddrs}; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use super::{Config, Deserialize, Error, Packet, Result, Serialize}; use transport::{Dialer, Stream}; -use transport::tcp::TcpDialer; /// A client stub that connects to a server to run rpcs. pub struct Client @@ -29,23 +27,19 @@ pub struct Client shutdown: S, } - -impl Client - where Request: serde::ser::Serialize + Send + 'static, - Reply: serde::de::Deserialize + Send + 'static -{ - /// Create a new client that connects to `addr`. The client uses the given timeout - /// for both reads and writes. - pub fn new(addr: A) -> io::Result { - Self::with_config(TcpDialer(addr), Config::default()) - } -} - impl Client where Request: serde::ser::Serialize + Send + 'static, Reply: serde::de::Deserialize + Send + 'static, S: Stream, { + /// Create a new client that connects to `addr`. The client uses the given timeout + /// for both reads and writes. + pub fn new(dialer: D) -> io::Result + where D: Dialer, + { + Self::with_config(dialer, Config::default()) + } + /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. pub fn with_config(dialer: D, config: Config) -> io::Result diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 7158243..f995ace 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -129,7 +129,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn("localhost:0").unwrap(); - let client: Client<(), u64, TcpStream> = Client::new(serve_handle.local_addr()).unwrap(); + let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap(); drop(client); serve_handle.shutdown(); } @@ -139,9 +139,8 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.clone().spawn("localhost:0").unwrap(); - let addr = serve_handle.local_addr().clone(); // The explicit type is required so that it doesn't deserialize a u32 instead of u64 - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); assert_eq!(0, client.rpc(()).unwrap()); assert_eq!(1, server.count()); assert_eq!(1, client.rpc(()).unwrap()); @@ -186,8 +185,7 @@ mod test { timeout: Some(Duration::new(0, 10)), }) .unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); let thread = thread::spawn(move || serve_handle.shutdown()); info!("force_shutdown:: rpc1: {:?}", client.rpc(())); thread.join().unwrap(); @@ -202,8 +200,7 @@ mod test { timeout: test_timeout(), }) .unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr).unwrap()); + let client: Arc> = Arc::new(Client::new(serve_handle.dialer()).unwrap()); client.rpc(()).unwrap(); serve_handle.shutdown(); match client.rpc(()) { @@ -220,8 +217,7 @@ mod test { let pool = Pool::new(concurrency); let server = Arc::new(BarrierServer::new(concurrency)); let serve_handle = server.clone().spawn("localhost:0").unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); pool.scoped(|scope| { for _ in 0..concurrency { let client = client.try_clone().unwrap(); @@ -240,8 +236,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn("localhost:0").unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); // Drop future immediately; does the reader channel panic when sending? client.rpc_async(()); diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 3c8634b..e7251ce 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -7,14 +7,13 @@ use serde; use scoped_pool::{Pool, Scope}; use std::fmt; use std::io::{self, BufReader, BufWriter}; -use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; use super::{Config, Deserialize, Error, Packet, Result, Serialize}; use transport::{Dialer, Listener, Stream, Transport}; -use transport::tcp::{TcpDialer, TcpTransport}; +use transport::tcp::TcpDialer; struct ConnectionHandler<'a, S, St> where S: Serve, @@ -126,11 +125,6 @@ impl ServeHandle &self.dialer } - /// Returns the socket being listened on when the dialer is a `TcpDialer`. - pub fn local_addr(&self) -> &D::Addr { - self.dialer().addr() - } - /// Shutdown the server. Gracefully shuts down the serve thread but currently does not /// gracefully close open connections. pub fn shutdown(self) { @@ -225,17 +219,19 @@ pub trait Serve: Send + Sync + Sized { fn serve(&self, request: Self::Request) -> Self::Reply; /// spawn - fn spawn(self, addr: A) -> io::Result>> - where A: ToSocketAddrs, + fn spawn(self, transport: T) + -> io::Result::Dialer>> + where T: Transport, Self: 'static, { - self.spawn_with_config(TcpTransport(addr), Config::default()) + self.spawn_with_config(transport, Config::default()) } /// spawn - fn spawn_with_config(self, transport: T, config: Config) + fn spawn_with_config(self, transport: T, config: Config) -> io::Result::Dialer>> - where Self: 'static, + where T: Transport, + Self: 'static, { let listener = try!(transport.bind()); let dialer = try!(listener.dialer()); diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index 867c8b2..fdb83ef 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -57,26 +57,18 @@ pub trait Stream: Read + Write + Send + Sized + 'static { pub trait Dialer { /// The type of `Stream` this can create. type Stream: Stream; - /// The type of address being connected to. - type Addr; /// Open a stream. fn dial(&self) -> io::Result; - /// Return the address being dialed. - fn addr(&self) -> &Self::Addr; } -impl Dialer for P +impl Dialer for P where P: ::std::ops::Deref, D: Dialer + 'static { type Stream = D::Stream; - type Addr = D::Addr; fn dial(&self) -> io::Result { (**self).dial() } - fn addr(&self) -> &Self::Addr { - (**self).addr() - } } /// Iterates over incoming connections. diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs index f147625..a5b78e7 100644 --- a/tarpc/src/transport/tcp.rs +++ b/tarpc/src/transport/tcp.rs @@ -11,6 +11,13 @@ impl super::Transport for TcpTransport { } } +impl super::Transport for A { + type Listener = TcpListener; + fn bind(&self) -> io::Result { + TcpListener::bind(self) + } +} + impl super::Listener for TcpListener { type Dialer = TcpDialer; type Stream = TcpStream; @@ -40,13 +47,18 @@ impl super::Stream for TcpStream { /// Connects to a socket address. pub struct TcpDialer(pub A) where A: ToSocketAddrs; -impl super::Dialer for TcpDialer { +impl super::Dialer for TcpDialer + where A: ToSocketAddrs +{ type Stream = TcpStream; - type Addr = A; fn dial(&self) -> io::Result { TcpStream::connect(&self.0) } - fn addr(&self) -> &A { - &self.0 +} +impl super::Dialer for str +{ + type Stream = TcpStream; + fn dial(&self) -> io::Result { + TcpStream::connect(self) } } diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs index d6c9c89..8571ee5 100644 --- a/tarpc/src/transport/unix.rs +++ b/tarpc/src/transport/unix.rs @@ -24,13 +24,9 @@ impl

super::Dialer for UnixDialer

where P: AsRef { type Stream = UnixStream; - type Addr = P; fn dial(&self) -> io::Result { UnixStream::connect(&self.0) } - fn addr(&self) -> &P { - &self.0 - } } impl super::Listener for UnixListener {