From fcdb0d9375c4b3002f2e674037135753b9d101d2 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 22 Feb 2016 23:50:34 -0800 Subject: [PATCH 01/23] Add support for arbitrary transports. Quite a bit of machinery added: * Listener trait * Dialer trait * Stream trait * Transport trait --- tarpc/src/lib.rs | 3 +- tarpc/src/macros.rs | 48 ++++++------ tarpc/src/protocol/client.rs | 34 +++++---- tarpc/src/protocol/mod.rs | 139 ++++++++++++++++++++++++++++++++--- tarpc/src/protocol/server.rs | 73 ++++++++++-------- 5 files changed, 221 insertions(+), 76 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index efb2684..5fce4e8 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -60,4 +60,5 @@ pub mod protocol; /// Provides the macro used for constructing rpc services and client stubs. pub mod macros; -pub use protocol::{Config, Error, Result, ServeHandle}; +pub use protocol::{Config, Dialer, Error, Listener, Result, ServeHandle, Stream, TcpDialer, + TcpDialerExt, TcpTransport, Transport}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 02fac66..f068736 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -315,17 +315,18 @@ macro_rules! service_inner { )* #[doc="Spawn a running service."] - fn spawn(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle> + fn spawn(self, addr: A) + -> $crate::Result<$crate::protocol::ServeHandle<$crate::TcpDialer<::std::net::SocketAddr>>> where A: ::std::net::ToSocketAddrs, Self: 'static, { - self.spawn_with_config(addr, $crate::Config::default()) + self.spawn_with_config($crate::TcpTransport(addr), $crate::Config::default()) } #[doc="Spawn a running service."] - fn spawn_with_config(self, addr: A, config: $crate::Config) - -> $crate::Result<$crate::protocol::ServeHandle> - where A: ::std::net::ToSocketAddrs, + fn spawn_with_config(self, addr: T, config: $crate::Config) + -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> + where T: $crate::Transport, Self: 'static, { let server = ::std::sync::Arc::new(__Server(self)); @@ -385,25 +386,27 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes RPC calls to the server."] - pub struct Client($crate::protocol::Client<__Request, __Reply>); + pub struct Client($crate::protocol::Client<__Request, __Reply, S>); - impl Client { - #[allow(unused)] - #[doc="Create a new client with default configuration that connects to the given \ - address."] + impl Client<::std::net::TcpStream> { pub fn new(addr: A) -> $crate::Result where A: ::std::net::ToSocketAddrs, { - Self::with_config(addr, $crate::Config::default()) + Self::with_config($crate::TcpDialer(addr), $crate::Config::default()) } + } + impl Client { + #[allow(unused)] + #[doc="Create a new client with default configuration that connects to the given \ + address."] #[allow(unused)] #[doc="Create a new client with the specified configuration that connects to the \ given address."] - pub fn with_config(addr: A, config: $crate::Config) -> $crate::Result - where A: ::std::net::ToSocketAddrs, + pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result + where D: $crate::Dialer, { - let inner = try!($crate::protocol::Client::with_config(addr, config)); + let inner = try!($crate::protocol::Client::with_config(dialer, config)); ::std::result::Result::Ok(Client(inner)) } @@ -424,25 +427,26 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes asynchronous RPC calls to the server."] - pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>); + pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>); - impl AsyncClient { + impl AsyncClient<::std::net::TcpStream> { #[allow(unused)] #[doc="Create a new asynchronous client with default configuration that connects to \ the given address."] - pub fn new(addr: A) -> $crate::Result + pub fn new(addr: A) -> $crate::Result> where A: ::std::net::ToSocketAddrs, { - Self::with_config(addr, $crate::Config::default()) + Self::with_config($crate::TcpDialer(addr), $crate::Config::default()) } + } + impl AsyncClient { #[allow(unused)] #[doc="Create a new asynchronous client that connects to the given address."] - pub fn with_config(addr: A, config: $crate::Config) - -> $crate::Result - where A: ::std::net::ToSocketAddrs, + pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result + where D: $crate::Dialer { - let inner = try!($crate::protocol::Client::with_config(addr, config)); + let inner = try!($crate::protocol::Client::with_config(dialer, config)); ::std::result::Result::Ok(AsyncClient(inner)) } diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index edb0d36..38701cb 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -13,33 +13,41 @@ use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; -use super::{Config, Deserialize, Error, Packet, Result, Serialize}; +use super::{Config, Deserialize, Dialer, Error, Packet, Result, Serialize, Stream, TcpDialer}; /// A client stub that connects to a server to run rpcs. -pub struct Client +pub struct Client where Request: serde::ser::Serialize { // The guard is in an option so it can be joined in the drop fn reader_guard: Arc>>, outbound: Sender<(Request, Sender>)>, requests: Arc>>, - shutdown: TcpStream, + shutdown: S, } -impl Client + +impl Client where Request: serde::ser::Serialize + Send + 'static, Reply: serde::de::Deserialize + Send + 'static { /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. pub fn new(addr: A) -> io::Result { - Self::with_config(addr, Config::default()) + Self::with_config(TcpDialer(addr), Config::default()) } +} +impl Client + where Request: serde::ser::Serialize + Send + 'static, + Reply: serde::de::Deserialize + Send + 'static +{ /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. - pub fn with_config(addr: A, config: Config) -> io::Result { - let stream = try!(TcpStream::connect(addr)); + pub fn with_config(dialer: D, config: Config) -> io::Result + where D: Dialer, + { + let stream = try!(dialer.dial()); try!(stream.set_read_timeout(config.timeout)); try!(stream.set_write_timeout(config.timeout)); let reader_stream = try!(stream.try_clone()); @@ -59,7 +67,7 @@ impl Client } /// Clones the Client so that it can be shared across threads. - pub fn try_clone(&self) -> io::Result> { + pub fn try_clone(&self) -> io::Result { Ok(Client { reader_guard: self.reader_guard.clone(), outbound: self.outbound.clone(), @@ -97,14 +105,14 @@ impl Client } } -impl Drop for Client +impl Drop for Client where Request: serde::ser::Serialize { fn drop(&mut self) { debug!("Dropping Client."); if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) { debug!("Attempting to shut down writer and reader threads."); - if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) { + if let Err(e) = self.shutdown.shutdown() { warn!("Client: couldn't shutdown writer and reader threads: {:?}", e); } else { @@ -185,9 +193,9 @@ impl RpcFutures { } } -fn write(outbound: Receiver<(Request, Sender>)>, +fn write(outbound: Receiver<(Request, Sender>)>, requests: Arc>>, - stream: TcpStream) + stream: S) where Request: serde::Serialize, Reply: serde::Deserialize { @@ -238,7 +246,7 @@ fn write(outbound: Receiver<(Request, Sender>)>, } -fn read(requests: Arc>>, stream: TcpStream) +fn read(requests: Arc>>, stream: S) where Reply: serde::Deserialize { let mut stream = BufReader::new(stream); diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 3d9cd44..77bcd67 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -8,6 +8,7 @@ use bincode::serde::{deserialize_from, serialize_into}; use serde; use std::io::{self, Read, Write}; use std::convert; +use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use std::sync::Arc; use std::time::Duration; @@ -62,6 +63,123 @@ pub struct Config { pub timeout: Option, } +/// A factory for creating a listener on a given address. +pub trait Transport { + /// The type of listener that binds to the given address. + type Listener: Listener; + /// Return a listener on the given address, and a dialer to that address. + fn bind(&self) -> io::Result; +} + +/// A transport for TCP. +pub struct TcpTransport(pub A); +impl Transport for TcpTransport { + type Listener = TcpListener; + fn bind(&self) -> io::Result { + TcpListener::bind(&self.0) + } +} + +/// Accepts incoming connections from dialers. +pub trait Listener: Send + 'static { + /// The type of address being listened on. + type Dialer: Dialer; + /// The type of stream this listener accepts. + type Stream: Stream; + /// Accept an incoming stream. + fn accept(&self) -> io::Result; + /// Returns the local address being listened on. + fn dialer(&self) -> io::Result; + /// Iterate over incoming connections. + fn incoming(&self) -> Incoming { + Incoming { + listener: self, + } + } +} + +impl Listener for TcpListener { + type Dialer = TcpDialer; + type Stream = TcpStream; + fn accept(&self) -> io::Result { + self.accept().map(|(stream, _)| stream) + } + fn dialer(&self) -> io::Result> { + self.local_addr().map(|addr| TcpDialer(addr)) + } +} + +/// A cloneable Reader/Writer. +pub trait Stream: Read + Write + Send + Sized + 'static { + /// Clone that can fail. + fn try_clone(&self) -> io::Result; + /// Sets a read timeout. + fn set_read_timeout(&self, dur: Option) -> io::Result<()>; + /// Sets a write timeout. + fn set_write_timeout(&self, dur: Option) -> io::Result<()>; + /// Shuts down both ends of the stream. + fn shutdown(&self) -> io::Result<()>; +} + +impl Stream for TcpStream { + fn try_clone(&self) -> io::Result { + self.try_clone() + } + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.set_read_timeout(dur) + } + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.set_write_timeout(dur) + } + fn shutdown(&self) -> io::Result<()> { + self.shutdown(::std::net::Shutdown::Both) + } +} + +/// A `Stream` factory. +pub trait Dialer { + /// The type of `Stream` this can create. + type Stream: Stream; + /// Open a stream. + fn dial(&self) -> io::Result; +} + +/// Allows retrieving the address when the Dialer is known to be a TcpDialer. +pub trait TcpDialerExt { + /// Type of the address. + type Addr: ToSocketAddrs; + /// Return the address the Dialer connects to. + fn addr(&self) -> &Self::Addr; +} + +/// Connects to a socket address. +pub struct TcpDialer(pub A); +impl Dialer for TcpDialer { + type Stream = TcpStream; + fn dial(&self) -> io::Result { + TcpStream::connect(&self.0) + } +} +impl TcpDialerExt for TcpDialer { + type Addr = A; + fn addr(&self) -> &A { + &self.0 + } +} + +/// Iterates over incoming connections. +pub struct Incoming<'a, L: Listener + ?Sized + 'a> { + listener: &'a L, +} + +impl<'a, L: Listener> Iterator for Incoming<'a, L> { + type Item = io::Result; + + fn next(&mut self) -> Option { + Some(self.listener.accept()) + } +} + /// Return type of rpc calls: either the successful return value, or a client error. pub type Result = ::std::result::Result; @@ -86,8 +204,9 @@ impl Serialize for W {} #[cfg(test)] mod test { extern crate env_logger; - use super::{Client, Config, Serve}; + use super::{Client, Config, Serve, TcpTransport}; use scoped_pool::Pool; + use std::net::TcpStream; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use std::time::Duration; @@ -127,7 +246,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn("localhost:0").unwrap(); - let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap(); + let client: Client<(), u64, TcpStream> = Client::new(serve_handle.local_addr()).unwrap(); drop(client); serve_handle.shutdown(); } @@ -139,7 +258,7 @@ mod test { let serve_handle = server.clone().spawn("localhost:0").unwrap(); let addr = serve_handle.local_addr().clone(); // The explicit type is required so that it doesn't deserialize a u32 instead of u64 - let client: Client<(), u64> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(addr).unwrap(); assert_eq!(0, client.rpc(()).unwrap()); assert_eq!(1, server.count()); assert_eq!(1, client.rpc(()).unwrap()); @@ -179,13 +298,13 @@ mod test { fn force_shutdown() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = server.spawn_with_config("localhost:0", + let serve_handle = server.spawn_with_config(TcpTransport("localhost:0"), Config { - timeout: Some(Duration::new(0, 10)) + timeout: Some(Duration::new(0, 10)), }) .unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(addr).unwrap(); let thread = thread::spawn(move || serve_handle.shutdown()); info!("force_shutdown:: rpc1: {:?}", client.rpc(())); thread.join().unwrap(); @@ -195,13 +314,13 @@ mod test { fn client_failed_rpc() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = server.spawn_with_config("localhost:0", + let serve_handle = server.spawn_with_config(TcpTransport("localhost:0"), Config { timeout: test_timeout(), }) .unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr).unwrap()); + let client: Arc> = Arc::new(Client::new(addr).unwrap()); client.rpc(()).unwrap(); serve_handle.shutdown(); match client.rpc(()) { @@ -219,7 +338,7 @@ mod test { let server = Arc::new(BarrierServer::new(concurrency)); let serve_handle = server.clone().spawn("localhost:0").unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(addr).unwrap(); pool.scoped(|scope| { for _ in 0..concurrency { let client = client.try_clone().unwrap(); @@ -239,7 +358,7 @@ mod test { let server = Arc::new(Server::new()); let serve_handle = server.spawn("localhost:0").unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(addr).unwrap(); // Drop future immediately; does the reader channel panic when sending? client.rpc_async(()); diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 0631712..87b866a 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -7,24 +7,27 @@ use serde; use scoped_pool::{Pool, Scope}; use std::fmt; use std::io::{self, BufReader, BufWriter}; -use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; +use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; -use super::{Config, Deserialize, Error, Packet, Result, Serialize}; +use super::{Config, Deserialize, Dialer, Error, Listener, Packet, Result, Serialize, Stream, + TcpDialer, TcpDialerExt, TcpTransport, Transport}; -struct ConnectionHandler<'a, S> - where S: Serve +struct ConnectionHandler<'a, S, St> + where S: Serve, + St: Stream, { - read_stream: BufReader, - write_stream: BufWriter, + read_stream: BufReader, + write_stream: BufWriter, server: S, shutdown: &'a AtomicBool, } -impl<'a, S> ConnectionHandler<'a, S> - where S: Serve +impl<'a, S, St> ConnectionHandler<'a, S, St> + where S: Serve, + St: Stream, { fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> { let ConnectionHandler { @@ -83,7 +86,7 @@ impl<'a, S> ConnectionHandler<'a, S> } } - fn write(rx: Receiver::Reply>>, stream: &mut BufWriter) { + fn write(rx: Receiver::Reply>>, stream: &mut BufWriter) { loop { match rx.recv() { Err(e) => { @@ -101,21 +104,30 @@ impl<'a, S> ConnectionHandler<'a, S> } /// Provides methods for blocking until the server completes, -pub struct ServeHandle { +pub struct ServeHandle + where D: Dialer +{ tx: Sender<()>, join_handle: JoinHandle<()>, - addr: SocketAddr, + dialer: D, } -impl ServeHandle { +impl ServeHandle + where D: Dialer +{ /// Block until the server completes pub fn wait(self) { self.join_handle.join().expect(pos!()); } - /// Returns the address the server is bound to - pub fn local_addr(&self) -> &SocketAddr { - &self.addr + /// Returns the dialer to the server. + pub fn dialer(&self) -> &D { + &self.dialer + } + + /// Returns the socket being listened on when the dialer is a `TcpDialer`. + pub fn local_addr(&self) -> &D::Addr where D: TcpDialerExt { + self.dialer().addr() } /// Shutdown the server. Gracefully shuts down the serve thread but currently does not @@ -123,7 +135,7 @@ impl ServeHandle { pub fn shutdown(self) { info!("ServeHandle: attempting to shut down the server."); self.tx.send(()).expect(pos!()); - if let Ok(_) = TcpStream::connect(self.addr) { + if let Ok(_) = self.dialer.dial() { self.join_handle.join().expect(pos!()); } else { warn!("ServeHandle: best effort shutdown of serve thread failed"); @@ -131,15 +143,15 @@ impl ServeHandle { } } -struct Server<'a, S: 'a> { +struct Server<'a, S: 'a, L: Listener> { server: &'a S, - listener: TcpListener, + listener: L, read_timeout: Option, die_rx: Receiver<()>, shutdown: &'a AtomicBool, } -impl<'a, S: 'a> Server<'a, S> +impl<'a, S: 'a, L: Listener> Server<'a, S, L> where S: Serve + 'static { fn serve<'b>(self, scope: &Scope<'b>) @@ -194,7 +206,7 @@ impl<'a, S: 'a> Server<'a, S> } } -impl<'a, S> Drop for Server<'a, S> { +impl<'a, S, L: Listener> Drop for Server<'a, S, L> { fn drop(&mut self) { debug!("Shutting down connection handlers."); self.shutdown.store(true, Ordering::SeqCst); @@ -212,29 +224,30 @@ pub trait Serve: Send + Sync + Sized { fn serve(&self, request: Self::Request) -> Self::Reply; /// spawn - fn spawn(self, addr: A) -> io::Result + fn spawn(self, addr: A) -> io::Result>> where A: ToSocketAddrs, Self: 'static, { - self.spawn_with_config(addr, Config::default()) + self.spawn_with_config(TcpTransport(addr), Config::default()) } /// spawn - fn spawn_with_config(self, addr: A, config: Config) -> io::Result - where A: ToSocketAddrs, - Self: 'static, + fn spawn_with_config(self, transport: T, config: Config) + -> io::Result::Dialer>> + where Self: 'static, { - let listener = try!(TcpListener::bind(&addr)); - let addr = try!(listener.local_addr()); - info!("spawn_with_config: spinning up server on {:?}", addr); + let listener = try!(transport.bind()); + let dialer = try!(listener.dialer()); + info!("spawn_with_config: spinning up server."); let (die_tx, die_rx) = channel(); + let timeout = config.timeout; let join_handle = thread::spawn(move || { let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads let shutdown = AtomicBool::new(false); let server = Server { server: &self, listener: listener, - read_timeout: config.timeout, + read_timeout: timeout, die_rx: die_rx, shutdown: &shutdown, }; @@ -245,7 +258,7 @@ pub trait Serve: Send + Sync + Sized { Ok(ServeHandle { tx: die_tx, join_handle: join_handle, - addr: addr.clone(), + dialer: dialer, }) } From a0e61474829ddfa6ffe71dc1b91c9f454352f23a Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 23 Feb 2016 00:07:03 -0800 Subject: [PATCH 02/23] Make Tcp* default types --- tarpc/src/macros.rs | 16 +++++++++++----- tarpc/src/protocol/mod.rs | 3 ++- tarpc/src/protocol/server.rs | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index f068736..e310918 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -386,7 +386,8 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes RPC calls to the server."] - pub struct Client($crate::protocol::Client<__Request, __Reply, S>); + pub struct Client($crate::protocol::Client<__Request, __Reply, S>) + where S: $crate::Stream; impl Client<::std::net::TcpStream> { pub fn new(addr: A) -> $crate::Result @@ -396,7 +397,9 @@ macro_rules! service_inner { } } - impl Client { + impl Client + where S: $crate::Stream + { #[allow(unused)] #[doc="Create a new client with default configuration that connects to the given \ address."] @@ -427,7 +430,8 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes asynchronous RPC calls to the server."] - pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>); + pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>) + where S: $crate::Stream; impl AsyncClient<::std::net::TcpStream> { #[allow(unused)] @@ -440,7 +444,8 @@ macro_rules! service_inner { } } - impl AsyncClient { + impl AsyncClient + where S: $crate::Stream { #[allow(unused)] #[doc="Create a new asynchronous client that connects to the given address."] pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result @@ -466,7 +471,8 @@ macro_rules! service_inner { } #[allow(unused)] - struct __Server(S); + struct __Server(S) + where S: 'static + Service; impl $crate::protocol::Serve for __Server where S: 'static + Service diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 77bcd67..dda751f 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -153,7 +153,8 @@ pub trait TcpDialerExt { } /// Connects to a socket address. -pub struct TcpDialer(pub A); +pub struct TcpDialer(pub A) + where A: ToSocketAddrs; impl Dialer for TcpDialer { type Stream = TcpStream; fn dial(&self) -> io::Result { diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 87b866a..7d237c9 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -104,7 +104,7 @@ impl<'a, S, St> ConnectionHandler<'a, S, St> } /// Provides methods for blocking until the server completes, -pub struct ServeHandle +pub struct ServeHandle where D: Dialer { tx: Sender<()>, From 28c6c333e5c9f77552ecd7de865cbf373ba0f71f Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 23 Feb 2016 01:12:58 -0800 Subject: [PATCH 03/23] Reorgnize modules --- tarpc/Cargo.toml | 1 + tarpc/src/lib.rs | 7 +- tarpc/src/macros.rs | 24 +++---- tarpc/src/protocol/client.rs | 4 +- tarpc/src/protocol/mod.rs | 122 +---------------------------------- tarpc/src/protocol/server.rs | 7 +- tarpc/src/transport/mod.rs | 70 ++++++++++++++++++++ tarpc/src/transport/tcp.rs | 52 +++++++++++++++ tarpc/src/transport/unix.rs | 60 +++++++++++++++++ 9 files changed, 209 insertions(+), 138 deletions(-) create mode 100644 tarpc/src/transport/mod.rs create mode 100644 tarpc/src/transport/tcp.rs create mode 100644 tarpc/src/transport/unix.rs diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index c2453d9..510f62d 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -15,6 +15,7 @@ bincode = "^0.4.0" log = "^0.3.5" scoped-pool = "^0.1.5" serde = "^0.6.14" +unix_socket = "^0.5.0" [dev-dependencies] lazy_static = "^0.1.15" diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 5fce4e8..5edbd80 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -48,6 +48,7 @@ extern crate bincode; #[macro_use] extern crate log; extern crate scoped_pool; +extern crate unix_socket; macro_rules! pos { () => (concat!(file!(), ":", line!())) @@ -60,5 +61,7 @@ pub mod protocol; /// Provides the macro used for constructing rpc services and client stubs. pub mod macros; -pub use protocol::{Config, Dialer, Error, Listener, Result, ServeHandle, Stream, TcpDialer, - TcpDialerExt, TcpTransport, Transport}; +/// Provides transport traits and implementations. +pub mod transport; + +pub use protocol::{Config, Error, Result, ServeHandle}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index e310918..8d41ea6 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -316,17 +316,17 @@ macro_rules! service_inner { #[doc="Spawn a running service."] fn spawn(self, addr: A) - -> $crate::Result<$crate::protocol::ServeHandle<$crate::TcpDialer<::std::net::SocketAddr>>> + -> $crate::Result<$crate::protocol::ServeHandle<$crate::transport::tcp::TcpDialer<::std::net::SocketAddr>>> where A: ::std::net::ToSocketAddrs, Self: 'static, { - self.spawn_with_config($crate::TcpTransport(addr), $crate::Config::default()) + self.spawn_with_config($crate::transport::tcp::TcpTransport(addr), $crate::Config::default()) } #[doc="Spawn a running service."] fn spawn_with_config(self, addr: T, config: $crate::Config) - -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> - where T: $crate::Transport, + -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> + where T: $crate::transport::Transport, Self: 'static, { let server = ::std::sync::Arc::new(__Server(self)); @@ -387,18 +387,18 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes RPC calls to the server."] pub struct Client($crate::protocol::Client<__Request, __Reply, S>) - where S: $crate::Stream; + where S: $crate::transport::Stream; impl Client<::std::net::TcpStream> { pub fn new(addr: A) -> $crate::Result where A: ::std::net::ToSocketAddrs, { - Self::with_config($crate::TcpDialer(addr), $crate::Config::default()) + Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default()) } } impl Client - where S: $crate::Stream + where S: $crate::transport::Stream { #[allow(unused)] #[doc="Create a new client with default configuration that connects to the given \ @@ -407,7 +407,7 @@ macro_rules! service_inner { #[doc="Create a new client with the specified configuration that connects to the \ given address."] pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result - where D: $crate::Dialer, + where D: $crate::transport::Dialer, { let inner = try!($crate::protocol::Client::with_config(dialer, config)); ::std::result::Result::Ok(Client(inner)) @@ -431,7 +431,7 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes asynchronous RPC calls to the server."] pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>) - where S: $crate::Stream; + where S: $crate::transport::Stream; impl AsyncClient<::std::net::TcpStream> { #[allow(unused)] @@ -440,16 +440,16 @@ macro_rules! service_inner { pub fn new(addr: A) -> $crate::Result> where A: ::std::net::ToSocketAddrs, { - Self::with_config($crate::TcpDialer(addr), $crate::Config::default()) + Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default()) } } impl AsyncClient - where S: $crate::Stream { + where S: $crate::transport::Stream { #[allow(unused)] #[doc="Create a new asynchronous client that connects to the given address."] pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result - where D: $crate::Dialer + where D: $crate::transport::Dialer { let inner = try!($crate::protocol::Client::with_config(dialer, config)); ::std::result::Result::Ok(AsyncClient(inner)) diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index 38701cb..0ac732f 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -13,7 +13,9 @@ use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; -use super::{Config, Deserialize, Dialer, Error, Packet, Result, Serialize, Stream, TcpDialer}; +use super::{Config, Deserialize, Error, Packet, Result, Serialize}; +use transport::{Dialer, Stream}; +use transport::tcp::TcpDialer; /// A client stub that connects to a server to run rpcs. pub struct Client diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index dda751f..7158243 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -8,7 +8,6 @@ use bincode::serde::{deserialize_from, serialize_into}; use serde; use std::io::{self, Read, Write}; use std::convert; -use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use std::sync::Arc; use std::time::Duration; @@ -63,124 +62,6 @@ pub struct Config { pub timeout: Option, } -/// A factory for creating a listener on a given address. -pub trait Transport { - /// The type of listener that binds to the given address. - type Listener: Listener; - /// Return a listener on the given address, and a dialer to that address. - fn bind(&self) -> io::Result; -} - -/// A transport for TCP. -pub struct TcpTransport(pub A); -impl Transport for TcpTransport { - type Listener = TcpListener; - fn bind(&self) -> io::Result { - TcpListener::bind(&self.0) - } -} - -/// Accepts incoming connections from dialers. -pub trait Listener: Send + 'static { - /// The type of address being listened on. - type Dialer: Dialer; - /// The type of stream this listener accepts. - type Stream: Stream; - /// Accept an incoming stream. - fn accept(&self) -> io::Result; - /// Returns the local address being listened on. - fn dialer(&self) -> io::Result; - /// Iterate over incoming connections. - fn incoming(&self) -> Incoming { - Incoming { - listener: self, - } - } -} - -impl Listener for TcpListener { - type Dialer = TcpDialer; - type Stream = TcpStream; - fn accept(&self) -> io::Result { - self.accept().map(|(stream, _)| stream) - } - fn dialer(&self) -> io::Result> { - self.local_addr().map(|addr| TcpDialer(addr)) - } -} - -/// A cloneable Reader/Writer. -pub trait Stream: Read + Write + Send + Sized + 'static { - /// Clone that can fail. - fn try_clone(&self) -> io::Result; - /// Sets a read timeout. - fn set_read_timeout(&self, dur: Option) -> io::Result<()>; - /// Sets a write timeout. - fn set_write_timeout(&self, dur: Option) -> io::Result<()>; - /// Shuts down both ends of the stream. - fn shutdown(&self) -> io::Result<()>; -} - -impl Stream for TcpStream { - fn try_clone(&self) -> io::Result { - self.try_clone() - } - fn set_read_timeout(&self, dur: Option) -> io::Result<()> { - self.set_read_timeout(dur) - } - fn set_write_timeout(&self, dur: Option) -> io::Result<()> { - self.set_write_timeout(dur) - } - fn shutdown(&self) -> io::Result<()> { - self.shutdown(::std::net::Shutdown::Both) - } -} - -/// A `Stream` factory. -pub trait Dialer { - /// The type of `Stream` this can create. - type Stream: Stream; - /// Open a stream. - fn dial(&self) -> io::Result; -} - -/// Allows retrieving the address when the Dialer is known to be a TcpDialer. -pub trait TcpDialerExt { - /// Type of the address. - type Addr: ToSocketAddrs; - /// Return the address the Dialer connects to. - fn addr(&self) -> &Self::Addr; -} - -/// Connects to a socket address. -pub struct TcpDialer(pub A) - where A: ToSocketAddrs; -impl Dialer for TcpDialer { - type Stream = TcpStream; - fn dial(&self) -> io::Result { - TcpStream::connect(&self.0) - } -} -impl TcpDialerExt for TcpDialer { - type Addr = A; - fn addr(&self) -> &A { - &self.0 - } -} - -/// Iterates over incoming connections. -pub struct Incoming<'a, L: Listener + ?Sized + 'a> { - listener: &'a L, -} - -impl<'a, L: Listener> Iterator for Incoming<'a, L> { - type Item = io::Result; - - fn next(&mut self) -> Option { - Some(self.listener.accept()) - } -} - /// Return type of rpc calls: either the successful return value, or a client error. pub type Result = ::std::result::Result; @@ -205,12 +86,13 @@ impl Serialize for W {} #[cfg(test)] mod test { extern crate env_logger; - use super::{Client, Config, Serve, TcpTransport}; + use super::{Client, Config, Serve}; use scoped_pool::Pool; use std::net::TcpStream; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use std::time::Duration; + use transport::tcp::TcpTransport; fn test_timeout() -> Option { Some(Duration::from_secs(1)) diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 7d237c9..3c8634b 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -12,8 +12,9 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; -use super::{Config, Deserialize, Dialer, Error, Listener, Packet, Result, Serialize, Stream, - TcpDialer, TcpDialerExt, TcpTransport, Transport}; +use super::{Config, Deserialize, Error, Packet, Result, Serialize}; +use transport::{Dialer, Listener, Stream, Transport}; +use transport::tcp::{TcpDialer, TcpTransport}; struct ConnectionHandler<'a, S, St> where S: Serve, @@ -126,7 +127,7 @@ impl ServeHandle } /// Returns the socket being listened on when the dialer is a `TcpDialer`. - pub fn local_addr(&self) -> &D::Addr where D: TcpDialerExt { + pub fn local_addr(&self) -> &D::Addr { self.dialer().addr() } diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs new file mode 100644 index 0000000..c620255 --- /dev/null +++ b/tarpc/src/transport/mod.rs @@ -0,0 +1,70 @@ +use std::io::{self, Read, Write}; +use std::time::Duration; + +/// A factory for creating a listener on a given address. +pub trait Transport { + /// The type of listener that binds to the given address. + type Listener: Listener; + /// Return a listener on the given address, and a dialer to that address. + fn bind(&self) -> io::Result; +} + +/// Accepts incoming connections from dialers. +pub trait Listener: Send + 'static { + /// The type of address being listened on. + type Dialer: Dialer; + /// The type of stream this listener accepts. + type Stream: Stream; + /// Accept an incoming stream. + fn accept(&self) -> io::Result; + /// Returns the local address being listened on. + fn dialer(&self) -> io::Result; + /// Iterate over incoming connections. + fn incoming(&self) -> Incoming { + Incoming { + listener: self, + } + } +} + +/// A cloneable Reader/Writer. +pub trait Stream: Read + Write + Send + Sized + 'static { + /// Clone that can fail. + fn try_clone(&self) -> io::Result; + /// Sets a read timeout. + fn set_read_timeout(&self, dur: Option) -> io::Result<()>; + /// Sets a write timeout. + fn set_write_timeout(&self, dur: Option) -> io::Result<()>; + /// Shuts down both ends of the stream. + fn shutdown(&self) -> io::Result<()>; +} + +/// A `Stream` factory. +pub trait Dialer { + /// The type of `Stream` this can create. + type Stream: Stream; + /// The type of address being connected to. + type Addr; + /// Open a stream. + fn dial(&self) -> io::Result; + /// Return the address being dialed. + fn addr(&self) -> &Self::Addr; +} + +/// Iterates over incoming connections. +pub struct Incoming<'a, L: Listener + ?Sized + 'a> { + listener: &'a L, +} + +impl<'a, L: Listener> Iterator for Incoming<'a, L> { + type Item = io::Result; + + fn next(&mut self) -> Option { + Some(self.listener.accept()) + } +} + +/// Provides a TCP transport. +pub mod tcp; +/// Provides a unix socket transport. +pub mod unix; diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs new file mode 100644 index 0000000..f147625 --- /dev/null +++ b/tarpc/src/transport/tcp.rs @@ -0,0 +1,52 @@ +use std::io; +use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; +use std::time::Duration; + +/// A transport for TCP. +pub struct TcpTransport(pub A); +impl super::Transport for TcpTransport { + type Listener = TcpListener; + fn bind(&self) -> io::Result { + TcpListener::bind(&self.0) + } +} + +impl super::Listener for TcpListener { + type Dialer = TcpDialer; + type Stream = TcpStream; + fn accept(&self) -> io::Result { + self.accept().map(|(stream, _)| stream) + } + fn dialer(&self) -> io::Result> { + self.local_addr().map(|addr| TcpDialer(addr)) + } +} + +impl super::Stream for TcpStream { + fn try_clone(&self) -> io::Result { + self.try_clone() + } + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.set_read_timeout(dur) + } + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.set_write_timeout(dur) + } + fn shutdown(&self) -> io::Result<()> { + self.shutdown(::std::net::Shutdown::Both) + } +} + +/// Connects to a socket address. +pub struct TcpDialer(pub A) + where A: ToSocketAddrs; +impl super::Dialer for TcpDialer { + type Stream = TcpStream; + type Addr = A; + fn dial(&self) -> io::Result { + TcpStream::connect(&self.0) + } + fn addr(&self) -> &A { + &self.0 + } +} diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs new file mode 100644 index 0000000..d6c9c89 --- /dev/null +++ b/tarpc/src/transport/unix.rs @@ -0,0 +1,60 @@ +use std::io; +use std::path::{Path, PathBuf}; +use std::time::Duration; +use unix_socket::{UnixListener, UnixStream}; + +/// A transport for unix sockets. +pub struct UnixTransport

(pub P) + where P: AsRef; + +impl

super::Transport for UnixTransport

+ where P: AsRef +{ + type Listener = UnixListener; + fn bind(&self) -> io::Result { + UnixListener::bind(&self.0) + } +} + +/// Connects to a unix socket address. +pub struct UnixDialer

(pub P) + where P: AsRef; + +impl

super::Dialer for UnixDialer

+ where P: AsRef +{ + type Stream = UnixStream; + type Addr = P; + fn dial(&self) -> io::Result { + UnixStream::connect(&self.0) + } + fn addr(&self) -> &P { + &self.0 + } +} + +impl super::Listener for UnixListener { + type Stream = UnixStream; + type Dialer = UnixDialer; + fn accept(&self) -> io::Result { + self.accept().map(|(stream, _)| stream) + } + fn dialer(&self) -> io::Result> { + self.local_addr().map(|addr| UnixDialer(addr.as_pathname().unwrap().to_owned())) + } +} + +impl super::Stream for UnixStream { + fn try_clone(&self) -> io::Result { + self.try_clone() + } + fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { + self.set_read_timeout(timeout) + } + fn set_write_timeout(&self, timeout: Option) -> io::Result<()> { + self.set_write_timeout(timeout) + } + fn shutdown(&self) -> io::Result<()> { + self.shutdown(::std::net::Shutdown::Both) + } +} From 396aec3c2f9d13a292d1ff28881ad71ee0e46ac3 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 23 Feb 2016 01:53:20 -0800 Subject: [PATCH 04/23] Add a test --- tarpc/src/macros.rs | 16 ++++++++++++++++ tarpc/src/transport/mod.rs | 14 ++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 8d41ea6..1c31d07 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -522,6 +522,8 @@ mod syntax_test { #[cfg(test)] mod functional_test { extern crate env_logger; + use Config; + use transport::unix::UnixTransport; service! { rpc add(x: i32, y: i32) -> i32; @@ -579,6 +581,20 @@ mod functional_test { assert_eq!(3, client2.add(1, 2).get().unwrap()); } + #[test] + fn async_try_clone_unix() { + let handle = Server.spawn_with_config(UnixTransport("/tmp/test"), + Config::default()).unwrap(); + let client1 = AsyncClient::with_config(handle.dialer(), + Config::default()).unwrap(); + let client2 = client1.try_clone().unwrap(); + assert_eq!(3, client1.add(1, 2).get().unwrap()); + assert_eq!(3, client2.add(1, 2).get().unwrap()); + drop(client1); + drop(client2); + handle.shutdown(); + } + // Tests that a server can be wrapped in an Arc; no need to run, just compile #[allow(dead_code)] fn serve_arc_server() { diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index c620255..98d9406 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -51,6 +51,20 @@ pub trait Dialer { fn addr(&self) -> &Self::Addr; } +impl Dialer for P + where P: ::std::ops::Deref, + D: Dialer + 'static +{ + type Stream = D::Stream; + type Addr = D::Addr; + fn dial(&self) -> io::Result { + (**self).dial() + } + fn addr(&self) -> &Self::Addr { + (**self).addr() + } +} + /// Iterates over incoming connections. pub struct Incoming<'a, L: Listener + ?Sized + 'a> { listener: &'a L, From d5b2f23f748305bb12ec186ee83112bd6056a3be Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 23 Feb 2016 08:26:56 -0800 Subject: [PATCH 05/23] Move generic bounds to where clause --- tarpc/src/protocol/client.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index 0ac732f..5f86e61 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -18,8 +18,9 @@ use transport::{Dialer, Stream}; use transport::tcp::TcpDialer; /// A client stub that connects to a server to run rpcs. -pub struct Client - where Request: serde::ser::Serialize +pub struct Client + where Request: serde::ser::Serialize, + S: Stream, { // The guard is in an option so it can be joined in the drop fn reader_guard: Arc>>, @@ -40,9 +41,10 @@ impl Client } } -impl Client +impl Client where Request: serde::ser::Serialize + Send + 'static, - Reply: serde::de::Deserialize + Send + 'static + Reply: serde::de::Deserialize + Send + 'static, + S: Stream, { /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. @@ -107,8 +109,9 @@ impl Client } } -impl Drop for Client - where Request: serde::ser::Serialize +impl Drop for Client + where Request: serde::ser::Serialize, + S: Stream, { fn drop(&mut self) { debug!("Dropping Client."); @@ -195,11 +198,12 @@ impl RpcFutures { } } -fn write(outbound: Receiver<(Request, Sender>)>, +fn write(outbound: Receiver<(Request, Sender>)>, requests: Arc>>, stream: S) where Request: serde::Serialize, - Reply: serde::Deserialize + Reply: serde::Deserialize, + S: Stream, { let mut next_id = 0; let mut stream = BufWriter::new(stream); @@ -248,8 +252,9 @@ fn write(outbound: Receiver<(Request, Sender(requests: Arc>>, stream: S) - where Reply: serde::Deserialize +fn read(requests: Arc>>, stream: S) + where Reply: serde::Deserialize, + S: Stream, { let mut stream = BufReader::new(stream); loop { From 774411c63674c4b4cc439a409e3e6b880997b065 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 24 Feb 2016 20:26:49 -0800 Subject: [PATCH 06/23] Create temp file using tempdir in test --- tarpc/Cargo.toml | 1 + tarpc/src/macros.rs | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 510f62d..5b65672 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -20,3 +20,4 @@ unix_socket = "^0.5.0" [dev-dependencies] lazy_static = "^0.1.15" env_logger = "^0.3.2" +tempdir = "^0.3.4" diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 1c31d07..11a9b45 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -522,6 +522,7 @@ mod syntax_test { #[cfg(test)] mod functional_test { extern crate env_logger; + extern crate tempdir; use Config; use transport::unix::UnixTransport; @@ -583,16 +584,15 @@ mod functional_test { #[test] fn async_try_clone_unix() { - let handle = Server.spawn_with_config(UnixTransport("/tmp/test"), + let temp_dir = tempdir::TempDir::new(module_path!()).unwrap(); + let temp_file = temp_dir.path().join("async_try_clone_unix.tmp"); + let handle = Server.spawn_with_config(UnixTransport(temp_file), Config::default()).unwrap(); let client1 = AsyncClient::with_config(handle.dialer(), Config::default()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).get().unwrap()); assert_eq!(3, client2.add(1, 2).get().unwrap()); - drop(client1); - drop(client2); - handle.shutdown(); } // Tests that a server can be wrapped in an Arc; no need to run, just compile From 8307c708a3b7a650a723a4e942106567b1dc6b10 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 24 Feb 2016 20:32:15 -0800 Subject: [PATCH 07/23] Better documentation for Stream. Basically copied from TcpStream verbatim. --- tarpc/src/transport/mod.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index 98d9406..867c8b2 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -29,13 +29,27 @@ pub trait Listener: Send + 'static { /// A cloneable Reader/Writer. pub trait Stream: Read + Write + Send + Sized + 'static { - /// Clone that can fail. + /// Creates a new independently owned handle to the underlying socket. + /// + /// The returned TcpStream should reference the same stream that this + /// object references. Both handles should read and write the same + /// stream of data, and options set on one stream should be propagated + /// to the other stream. fn try_clone(&self) -> io::Result; /// Sets a read timeout. + /// + /// If the value specified is `None`, then read calls will block indefinitely. + /// It is an error to pass the zero `Duration` to this method. fn set_read_timeout(&self, dur: Option) -> io::Result<()>; /// Sets a write timeout. + /// + /// If the value specified is `None`, then write calls will block indefinitely. + /// It is an error to pass the zero `Duration` to this method. fn set_write_timeout(&self, dur: Option) -> io::Result<()>; /// Shuts down both ends of the stream. + /// + /// Implementations should cause all pending and future I/O on the specified + /// portions to return immediately with an appropriate value. fn shutdown(&self) -> io::Result<()>; } From 03dc512e252221f9bf0bb517576907c42fe011d8 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 24 Feb 2016 21:59:21 -0800 Subject: [PATCH 08/23] Remove Addr associated type of Dialer. Also, make spawn() take a Dialer, but impl Dialer for str, defaulting to TCP transport. --- tarpc/src/lib.rs | 6 ++-- tarpc/src/macros.rs | 67 +++++++++++++++++++----------------- tarpc/src/protocol/client.rs | 22 +++++------- tarpc/src/protocol/mod.rs | 17 ++++----- tarpc/src/protocol/server.rs | 20 +++++------ tarpc/src/transport/mod.rs | 10 +----- tarpc/src/transport/tcp.rs | 20 ++++++++--- tarpc/src/transport/unix.rs | 4 --- 8 files changed, 77 insertions(+), 89 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 5edbd80..3169564 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -31,13 +31,13 @@ //! //! fn main() { //! let addr = "127.0.0.1:9000"; -//! let shutdown = Server.spawn(addr).unwrap(); -//! let client = Client::new(addr).unwrap(); +//! let serve_handle = Server.spawn(addr).unwrap(); +//! let client = Client::new(serve_handle.dialer()).unwrap(); //! assert_eq!(3, client.add(1, 2).unwrap()); //! assert_eq!("Hello, Mom!".to_string(), //! client.hello("Mom".to_string()).unwrap()); //! drop(client); -//! shutdown.shutdown(); +//! serve_handle.shutdown(); //! } //! ``` diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 11a9b45..1e28084 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -315,22 +315,22 @@ macro_rules! service_inner { )* #[doc="Spawn a running service."] - fn spawn(self, addr: A) - -> $crate::Result<$crate::protocol::ServeHandle<$crate::transport::tcp::TcpDialer<::std::net::SocketAddr>>> - where A: ::std::net::ToSocketAddrs, + fn spawn(self, transport: T) + -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> + where T: $crate::transport::Transport, Self: 'static, { - self.spawn_with_config($crate::transport::tcp::TcpTransport(addr), $crate::Config::default()) + self.spawn_with_config(transport, $crate::Config::default()) } #[doc="Spawn a running service."] - fn spawn_with_config(self, addr: T, config: $crate::Config) + fn spawn_with_config(self, transport: T, config: $crate::Config) -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> where T: $crate::transport::Transport, Self: 'static, { let server = ::std::sync::Arc::new(__Server(self)); - let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config)); + let handle = try!($crate::protocol::Serve::spawn_with_config(server, transport, config)); ::std::result::Result::Ok(handle) } } @@ -389,20 +389,18 @@ macro_rules! service_inner { pub struct Client($crate::protocol::Client<__Request, __Reply, S>) where S: $crate::transport::Stream; - impl Client<::std::net::TcpStream> { - pub fn new(addr: A) -> $crate::Result - where A: ::std::net::ToSocketAddrs, - { - Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default()) - } - } - impl Client where S: $crate::transport::Stream { #[allow(unused)] #[doc="Create a new client with default configuration that connects to the given \ address."] + pub fn new(dialer: D) -> $crate::Result + where D: $crate::transport::Dialer, + { + Self::with_config(dialer, $crate::Config::default()) + } + #[allow(unused)] #[doc="Create a new client with the specified configuration that connects to the \ given address."] @@ -433,23 +431,21 @@ macro_rules! service_inner { pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>) where S: $crate::transport::Stream; - impl AsyncClient<::std::net::TcpStream> { - #[allow(unused)] - #[doc="Create a new asynchronous client with default configuration that connects to \ - the given address."] - pub fn new(addr: A) -> $crate::Result> - where A: ::std::net::ToSocketAddrs, - { - Self::with_config($crate::transport::tcp::TcpDialer(addr), $crate::Config::default()) - } - } - impl AsyncClient where S: $crate::transport::Stream { + #[allow(unused)] + #[doc="Create a new asynchronous client with default configuration that connects to \ + the given address."] + pub fn new(dialer: D) -> $crate::Result + where D: $crate::transport::Dialer, + { + Self::with_config(dialer, $crate::Config::default()) + } + #[allow(unused)] #[doc="Create a new asynchronous client that connects to the given address."] pub fn with_config(dialer: D, config: $crate::Config) -> $crate::Result - where D: $crate::transport::Dialer + where D: $crate::transport::Dialer, { let inner = try!($crate::protocol::Client::with_config(dialer, config)); ::std::result::Result::Ok(AsyncClient(inner)) @@ -546,7 +542,7 @@ mod functional_test { fn simple() { let _ = env_logger::init(); let handle = Server.spawn("localhost:0").unwrap(); - let client = Client::new(handle.local_addr()).unwrap(); + let client = Client::new(handle.dialer()).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap()); drop(client); @@ -557,7 +553,7 @@ mod functional_test { fn simple_async() { let _ = env_logger::init(); let handle = Server.spawn("localhost:0").unwrap(); - let client = AsyncClient::new(handle.local_addr()).unwrap(); + let client = AsyncClient::new(handle.dialer()).unwrap(); assert_eq!(3, client.add(1, 2).get().unwrap()); assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap()); drop(client); @@ -567,7 +563,7 @@ mod functional_test { #[test] fn try_clone() { let handle = Server.spawn("localhost:0").unwrap(); - let client1 = Client::new(handle.local_addr()).unwrap(); + let client1 = Client::new(handle.dialer()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).unwrap()); assert_eq!(3, client2.add(1, 2).unwrap()); @@ -576,7 +572,7 @@ mod functional_test { #[test] fn async_try_clone() { let handle = Server.spawn("localhost:0").unwrap(); - let client1 = AsyncClient::new(handle.local_addr()).unwrap(); + let client1 = AsyncClient::new(handle.dialer()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).get().unwrap()); assert_eq!(3, client2.add(1, 2).get().unwrap()); @@ -584,8 +580,9 @@ mod functional_test { #[test] fn async_try_clone_unix() { - let temp_dir = tempdir::TempDir::new(module_path!()).unwrap(); - let temp_file = temp_dir.path().join("async_try_clone_unix.tmp"); + let temp_dir = tempdir::TempDir::new("tarpc").unwrap(); + let temp_file = temp_dir.path() + .join("async_try_clone_unix.tmp"); let handle = Server.spawn_with_config(UnixTransport(temp_file), Config::default()).unwrap(); let client1 = AsyncClient::with_config(handle.dialer(), @@ -601,6 +598,12 @@ mod functional_test { let _ = ::std::sync::Arc::new(Server).spawn("localhost:0"); } + // Tests that a tcp client can be created from &str + #[allow(dead_code)] + fn test_client_str() { + let _ = Client::new("localhost:0"); + } + #[test] fn serde() { use bincode; diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index 5f86e61..b0bea70 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -8,14 +8,12 @@ use std::fmt; use std::io::{self, BufReader, BufWriter, Read}; use std::collections::HashMap; use std::mem; -use std::net::{TcpStream, ToSocketAddrs}; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use super::{Config, Deserialize, Error, Packet, Result, Serialize}; use transport::{Dialer, Stream}; -use transport::tcp::TcpDialer; /// A client stub that connects to a server to run rpcs. pub struct Client @@ -29,23 +27,19 @@ pub struct Client shutdown: S, } - -impl Client - where Request: serde::ser::Serialize + Send + 'static, - Reply: serde::de::Deserialize + Send + 'static -{ - /// Create a new client that connects to `addr`. The client uses the given timeout - /// for both reads and writes. - pub fn new(addr: A) -> io::Result { - Self::with_config(TcpDialer(addr), Config::default()) - } -} - impl Client where Request: serde::ser::Serialize + Send + 'static, Reply: serde::de::Deserialize + Send + 'static, S: Stream, { + /// Create a new client that connects to `addr`. The client uses the given timeout + /// for both reads and writes. + pub fn new(dialer: D) -> io::Result + where D: Dialer, + { + Self::with_config(dialer, Config::default()) + } + /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. pub fn with_config(dialer: D, config: Config) -> io::Result diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 7158243..f995ace 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -129,7 +129,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn("localhost:0").unwrap(); - let client: Client<(), u64, TcpStream> = Client::new(serve_handle.local_addr()).unwrap(); + let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap(); drop(client); serve_handle.shutdown(); } @@ -139,9 +139,8 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.clone().spawn("localhost:0").unwrap(); - let addr = serve_handle.local_addr().clone(); // The explicit type is required so that it doesn't deserialize a u32 instead of u64 - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); assert_eq!(0, client.rpc(()).unwrap()); assert_eq!(1, server.count()); assert_eq!(1, client.rpc(()).unwrap()); @@ -186,8 +185,7 @@ mod test { timeout: Some(Duration::new(0, 10)), }) .unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); let thread = thread::spawn(move || serve_handle.shutdown()); info!("force_shutdown:: rpc1: {:?}", client.rpc(())); thread.join().unwrap(); @@ -202,8 +200,7 @@ mod test { timeout: test_timeout(), }) .unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr).unwrap()); + let client: Arc> = Arc::new(Client::new(serve_handle.dialer()).unwrap()); client.rpc(()).unwrap(); serve_handle.shutdown(); match client.rpc(()) { @@ -220,8 +217,7 @@ mod test { let pool = Pool::new(concurrency); let server = Arc::new(BarrierServer::new(concurrency)); let serve_handle = server.clone().spawn("localhost:0").unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); pool.scoped(|scope| { for _ in 0..concurrency { let client = client.try_clone().unwrap(); @@ -240,8 +236,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn("localhost:0").unwrap(); - let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64, _> = Client::new(addr).unwrap(); + let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); // Drop future immediately; does the reader channel panic when sending? client.rpc_async(()); diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 3c8634b..e7251ce 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -7,14 +7,13 @@ use serde; use scoped_pool::{Pool, Scope}; use std::fmt; use std::io::{self, BufReader, BufWriter}; -use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; use super::{Config, Deserialize, Error, Packet, Result, Serialize}; use transport::{Dialer, Listener, Stream, Transport}; -use transport::tcp::{TcpDialer, TcpTransport}; +use transport::tcp::TcpDialer; struct ConnectionHandler<'a, S, St> where S: Serve, @@ -126,11 +125,6 @@ impl ServeHandle &self.dialer } - /// Returns the socket being listened on when the dialer is a `TcpDialer`. - pub fn local_addr(&self) -> &D::Addr { - self.dialer().addr() - } - /// Shutdown the server. Gracefully shuts down the serve thread but currently does not /// gracefully close open connections. pub fn shutdown(self) { @@ -225,17 +219,19 @@ pub trait Serve: Send + Sync + Sized { fn serve(&self, request: Self::Request) -> Self::Reply; /// spawn - fn spawn(self, addr: A) -> io::Result>> - where A: ToSocketAddrs, + fn spawn(self, transport: T) + -> io::Result::Dialer>> + where T: Transport, Self: 'static, { - self.spawn_with_config(TcpTransport(addr), Config::default()) + self.spawn_with_config(transport, Config::default()) } /// spawn - fn spawn_with_config(self, transport: T, config: Config) + fn spawn_with_config(self, transport: T, config: Config) -> io::Result::Dialer>> - where Self: 'static, + where T: Transport, + Self: 'static, { let listener = try!(transport.bind()); let dialer = try!(listener.dialer()); diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index 867c8b2..fdb83ef 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -57,26 +57,18 @@ pub trait Stream: Read + Write + Send + Sized + 'static { pub trait Dialer { /// The type of `Stream` this can create. type Stream: Stream; - /// The type of address being connected to. - type Addr; /// Open a stream. fn dial(&self) -> io::Result; - /// Return the address being dialed. - fn addr(&self) -> &Self::Addr; } -impl Dialer for P +impl Dialer for P where P: ::std::ops::Deref, D: Dialer + 'static { type Stream = D::Stream; - type Addr = D::Addr; fn dial(&self) -> io::Result { (**self).dial() } - fn addr(&self) -> &Self::Addr { - (**self).addr() - } } /// Iterates over incoming connections. diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs index f147625..a5b78e7 100644 --- a/tarpc/src/transport/tcp.rs +++ b/tarpc/src/transport/tcp.rs @@ -11,6 +11,13 @@ impl super::Transport for TcpTransport { } } +impl super::Transport for A { + type Listener = TcpListener; + fn bind(&self) -> io::Result { + TcpListener::bind(self) + } +} + impl super::Listener for TcpListener { type Dialer = TcpDialer; type Stream = TcpStream; @@ -40,13 +47,18 @@ impl super::Stream for TcpStream { /// Connects to a socket address. pub struct TcpDialer(pub A) where A: ToSocketAddrs; -impl super::Dialer for TcpDialer { +impl super::Dialer for TcpDialer + where A: ToSocketAddrs +{ type Stream = TcpStream; - type Addr = A; fn dial(&self) -> io::Result { TcpStream::connect(&self.0) } - fn addr(&self) -> &A { - &self.0 +} +impl super::Dialer for str +{ + type Stream = TcpStream; + fn dial(&self) -> io::Result { + TcpStream::connect(self) } } diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs index d6c9c89..8571ee5 100644 --- a/tarpc/src/transport/unix.rs +++ b/tarpc/src/transport/unix.rs @@ -24,13 +24,9 @@ impl

super::Dialer for UnixDialer

where P: AsRef { type Stream = UnixStream; - type Addr = P; fn dial(&self) -> io::Result { UnixStream::connect(&self.0) } - fn addr(&self) -> &P { - &self.0 - } } impl super::Listener for UnixListener { From c398e2389beb5d02fbe79174e3f88c40f716a978 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 24 Feb 2016 23:25:50 -0800 Subject: [PATCH 09/23] Why were we wrapping the service in an arc? --- tarpc/src/macros.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 1e28084..f2aaa0b 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -329,7 +329,7 @@ macro_rules! service_inner { where T: $crate::transport::Transport, Self: 'static, { - let server = ::std::sync::Arc::new(__Server(self)); + let server = __Server(self); let handle = try!($crate::protocol::Serve::spawn_with_config(server, transport, config)); ::std::result::Result::Ok(handle) } From 9827f7545989caf2cc9835cbb36e164182d36dcb Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 24 Feb 2016 23:33:03 -0800 Subject: [PATCH 10/23] Fix examples --- README.md | 2 +- tarpc_examples/src/lib.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 68d6f3a..22d7d6d 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ impl HelloService for HelloServer { fn main() { let server_handle = HelloServer.spawn("0.0.0.0:0").unwrap(); - let client = hello_service::Client::new(server_handle.local_addr()).unwrap(); + let client = hello_service::Client::new(server_handle.dialer()).unwrap(); assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap()); drop(client); server_handle.shutdown(); diff --git a/tarpc_examples/src/lib.rs b/tarpc_examples/src/lib.rs index 9842c57..72bf45a 100644 --- a/tarpc_examples/src/lib.rs +++ b/tarpc_examples/src/lib.rs @@ -41,8 +41,9 @@ mod benchmark { Arc::new(Mutex::new(handle)) }; static ref CLIENT: Arc> = { - let addr = HANDLE.lock().unwrap().local_addr().clone(); - let client = AsyncClient::new(addr).unwrap(); + let lock = HANDLE.lock().unwrap(); + let dialer = lock.dialer(); + let client = AsyncClient::new(dialer).unwrap(); Arc::new(Mutex::new(client)) }; } From 6273ebefa7254bea1d4923b52ed73eb9a939d414 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 25 Feb 2016 00:04:35 -0800 Subject: [PATCH 11/23] rustfmt --- tarpc/src/macros.rs | 40 +++++++++++++++++++++--------------- tarpc/src/protocol/client.rs | 18 ++++++++-------- tarpc/src/protocol/mod.rs | 8 ++------ tarpc/src/protocol/server.rs | 17 +++++++-------- tarpc/src/transport/mod.rs | 6 ++---- tarpc/src/transport/tcp.rs | 6 ++---- tarpc/src/transport/unix.rs | 6 ++---- 7 files changed, 50 insertions(+), 51 deletions(-) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index f2aaa0b..00399c7 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -261,7 +261,7 @@ macro_rules! service { #[doc(hidden)] #[macro_export] macro_rules! service_inner { - // Pattern for when the next rpc has an implicit unit return type +// Pattern for when the next rpc has an implicit unit return type ( { $(#[$attr:meta])* @@ -280,7 +280,7 @@ macro_rules! service_inner { rpc $fn_name( $( $arg : $in_ ),* ) -> (); } }; - // Pattern for when the next rpc has an explicit return type +// Pattern for when the next rpc has an explicit return type ( { $(#[$attr:meta])* @@ -299,7 +299,7 @@ macro_rules! service_inner { rpc $fn_name( $( $arg : $in_ ),* ) -> $out; } }; - // Pattern when all return types have been expanded +// Pattern when all return types have been expanded ( { } // none left to expand $( @@ -315,8 +315,11 @@ macro_rules! service_inner { )* #[doc="Spawn a running service."] - fn spawn(self, transport: T) - -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> + fn spawn(self, + transport: T) + -> $crate::Result< + $crate::protocol::ServeHandle< + ::Dialer>> where T: $crate::transport::Transport, Self: 'static, { @@ -324,13 +327,18 @@ macro_rules! service_inner { } #[doc="Spawn a running service."] - fn spawn_with_config(self, transport: T, config: $crate::Config) - -> $crate::Result<$crate::protocol::ServeHandle<::Dialer>> + fn spawn_with_config(self, + transport: T, + config: $crate::Config) + -> $crate::Result< + $crate::protocol::ServeHandle< + ::Dialer>> where T: $crate::transport::Transport, Self: 'static, { let server = __Server(self); - let handle = try!($crate::protocol::Serve::spawn_with_config(server, transport, config)); + let result = $crate::protocol::Serve::spawn_with_config(server, transport, config); + let handle = try!(result); ::std::result::Result::Ok(handle) } } @@ -386,8 +394,9 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes RPC calls to the server."] - pub struct Client($crate::protocol::Client<__Request, __Reply, S>) - where S: $crate::transport::Stream; + pub struct Client( + $crate::protocol::Client<__Request, __Reply, S> + ) where S: $crate::transport::Stream; impl Client where S: $crate::transport::Stream @@ -428,8 +437,9 @@ macro_rules! service_inner { #[allow(unused)] #[doc="The client stub that makes asynchronous RPC calls to the server."] - pub struct AsyncClient($crate::protocol::Client<__Request, __Reply, S>) - where S: $crate::transport::Stream; + pub struct AsyncClient( + $crate::protocol::Client<__Request, __Reply, S> + ) where S: $crate::transport::Stream; impl AsyncClient where S: $crate::transport::Stream { @@ -583,10 +593,8 @@ mod functional_test { let temp_dir = tempdir::TempDir::new("tarpc").unwrap(); let temp_file = temp_dir.path() .join("async_try_clone_unix.tmp"); - let handle = Server.spawn_with_config(UnixTransport(temp_file), - Config::default()).unwrap(); - let client1 = AsyncClient::with_config(handle.dialer(), - Config::default()).unwrap(); + let handle = Server.spawn(UnixTransport(temp_file)).unwrap(); + let client1 = AsyncClient::new(handle.dialer()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).get().unwrap()); assert_eq!(3, client2.add(1, 2).get().unwrap()); diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index b0bea70..dd2ed56 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -18,7 +18,7 @@ use transport::{Dialer, Stream}; /// A client stub that connects to a server to run rpcs. pub struct Client where Request: serde::ser::Serialize, - S: Stream, + S: Stream { // The guard is in an option so it can be joined in the drop fn reader_guard: Arc>>, @@ -30,12 +30,12 @@ pub struct Client impl Client where Request: serde::ser::Serialize + Send + 'static, Reply: serde::de::Deserialize + Send + 'static, - S: Stream, + S: Stream { /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. pub fn new(dialer: D) -> io::Result - where D: Dialer, + where D: Dialer { Self::with_config(dialer, Config::default()) } @@ -43,7 +43,7 @@ impl Client /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. pub fn with_config(dialer: D, config: Config) -> io::Result - where D: Dialer, + where D: Dialer { let stream = try!(dialer.dial()); try!(stream.set_read_timeout(config.timeout)); @@ -105,7 +105,7 @@ impl Client impl Drop for Client where Request: serde::ser::Serialize, - S: Stream, + S: Stream { fn drop(&mut self) { debug!("Dropping Client."); @@ -193,11 +193,11 @@ impl RpcFutures { } fn write(outbound: Receiver<(Request, Sender>)>, - requests: Arc>>, - stream: S) + requests: Arc>>, + stream: S) where Request: serde::Serialize, Reply: serde::Deserialize, - S: Stream, + S: Stream { let mut next_id = 0; let mut stream = BufWriter::new(stream); @@ -248,7 +248,7 @@ fn write(outbound: Receiver<(Request, Sender>)> fn read(requests: Arc>>, stream: S) where Reply: serde::Deserialize, - S: Stream, + S: Stream { let mut stream = BufReader::new(stream); loop { diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index f995ace..df55f32 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -181,9 +181,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn_with_config(TcpTransport("localhost:0"), - Config { - timeout: Some(Duration::new(0, 10)), - }) + Config { timeout: Some(Duration::new(0, 10)) }) .unwrap(); let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); let thread = thread::spawn(move || serve_handle.shutdown()); @@ -196,9 +194,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn_with_config(TcpTransport("localhost:0"), - Config { - timeout: test_timeout(), - }) + Config { timeout: test_timeout() }) .unwrap(); let client: Arc> = Arc::new(Client::new(serve_handle.dialer()).unwrap()); client.rpc(()).unwrap(); diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index e7251ce..0a2523f 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -17,7 +17,7 @@ use transport::tcp::TcpDialer; struct ConnectionHandler<'a, S, St> where S: Serve, - St: Stream, + St: Stream { read_stream: BufReader, write_stream: BufWriter, @@ -27,7 +27,7 @@ struct ConnectionHandler<'a, S, St> impl<'a, S, St> ConnectionHandler<'a, S, St> where S: Serve, - St: Stream, + St: Stream { fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> { let ConnectionHandler { @@ -219,19 +219,20 @@ pub trait Serve: Send + Sync + Sized { fn serve(&self, request: Self::Request) -> Self::Reply; /// spawn - fn spawn(self, transport: T) - -> io::Result::Dialer>> + fn spawn(self, transport: T) -> io::Result::Dialer>> where T: Transport, - Self: 'static, + Self: 'static { self.spawn_with_config(transport, Config::default()) } /// spawn - fn spawn_with_config(self, transport: T, config: Config) - -> io::Result::Dialer>> + fn spawn_with_config(self, + transport: T, + config: Config) + -> io::Result::Dialer>> where T: Transport, - Self: 'static, + Self: 'static { let listener = try!(transport.bind()); let dialer = try!(listener.dialer()); diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index fdb83ef..41119fc 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -21,9 +21,7 @@ pub trait Listener: Send + 'static { fn dialer(&self) -> io::Result; /// Iterate over incoming connections. fn incoming(&self) -> Incoming { - Incoming { - listener: self, - } + Incoming { listener: self } } } @@ -62,7 +60,7 @@ pub trait Dialer { } impl Dialer for P - where P: ::std::ops::Deref, + where P: ::std::ops::Deref, D: Dialer + 'static { type Stream = D::Stream; diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs index a5b78e7..f7e078d 100644 --- a/tarpc/src/transport/tcp.rs +++ b/tarpc/src/transport/tcp.rs @@ -45,8 +45,7 @@ impl super::Stream for TcpStream { } /// Connects to a socket address. -pub struct TcpDialer(pub A) - where A: ToSocketAddrs; +pub struct TcpDialer(pub A) where A: ToSocketAddrs; impl super::Dialer for TcpDialer where A: ToSocketAddrs { @@ -55,8 +54,7 @@ impl super::Dialer for TcpDialer TcpStream::connect(&self.0) } } -impl super::Dialer for str -{ +impl super::Dialer for str { type Stream = TcpStream; fn dial(&self) -> io::Result { TcpStream::connect(self) diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs index 8571ee5..1e2e7ad 100644 --- a/tarpc/src/transport/unix.rs +++ b/tarpc/src/transport/unix.rs @@ -4,8 +4,7 @@ use std::time::Duration; use unix_socket::{UnixListener, UnixStream}; /// A transport for unix sockets. -pub struct UnixTransport

(pub P) - where P: AsRef; +pub struct UnixTransport

(pub P) where P: AsRef; impl

super::Transport for UnixTransport

where P: AsRef @@ -17,8 +16,7 @@ impl

super::Transport for UnixTransport

} /// Connects to a unix socket address. -pub struct UnixDialer

(pub P) - where P: AsRef; +pub struct UnixDialer

(pub P) where P: AsRef; impl

super::Dialer for UnixDialer

where P: AsRef From a1f529f7947ab34ce9ff33d0381458918c645365 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 25 Feb 2016 00:58:48 -0800 Subject: [PATCH 12/23] Reformat some generic bounds --- tarpc/src/macros.rs | 1 - tarpc/src/protocol/server.rs | 13 +++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 00399c7..879a8d9 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -529,7 +529,6 @@ mod syntax_test { mod functional_test { extern crate env_logger; extern crate tempdir; - use Config; use transport::unix::UnixTransport; service! { diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 0a2523f..1e6e1ee 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -138,7 +138,9 @@ impl ServeHandle } } -struct Server<'a, S: 'a, L: Listener> { +struct Server<'a, S: 'a, L> + where L: Listener +{ server: &'a S, listener: L, read_timeout: Option, @@ -146,8 +148,9 @@ struct Server<'a, S: 'a, L: Listener> { shutdown: &'a AtomicBool, } -impl<'a, S: 'a, L: Listener> Server<'a, S, L> - where S: Serve + 'static +impl<'a, S, L> Server<'a, S, L> + where S: Serve + 'static, + L: Listener, { fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b @@ -201,7 +204,9 @@ impl<'a, S: 'a, L: Listener> Server<'a, S, L> } } -impl<'a, S, L: Listener> Drop for Server<'a, S, L> { +impl<'a, S, L> Drop for Server<'a, S, L> + where L: Listener +{ fn drop(&mut self) { debug!("Shutting down connection handlers."); self.shutdown.store(true, Ordering::SeqCst); From 714541a7a4384cd3ae6d93d82c2aa05456a47872 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 25 Feb 2016 01:05:38 -0800 Subject: [PATCH 13/23] Don't unwrap in Listener::dialer --- tarpc/src/transport/unix.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs index 1e2e7ad..f7e9f54 100644 --- a/tarpc/src/transport/unix.rs +++ b/tarpc/src/transport/unix.rs @@ -34,7 +34,11 @@ impl super::Listener for UnixListener { self.accept().map(|(stream, _)| stream) } fn dialer(&self) -> io::Result> { - self.local_addr().map(|addr| UnixDialer(addr.as_pathname().unwrap().to_owned())) + self.local_addr().and_then(|addr| match addr.as_pathname() { + Some(path) => Ok(UnixDialer(path.to_owned())), + None => Err(io::Error::new(io::ErrorKind::AddrNotAvailable, + "Couldn't get a path to bound unix socket")), + }) } } From 8e5a44b423296fb61534cd84db1989ba01c66fa9 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 25 Feb 2016 22:28:39 -0800 Subject: [PATCH 14/23] Update README to list arbitrary transports as a feature. --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 22d7d6d..1aac6db 100644 --- a/README.md +++ b/README.md @@ -56,17 +56,18 @@ fn main() { The `service!` macro expands to a collection of items that collectively form an rpc service. In the above example, the macro is called within the `hello_service` module. This module will contain a -`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides `default fn`s for -starting the service: `spawn` and `spawn_with_config`, which start the service listening on a tcp -port. A `Client` (or `AsyncClient`) can connect to such a service. These generated types make it -easy and ergonomic to write servers without dealing with sockets or serialization directly. See the -tarpc_examples package for more sophisticated examples. +`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for +starting the service: `spawn` and `spawn_with_config`, which start the service listening over an +arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated +types make it easy and ergonomic to write servers without dealing with sockets or serialization +directly. See the tarpc_examples package for more sophisticated examples. ## Documentation Use `cargo doc` as you normally would to see the documentation created for all items expanded by a `service!` invocation. ## Additional Features +- Connect over any transport that `impl`s the `Transport` trait. - Concurrent requests from a single client. - Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures. - Attributes can be specified on rpc methods. These will be included on both the `Service` trait From 72a9f8f70dde96ac06eb405f51758db542405ab2 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 25 Feb 2016 22:49:22 -0800 Subject: [PATCH 15/23] Update deps versions. --- tarpc/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 5b65672..b47c0d2 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -13,8 +13,8 @@ description = "An RPC framework for Rust with a focus on ease of use." [dependencies] bincode = "^0.4.0" log = "^0.3.5" -scoped-pool = "^0.1.5" -serde = "^0.6.14" +scoped-pool = "^0.1.8" +serde = "^0.6.15" unix_socket = "^0.5.0" [dev-dependencies] From 5ac4b710a50f11209cb848f80b2e0b89228bc2c1 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 25 Feb 2016 23:30:00 -0800 Subject: [PATCH 16/23] Simplify lib.rs example --- tarpc/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 3169564..dafa622 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -30,8 +30,7 @@ //! } //! //! fn main() { -//! let addr = "127.0.0.1:9000"; -//! let serve_handle = Server.spawn(addr).unwrap(); +//! let serve_handle = Server.spawn("localhost:0").unwrap(); //! let client = Client::new(serve_handle.dialer()).unwrap(); //! assert_eq!(3, client.add(1, 2).unwrap()); //! assert_eq!("Hello, Mom!".to_string(), From 5d27d34bd301fcf7fa5aef42ecc7543f911a6bd4 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 20:36:54 -0700 Subject: [PATCH 17/23] Add a documentation note on addresses --- tarpc/src/transport/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index 41119fc..f666d10 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -2,6 +2,8 @@ use std::io::{self, Read, Write}; use std::time::Duration; /// A factory for creating a listener on a given address. +/// For TCP, an address might be an IPv4 address; for Unix sockets, it +/// is just a file name. pub trait Transport { /// The type of listener that binds to the given address. type Listener: Listener; From aec1574824d00134d9c9d18fa04939a9f828e831 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 20:43:22 -0700 Subject: [PATCH 18/23] Add a line in between struct and impl --- tarpc/src/transport/tcp.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs index f7e078d..9c1ed7e 100644 --- a/tarpc/src/transport/tcp.rs +++ b/tarpc/src/transport/tcp.rs @@ -4,6 +4,7 @@ use std::time::Duration; /// A transport for TCP. pub struct TcpTransport(pub A); + impl super::Transport for TcpTransport { type Listener = TcpListener; fn bind(&self) -> io::Result { From b0495ebc0073c4844b4e23fe56a7c36ae11da635 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 20:43:36 -0700 Subject: [PATCH 19/23] Cargo fmt --- tarpc/src/protocol/server.rs | 2 +- tarpc/src/transport/unix.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 1e6e1ee..f635145 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -150,7 +150,7 @@ struct Server<'a, S: 'a, L> impl<'a, S, L> Server<'a, S, L> where S: Serve + 'static, - L: Listener, + L: Listener { fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs index f7e9f54..328053a 100644 --- a/tarpc/src/transport/unix.rs +++ b/tarpc/src/transport/unix.rs @@ -34,10 +34,14 @@ impl super::Listener for UnixListener { self.accept().map(|(stream, _)| stream) } fn dialer(&self) -> io::Result> { - self.local_addr().and_then(|addr| match addr.as_pathname() { - Some(path) => Ok(UnixDialer(path.to_owned())), - None => Err(io::Error::new(io::ErrorKind::AddrNotAvailable, - "Couldn't get a path to bound unix socket")), + self.local_addr().and_then(|addr| { + match addr.as_pathname() { + Some(path) => Ok(UnixDialer(path.to_owned())), + None => { + Err(io::Error::new(io::ErrorKind::AddrNotAvailable, + "Couldn't get a path to bound unix socket")) + } + } }) } } From 6a6832cfbceb4533e0e46229445190c2d1edba68 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 20:45:55 -0700 Subject: [PATCH 20/23] Generify doc comment --- tarpc/src/transport/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index f666d10..6b367ee 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -29,9 +29,9 @@ pub trait Listener: Send + 'static { /// A cloneable Reader/Writer. pub trait Stream: Read + Write + Send + Sized + 'static { - /// Creates a new independently owned handle to the underlying socket. + /// Creates a new independently owned handle to the Stream. /// - /// The returned TcpStream should reference the same stream that this + /// The returned Stream should reference the same stream that this /// object references. Both handles should read and write the same /// stream of data, and options set on one stream should be propagated /// to the other stream. From f33cb3bd5314be04d42c4323d8d784dbb206181d Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 20:46:23 -0700 Subject: [PATCH 21/23] Add a line between impl and struct --- tarpc/src/transport/tcp.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs index 9c1ed7e..d015374 100644 --- a/tarpc/src/transport/tcp.rs +++ b/tarpc/src/transport/tcp.rs @@ -47,6 +47,7 @@ impl super::Stream for TcpStream { /// Connects to a socket address. pub struct TcpDialer(pub A) where A: ToSocketAddrs; + impl super::Dialer for TcpDialer where A: ToSocketAddrs { From bbfb4325d22c4bbf25753f296ba5fde487deed06 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 20:49:38 -0700 Subject: [PATCH 22/23] Simplify readme example --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1aac6db..4ecaa93 100644 --- a/README.md +++ b/README.md @@ -46,8 +46,9 @@ impl HelloService for HelloServer { } fn main() { - let server_handle = HelloServer.spawn("0.0.0.0:0").unwrap(); - let client = hello_service::Client::new(server_handle.dialer()).unwrap(); + let addr = "localhost:10000"; + let server_handle = HelloServer.spawn(addr).unwrap(); + let client = hello_service::Client::new(addr).unwrap(); assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap()); drop(client); server_handle.shutdown(); From 709f4ab1ac7588f71433ec43a86eaa65a59cf066 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 16 Mar 2016 21:46:14 -0700 Subject: [PATCH 23/23] Add spaces between items in impls. --- tarpc/src/transport/mod.rs | 1 + tarpc/src/transport/tcp.rs | 11 +++++++++++ tarpc/src/transport/unix.rs | 8 ++++++++ 3 files changed, 20 insertions(+) diff --git a/tarpc/src/transport/mod.rs b/tarpc/src/transport/mod.rs index 6b367ee..f70b2bb 100644 --- a/tarpc/src/transport/mod.rs +++ b/tarpc/src/transport/mod.rs @@ -66,6 +66,7 @@ impl Dialer for P D: Dialer + 'static { type Stream = D::Stream; + fn dial(&self) -> io::Result { (**self).dial() } diff --git a/tarpc/src/transport/tcp.rs b/tarpc/src/transport/tcp.rs index d015374..2a78b65 100644 --- a/tarpc/src/transport/tcp.rs +++ b/tarpc/src/transport/tcp.rs @@ -7,6 +7,7 @@ pub struct TcpTransport(pub A); impl super::Transport for TcpTransport { type Listener = TcpListener; + fn bind(&self) -> io::Result { TcpListener::bind(&self.0) } @@ -14,6 +15,7 @@ impl super::Transport for TcpTransport { impl super::Transport for A { type Listener = TcpListener; + fn bind(&self) -> io::Result { TcpListener::bind(self) } @@ -21,10 +23,13 @@ impl super::Transport for A { impl super::Listener for TcpListener { type Dialer = TcpDialer; + type Stream = TcpStream; + fn accept(&self) -> io::Result { self.accept().map(|(stream, _)| stream) } + fn dialer(&self) -> io::Result> { self.local_addr().map(|addr| TcpDialer(addr)) } @@ -34,12 +39,15 @@ impl super::Stream for TcpStream { fn try_clone(&self) -> io::Result { self.try_clone() } + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { self.set_read_timeout(dur) } + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { self.set_write_timeout(dur) } + fn shutdown(&self) -> io::Result<()> { self.shutdown(::std::net::Shutdown::Both) } @@ -52,12 +60,15 @@ impl super::Dialer for TcpDialer where A: ToSocketAddrs { type Stream = TcpStream; + fn dial(&self) -> io::Result { TcpStream::connect(&self.0) } } + impl super::Dialer for str { type Stream = TcpStream; + fn dial(&self) -> io::Result { TcpStream::connect(self) } diff --git a/tarpc/src/transport/unix.rs b/tarpc/src/transport/unix.rs index 328053a..9a4b590 100644 --- a/tarpc/src/transport/unix.rs +++ b/tarpc/src/transport/unix.rs @@ -10,6 +10,7 @@ impl

super::Transport for UnixTransport

where P: AsRef { type Listener = UnixListener; + fn bind(&self) -> io::Result { UnixListener::bind(&self.0) } @@ -22,6 +23,7 @@ impl

super::Dialer for UnixDialer

where P: AsRef { type Stream = UnixStream; + fn dial(&self) -> io::Result { UnixStream::connect(&self.0) } @@ -29,10 +31,13 @@ impl

super::Dialer for UnixDialer

impl super::Listener for UnixListener { type Stream = UnixStream; + type Dialer = UnixDialer; + fn accept(&self) -> io::Result { self.accept().map(|(stream, _)| stream) } + fn dialer(&self) -> io::Result> { self.local_addr().and_then(|addr| { match addr.as_pathname() { @@ -50,12 +55,15 @@ impl super::Stream for UnixStream { fn try_clone(&self) -> io::Result { self.try_clone() } + fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { self.set_read_timeout(timeout) } + fn set_write_timeout(&self, timeout: Option) -> io::Result<()> { self.set_write_timeout(timeout) } + fn shutdown(&self) -> io::Result<()> { self.shutdown(::std::net::Shutdown::Both) }