mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-07 03:56:48 +01:00
Remove Addr associated type of Dialer.
Also, make spawn() take a Dialer, but impl Dialer for str, defaulting to TCP transport.
This commit is contained in:
@@ -31,13 +31,13 @@
|
||||
//!
|
||||
//! fn main() {
|
||||
//! let addr = "127.0.0.1:9000";
|
||||
//! let shutdown = Server.spawn(addr).unwrap();
|
||||
//! let client = Client::new(addr).unwrap();
|
||||
//! let serve_handle = Server.spawn(addr).unwrap();
|
||||
//! let client = Client::new(serve_handle.dialer()).unwrap();
|
||||
//! assert_eq!(3, client.add(1, 2).unwrap());
|
||||
//! assert_eq!("Hello, Mom!".to_string(),
|
||||
//! client.hello("Mom".to_string()).unwrap());
|
||||
//! drop(client);
|
||||
//! shutdown.shutdown();
|
||||
//! serve_handle.shutdown();
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
|
||||
@@ -315,22 +315,22 @@ macro_rules! service_inner {
|
||||
)*
|
||||
|
||||
#[doc="Spawn a running service."]
|
||||
fn spawn<A>(self, addr: A)
|
||||
-> $crate::Result<$crate::protocol::ServeHandle<$crate::transport::tcp::TcpDialer<::std::net::SocketAddr>>>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
fn spawn<T>(self, transport: T)
|
||||
-> $crate::Result<$crate::protocol::ServeHandle<<T::Listener as $crate::transport::Listener>::Dialer>>
|
||||
where T: $crate::transport::Transport,
|
||||
Self: 'static,
|
||||
{
|
||||
self.spawn_with_config($crate::transport::tcp::TcpTransport(addr), $crate::Config::default())
|
||||
self.spawn_with_config(transport, $crate::Config::default())
|
||||
}
|
||||
|
||||
#[doc="Spawn a running service."]
|
||||
fn spawn_with_config<T>(self, addr: T, config: $crate::Config)
|
||||
fn spawn_with_config<T>(self, transport: T, config: $crate::Config)
|
||||
-> $crate::Result<$crate::protocol::ServeHandle<<T::Listener as $crate::transport::Listener>::Dialer>>
|
||||
where T: $crate::transport::Transport,
|
||||
Self: 'static,
|
||||
{
|
||||
let server = ::std::sync::Arc::new(__Server(self));
|
||||
let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config));
|
||||
let handle = try!($crate::protocol::Serve::spawn_with_config(server, transport, config));
|
||||
::std::result::Result::Ok(handle)
|
||||
}
|
||||
}
|
||||
@@ -389,20 +389,18 @@ macro_rules! service_inner {
|
||||
pub struct Client<S = ::std::net::TcpStream>($crate::protocol::Client<__Request, __Reply, S>)
|
||||
where S: $crate::transport::Stream;
|
||||
|
||||
impl Client<::std::net::TcpStream> {
|
||||
pub fn new<A>(addr: A) -> $crate::Result<Self>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
{
|
||||
Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Client<S>
|
||||
where S: $crate::transport::Stream
|
||||
{
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new client with default configuration that connects to the given \
|
||||
address."]
|
||||
pub fn new<D>(dialer: D) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
Self::with_config(dialer, $crate::Config::default())
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new client with the specified configuration that connects to the \
|
||||
given address."]
|
||||
@@ -433,23 +431,21 @@ macro_rules! service_inner {
|
||||
pub struct AsyncClient<S = ::std::net::TcpStream>($crate::protocol::Client<__Request, __Reply, S>)
|
||||
where S: $crate::transport::Stream;
|
||||
|
||||
impl AsyncClient<::std::net::TcpStream> {
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new asynchronous client with default configuration that connects to \
|
||||
the given address."]
|
||||
pub fn new<A>(addr: A) -> $crate::Result<AsyncClient<::std::net::TcpStream>>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
{
|
||||
Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> AsyncClient<S>
|
||||
where S: $crate::transport::Stream {
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new asynchronous client with default configuration that connects to \
|
||||
the given address."]
|
||||
pub fn new<D>(dialer: D) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
Self::with_config(dialer, $crate::Config::default())
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new asynchronous client that connects to the given address."]
|
||||
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
let inner = try!($crate::protocol::Client::with_config(dialer, config));
|
||||
::std::result::Result::Ok(AsyncClient(inner))
|
||||
@@ -546,7 +542,7 @@ mod functional_test {
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client = Client::new(handle.local_addr()).unwrap();
|
||||
let client = Client::new(handle.dialer()).unwrap();
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
|
||||
drop(client);
|
||||
@@ -557,7 +553,7 @@ mod functional_test {
|
||||
fn simple_async() {
|
||||
let _ = env_logger::init();
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client = AsyncClient::new(handle.local_addr()).unwrap();
|
||||
let client = AsyncClient::new(handle.dialer()).unwrap();
|
||||
assert_eq!(3, client.add(1, 2).get().unwrap());
|
||||
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
|
||||
drop(client);
|
||||
@@ -567,7 +563,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn try_clone() {
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client1 = Client::new(handle.local_addr()).unwrap();
|
||||
let client1 = Client::new(handle.dialer()).unwrap();
|
||||
let client2 = client1.try_clone().unwrap();
|
||||
assert_eq!(3, client1.add(1, 2).unwrap());
|
||||
assert_eq!(3, client2.add(1, 2).unwrap());
|
||||
@@ -576,7 +572,7 @@ mod functional_test {
|
||||
#[test]
|
||||
fn async_try_clone() {
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client1 = AsyncClient::new(handle.local_addr()).unwrap();
|
||||
let client1 = AsyncClient::new(handle.dialer()).unwrap();
|
||||
let client2 = client1.try_clone().unwrap();
|
||||
assert_eq!(3, client1.add(1, 2).get().unwrap());
|
||||
assert_eq!(3, client2.add(1, 2).get().unwrap());
|
||||
@@ -584,8 +580,9 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn async_try_clone_unix() {
|
||||
let temp_dir = tempdir::TempDir::new(module_path!()).unwrap();
|
||||
let temp_file = temp_dir.path().join("async_try_clone_unix.tmp");
|
||||
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
|
||||
let temp_file = temp_dir.path()
|
||||
.join("async_try_clone_unix.tmp");
|
||||
let handle = Server.spawn_with_config(UnixTransport(temp_file),
|
||||
Config::default()).unwrap();
|
||||
let client1 = AsyncClient::with_config(handle.dialer(),
|
||||
@@ -601,6 +598,12 @@ mod functional_test {
|
||||
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
|
||||
}
|
||||
|
||||
// Tests that a tcp client can be created from &str
|
||||
#[allow(dead_code)]
|
||||
fn test_client_str() {
|
||||
let _ = Client::new("localhost:0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde() {
|
||||
use bincode;
|
||||
|
||||
@@ -8,14 +8,12 @@ use std::fmt;
|
||||
use std::io::{self, BufReader, BufWriter, Read};
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::mpsc::{Receiver, Sender, channel};
|
||||
use std::thread;
|
||||
|
||||
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
||||
use transport::{Dialer, Stream};
|
||||
use transport::tcp::TcpDialer;
|
||||
|
||||
/// A client stub that connects to a server to run rpcs.
|
||||
pub struct Client<Request, Reply, S>
|
||||
@@ -29,23 +27,19 @@ pub struct Client<Request, Reply, S>
|
||||
shutdown: S,
|
||||
}
|
||||
|
||||
|
||||
impl<Request, Reply> Client<Request, Reply, TcpStream>
|
||||
where Request: serde::ser::Serialize + Send + 'static,
|
||||
Reply: serde::de::Deserialize + Send + 'static
|
||||
{
|
||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||
/// for both reads and writes.
|
||||
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
||||
Self::with_config(TcpDialer(addr), Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Request, Reply, S> Client<Request, Reply, S>
|
||||
where Request: serde::ser::Serialize + Send + 'static,
|
||||
Reply: serde::de::Deserialize + Send + 'static,
|
||||
S: Stream,
|
||||
{
|
||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||
/// for both reads and writes.
|
||||
pub fn new<D>(dialer: D) -> io::Result<Self>
|
||||
where D: Dialer<Stream=S>,
|
||||
{
|
||||
Self::with_config(dialer, Config::default())
|
||||
}
|
||||
|
||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||
/// for both reads and writes.
|
||||
pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
|
||||
|
||||
@@ -129,7 +129,7 @@ mod test {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = server.spawn("localhost:0").unwrap();
|
||||
let client: Client<(), u64, TcpStream> = Client::new(serve_handle.local_addr()).unwrap();
|
||||
let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap();
|
||||
drop(client);
|
||||
serve_handle.shutdown();
|
||||
}
|
||||
@@ -139,9 +139,8 @@ mod test {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
|
||||
let client: Client<(), u64, _> = Client::new(addr).unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
assert_eq!(0, client.rpc(()).unwrap());
|
||||
assert_eq!(1, server.count());
|
||||
assert_eq!(1, client.rpc(()).unwrap());
|
||||
@@ -186,8 +185,7 @@ mod test {
|
||||
timeout: Some(Duration::new(0, 10)),
|
||||
})
|
||||
.unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Client<(), u64, _> = Client::new(addr).unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
let thread = thread::spawn(move || serve_handle.shutdown());
|
||||
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
|
||||
thread.join().unwrap();
|
||||
@@ -202,8 +200,7 @@ mod test {
|
||||
timeout: test_timeout(),
|
||||
})
|
||||
.unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(addr).unwrap());
|
||||
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
|
||||
client.rpc(()).unwrap();
|
||||
serve_handle.shutdown();
|
||||
match client.rpc(()) {
|
||||
@@ -220,8 +217,7 @@ mod test {
|
||||
let pool = Pool::new(concurrency);
|
||||
let server = Arc::new(BarrierServer::new(concurrency));
|
||||
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Client<(), u64, _> = Client::new(addr).unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
pool.scoped(|scope| {
|
||||
for _ in 0..concurrency {
|
||||
let client = client.try_clone().unwrap();
|
||||
@@ -240,8 +236,7 @@ mod test {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = server.spawn("localhost:0").unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Client<(), u64, _> = Client::new(addr).unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
|
||||
// Drop future immediately; does the reader channel panic when sending?
|
||||
client.rpc_async(());
|
||||
|
||||
@@ -7,14 +7,13 @@ use serde;
|
||||
use scoped_pool::{Pool, Scope};
|
||||
use std::fmt;
|
||||
use std::io::{self, BufReader, BufWriter};
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::thread::{self, JoinHandle};
|
||||
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
||||
use transport::{Dialer, Listener, Stream, Transport};
|
||||
use transport::tcp::{TcpDialer, TcpTransport};
|
||||
use transport::tcp::TcpDialer;
|
||||
|
||||
struct ConnectionHandler<'a, S, St>
|
||||
where S: Serve,
|
||||
@@ -126,11 +125,6 @@ impl<D> ServeHandle<D>
|
||||
&self.dialer
|
||||
}
|
||||
|
||||
/// Returns the socket being listened on when the dialer is a `TcpDialer`.
|
||||
pub fn local_addr(&self) -> &D::Addr {
|
||||
self.dialer().addr()
|
||||
}
|
||||
|
||||
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not
|
||||
/// gracefully close open connections.
|
||||
pub fn shutdown(self) {
|
||||
@@ -225,17 +219,19 @@ pub trait Serve: Send + Sync + Sized {
|
||||
fn serve(&self, request: Self::Request) -> Self::Reply;
|
||||
|
||||
/// spawn
|
||||
fn spawn<A: fmt::Debug>(self, addr: A) -> io::Result<ServeHandle<TcpDialer<SocketAddr>>>
|
||||
where A: ToSocketAddrs,
|
||||
fn spawn<T>(self, transport: T)
|
||||
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
|
||||
where T: Transport,
|
||||
Self: 'static,
|
||||
{
|
||||
self.spawn_with_config(TcpTransport(addr), Config::default())
|
||||
self.spawn_with_config(transport, Config::default())
|
||||
}
|
||||
|
||||
/// spawn
|
||||
fn spawn_with_config<T: Transport>(self, transport: T, config: Config)
|
||||
fn spawn_with_config<T>(self, transport: T, config: Config)
|
||||
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
|
||||
where Self: 'static,
|
||||
where T: Transport,
|
||||
Self: 'static,
|
||||
{
|
||||
let listener = try!(transport.bind());
|
||||
let dialer = try!(listener.dialer());
|
||||
|
||||
@@ -57,26 +57,18 @@ pub trait Stream: Read + Write + Send + Sized + 'static {
|
||||
pub trait Dialer {
|
||||
/// The type of `Stream` this can create.
|
||||
type Stream: Stream;
|
||||
/// The type of address being connected to.
|
||||
type Addr;
|
||||
/// Open a stream.
|
||||
fn dial(&self) -> io::Result<Self::Stream>;
|
||||
/// Return the address being dialed.
|
||||
fn addr(&self) -> &Self::Addr;
|
||||
}
|
||||
|
||||
impl<P, D> Dialer for P
|
||||
impl<P, D: ?Sized> Dialer for P
|
||||
where P: ::std::ops::Deref<Target=D>,
|
||||
D: Dialer + 'static
|
||||
{
|
||||
type Stream = D::Stream;
|
||||
type Addr = D::Addr;
|
||||
fn dial(&self) -> io::Result<Self::Stream> {
|
||||
(**self).dial()
|
||||
}
|
||||
fn addr(&self) -> &Self::Addr {
|
||||
(**self).addr()
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates over incoming connections.
|
||||
|
||||
@@ -11,6 +11,13 @@ impl<A: ToSocketAddrs> super::Transport for TcpTransport<A> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: ToSocketAddrs> super::Transport for A {
|
||||
type Listener = TcpListener;
|
||||
fn bind(&self) -> io::Result<TcpListener> {
|
||||
TcpListener::bind(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Listener for TcpListener {
|
||||
type Dialer = TcpDialer<SocketAddr>;
|
||||
type Stream = TcpStream;
|
||||
@@ -40,13 +47,18 @@ impl super::Stream for TcpStream {
|
||||
/// Connects to a socket address.
|
||||
pub struct TcpDialer<A = SocketAddr>(pub A)
|
||||
where A: ToSocketAddrs;
|
||||
impl<A: ToSocketAddrs> super::Dialer for TcpDialer<A> {
|
||||
impl<A> super::Dialer for TcpDialer<A>
|
||||
where A: ToSocketAddrs
|
||||
{
|
||||
type Stream = TcpStream;
|
||||
type Addr = A;
|
||||
fn dial(&self) -> io::Result<TcpStream> {
|
||||
TcpStream::connect(&self.0)
|
||||
}
|
||||
fn addr(&self) -> &A {
|
||||
&self.0
|
||||
}
|
||||
impl super::Dialer for str
|
||||
{
|
||||
type Stream = TcpStream;
|
||||
fn dial(&self) -> io::Result<TcpStream> {
|
||||
TcpStream::connect(self)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,13 +24,9 @@ impl<P> super::Dialer for UnixDialer<P>
|
||||
where P: AsRef<Path>
|
||||
{
|
||||
type Stream = UnixStream;
|
||||
type Addr = P;
|
||||
fn dial(&self) -> io::Result<UnixStream> {
|
||||
UnixStream::connect(&self.0)
|
||||
}
|
||||
fn addr(&self) -> &P {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Listener for UnixListener {
|
||||
|
||||
Reference in New Issue
Block a user