diff --git a/README.md b/README.md index 74fca06..f025a89 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ impl hello_service::Service for HelloService { } fn main() { - let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap(); + let server_handle = HelloService.spawn("0.0.0.0:0").unwrap(); let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap(); assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap()); drop(client); diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index c46be08..efb2684 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -8,7 +8,7 @@ //! Example usage: //! //! ``` -//! # #[macro_use] extern crate tarpc; +//! #[macro_use] extern crate tarpc; //! mod my_server { //! service! { //! rpc hello(name: String) -> String; @@ -31,11 +31,8 @@ //! //! fn main() { //! let addr = "127.0.0.1:9000"; -//! let shutdown = my_server::serve(addr, -//! Server, -//! Some(Duration::from_secs(30))) -//! .unwrap(); -//! let client = Client::new(addr, None).unwrap(); +//! let shutdown = Server.spawn(addr).unwrap(); +//! let client = Client::new(addr).unwrap(); //! assert_eq!(3, client.add(1, 2).unwrap()); //! assert_eq!("Hello, Mom!".to_string(), //! client.hello("Mom".to_string()).unwrap()); @@ -63,4 +60,4 @@ pub mod protocol; /// Provides the macro used for constructing rpc services and client stubs. pub mod macros; -pub use protocol::{Error, Result, ServeHandle}; +pub use protocol::{Config, Error, Result, ServeHandle}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index e2ec2ba..b492f5b 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -38,7 +38,10 @@ macro_rules! client_methods { if let __Reply::$fn_name(reply) = reply { ::std::result::Result::Ok(reply) } else { - panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply); + panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, \ + but got {:?}", + stringify!($fn_name), + reply); } } )*); @@ -76,7 +79,10 @@ macro_rules! async_client_methods { if let __Reply::$fn_name(reply) = reply { reply } else { - panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply); + panic!("Incorrect reply variant returned from protocol::Clientrpc; expected \ + `{}`, but got {:?}", + stringify!($fn_name), + reply); } } let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*))); @@ -222,7 +228,7 @@ macro_rules! impl_deserialize { /// * `Client` -- a client that makes synchronous requests to the RPC server /// * `AsyncClient` -- a client that makes asynchronous requests to the RPC server /// * `Future` -- a handle for asynchronously retrieving the result of an RPC -/// * `serve` -- the function that starts the RPC server +/// * `serve` -- the function that spawns the RPC server /// /// **Warning**: In addition to the above items, there are a few expanded items that /// are considered implementation details. As with the above items, shadowing @@ -293,15 +299,32 @@ macro_rules! service_inner { )* ) => { #[doc="Defines the RPC service"] - pub trait Service: Send + Sync { + pub trait Service: Send + Sync + Sized + 'static { $( $(#[$attr])* fn $fn_name(&self, $($arg:$in_),*) -> $out; )* + + #[doc="Spawn a running service."] + fn spawn(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle> + where A: ::std::net::ToSocketAddrs, + { + self.spawn_with_config(addr, $crate::Config::default()) + } + + #[doc="Spawn a running service."] + fn spawn_with_config(self, addr: A, config: $crate::Config) + -> $crate::Result<$crate::protocol::ServeHandle> + where A: ::std::net::ToSocketAddrs, + { + let server = ::std::sync::Arc::new(__Server(self)); + let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config)); + ::std::result::Result::Ok(handle) + } } impl Service for P - where P: Send + Sync + ::std::ops::Deref, + where P: Send + Sync + Sized + 'static + ::std::ops::Deref, S: Service { $( @@ -334,14 +357,14 @@ macro_rules! service_inner { impl_serialize!(__Reply, $($fn_name($out))*); impl_deserialize!(__Reply, $($fn_name($out))*); - /// An asynchronous RPC call + #[doc="An asynchronous RPC call"] pub struct Future { future: $crate::protocol::Future<__Reply>, mapper: fn(__Reply) -> T, } impl Future { - /// Block until the result of the RPC call is available + #[doc="Block until the result of the RPC call is available"] pub fn get(self) -> $crate::Result { self.future.get().map(self.mapper) } @@ -351,12 +374,20 @@ macro_rules! service_inner { pub struct Client($crate::protocol::Client<__Request, __Reply>); impl Client { - #[doc="Create a new client that connects to the given address."] - pub fn new(addr: A, timeout: ::std::option::Option<::std::time::Duration>) - -> $crate::Result + #[doc="Create a new client with default configuration that connects to the given \ + address."] + pub fn new(addr: A) -> $crate::Result where A: ::std::net::ToSocketAddrs, { - let inner = try!($crate::protocol::Client::new(addr, timeout)); + Self::with_config(addr, $crate::Config::default()) + } + + #[doc="Create a new client with the specified configuration that connects to the \ + given address."] + pub fn with_config(addr: A, config: $crate::Config) -> $crate::Result + where A: ::std::net::ToSocketAddrs, + { + let inner = try!($crate::protocol::Client::with_config(addr, config)); ::std::result::Result::Ok(Client(inner)) } @@ -378,12 +409,20 @@ macro_rules! service_inner { pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>); impl AsyncClient { + #[doc="Create a new asynchronous client with default configuration that connects to \ + the given address."] + pub fn new(addr: A) -> $crate::Result + where A: ::std::net::ToSocketAddrs, + { + Self::with_config(addr, $crate::Config::default()) + } + #[doc="Create a new asynchronous client that connects to the given address."] - pub fn new(addr: A, timeout: ::std::option::Option<::std::time::Duration>) + pub fn with_config(addr: A, config: $crate::Config) -> $crate::Result where A: ::std::net::ToSocketAddrs, { - let inner = try!($crate::protocol::Client::new(addr, timeout)); + let inner = try!($crate::protocol::Client::with_config(addr, config)); ::std::result::Result::Ok(AsyncClient(inner)) } @@ -417,18 +456,6 @@ macro_rules! service_inner { } } } - - #[doc="Start a running service."] - pub fn serve(addr: A, - service: S, - read_timeout: ::std::option::Option<::std::time::Duration>) - -> $crate::Result<$crate::protocol::ServeHandle> - where A: ::std::net::ToSocketAddrs, - S: 'static + Service - { - let server = ::std::sync::Arc::new(__Server(service)); - ::std::result::Result::Ok(try!($crate::protocol::serve_async(addr, server, read_timeout))) - } } } @@ -463,11 +490,6 @@ mod syntax_test { #[cfg(test)] mod functional_test { extern crate env_logger; - use std::time::Duration; - - fn test_timeout() -> Option { - Some(Duration::from_secs(5)) - } service! { rpc add(x: i32, y: i32) -> i32; @@ -488,8 +510,8 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = serve("localhost:0", Server, test_timeout()).unwrap(); - let client = Client::new(handle.local_addr(), None).unwrap(); + let handle = Server.spawn("localhost:0").unwrap(); + let client = Client::new(handle.local_addr()).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap()); drop(client); @@ -499,8 +521,8 @@ mod functional_test { #[test] fn simple_async() { let _ = env_logger::init(); - let handle = serve("localhost:0", Server, test_timeout()).unwrap(); - let client = AsyncClient::new(handle.local_addr(), None).unwrap(); + let handle = Server.spawn("localhost:0").unwrap(); + let client = AsyncClient::new(handle.local_addr()).unwrap(); assert_eq!(3, client.add(1, 2).get().unwrap()); assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap()); drop(client); @@ -509,8 +531,8 @@ mod functional_test { #[test] fn try_clone() { - let handle = serve("localhost:0", Server, test_timeout()).unwrap(); - let client1 = Client::new(handle.local_addr(), None).unwrap(); + let handle = Server.spawn("localhost:0").unwrap(); + let client1 = Client::new(handle.local_addr()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).unwrap()); assert_eq!(3, client2.add(1, 2).unwrap()); @@ -518,8 +540,8 @@ mod functional_test { #[test] fn async_try_clone() { - let handle = serve("localhost:0", Server, test_timeout()).unwrap(); - let client1 = AsyncClient::new(handle.local_addr(), None).unwrap(); + let handle = Server.spawn("localhost:0").unwrap(); + let client1 = AsyncClient::new(handle.local_addr()).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).get().unwrap()); assert_eq!(3, client2.add(1, 2).get().unwrap()); @@ -528,7 +550,7 @@ mod functional_test { // Tests that a server can be wrapped in an Arc; no need to run, just compile #[allow(dead_code)] fn serve_arc_server() { - let _ = serve("localhost:0", ::std::sync::Arc::new(Server), None); + let _ = ::std::sync::Arc::new(Server).spawn("localhost:0"); } #[test] diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index f6098ce..edb0d36 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -12,9 +12,8 @@ use std::net::{TcpStream, ToSocketAddrs}; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; -use std::time::Duration; -use super::{Deserialize, Error, Packet, Result, Serialize}; +use super::{Config, Deserialize, Error, Packet, Result, Serialize}; /// A client stub that connects to a server to run rpcs. pub struct Client @@ -33,10 +32,16 @@ impl Client { /// Create a new client that connects to `addr`. The client uses the given timeout /// for both reads and writes. - pub fn new(addr: A, timeout: Option) -> io::Result { + pub fn new(addr: A) -> io::Result { + Self::with_config(addr, Config::default()) + } + + /// Create a new client that connects to `addr`. The client uses the given timeout + /// for both reads and writes. + pub fn with_config(addr: A, config: Config) -> io::Result { let stream = try!(TcpStream::connect(addr)); - try!(stream.set_read_timeout(timeout)); - try!(stream.set_write_timeout(timeout)); + try!(stream.set_read_timeout(config.timeout)); + try!(stream.set_write_timeout(config.timeout)); let reader_stream = try!(stream.try_clone()); let writer_stream = try!(stream.try_clone()); let requests = Arc::new(Mutex::new(RpcFutures::new())); diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 6995700..aad3673 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -9,6 +9,7 @@ use serde; use std::io::{self, Read, Write}; use std::convert; use std::sync::Arc; +use std::time::Duration; mod client; mod server; @@ -16,7 +17,7 @@ mod packet; pub use self::packet::Packet; pub use self::client::{Client, Future}; -pub use self::server::{Serve, ServeHandle, serve_async}; +pub use self::server::{Serve, ServeHandle}; /// Client errors that can occur during rpc calls #[derive(Debug, Clone)] @@ -54,6 +55,13 @@ impl convert::From for Error { } } +/// Configuration for client and server. +#[derive(Clone, Copy, Debug, Default, Hash, Eq, Ord, PartialEq, PartialOrd)] +pub struct Config { + /// Request/Response timeout between packet delivery. + pub timeout: Option, +} + /// Return type of rpc calls: either the successful return value, or a client error. pub type Result = ::std::result::Result; @@ -78,7 +86,7 @@ impl Serialize for W {} #[cfg(test)] mod test { extern crate env_logger; - use super::{Client, Serve, serve_async}; + use super::{Client, Config, Serve}; use scoped_pool::Pool; use std::sync::{Arc, Barrier, Mutex}; use std::thread; @@ -118,8 +126,8 @@ mod test { fn handle() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap(); - let client: Client<(), u64> = Client::new(serve_handle.local_addr(), None).unwrap(); + let serve_handle = server.spawn("localhost:0").unwrap(); + let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap(); drop(client); serve_handle.shutdown(); } @@ -128,10 +136,10 @@ mod test { fn simple() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap(); + let serve_handle = server.clone().spawn("localhost:0").unwrap(); let addr = serve_handle.local_addr().clone(); // The explicit type is required so that it doesn't deserialize a u32 instead of u64 - let client: Client<(), u64> = Client::new(addr, None).unwrap(); + let client: Client<(), u64> = Client::new(addr).unwrap(); assert_eq!(0, client.rpc(()).unwrap()); assert_eq!(1, server.count()); assert_eq!(1, client.rpc(()).unwrap()); @@ -171,9 +179,13 @@ mod test { fn force_shutdown() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = serve_async("localhost:0", server, Some(Duration::new(0, 10))).unwrap(); + let serve_handle = server.spawn_with_config("localhost:0", + Config { + timeout: Some(Duration::new(0, 10)) + }) + .unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64> = Client::new(addr, None).unwrap(); + let client: Client<(), u64> = Client::new(addr).unwrap(); let thread = thread::spawn(move || serve_handle.shutdown()); info!("force_shutdown:: rpc1: {:?}", client.rpc(())); thread.join().unwrap(); @@ -183,9 +195,13 @@ mod test { fn client_failed_rpc() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = serve_async("localhost:0", server, test_timeout()).unwrap(); + let serve_handle = server.spawn_with_config("localhost:0", + Config { + timeout: test_timeout(), + }) + .unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); + let client: Arc> = Arc::new(Client::new(addr).unwrap()); client.rpc(()).unwrap(); serve_handle.shutdown(); match client.rpc(()) { @@ -201,9 +217,9 @@ mod test { let concurrency = 10; let pool = Pool::new(concurrency); let server = Arc::new(BarrierServer::new(concurrency)); - let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap(); + let serve_handle = server.clone().spawn("localhost:0").unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64> = Client::new(addr, None).unwrap(); + let client: Client<(), u64> = Client::new(addr).unwrap(); pool.scoped(|scope| { for _ in 0..concurrency { let client = client.try_clone().unwrap(); @@ -221,9 +237,9 @@ mod test { fn async() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = serve_async("localhost:0", server.clone(), None).unwrap(); + let serve_handle = server.spawn("localhost:0").unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Client<(), u64> = Client::new(addr, None).unwrap(); + let client: Client<(), u64> = Client::new(addr).unwrap(); // Drop future immediately; does the reader channel panic when sending? client.rpc_async(()); diff --git a/tarpc/src/protocol/packet.rs b/tarpc/src/protocol/packet.rs index 7ddbd15..33df794 100644 --- a/tarpc/src/protocol/packet.rs +++ b/tarpc/src/protocol/packet.rs @@ -32,7 +32,7 @@ struct MapVisitor<'a, T: 'a> { state: u8, } -impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> { +impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> { #[inline] fn visit(&mut self, serializer: &mut S) -> Result, S::Error> where S: Serializer @@ -46,9 +46,7 @@ impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> { self.state += 1; Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message)))) } - _ => { - Ok(None) - } + _ => Ok(None), } } diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index a6db662..0631712 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -12,7 +12,7 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; -use super::{Deserialize, Error, Packet, Result, Serialize}; +use super::{Config, Deserialize, Error, Packet, Result, Serialize}; struct ConnectionHandler<'a, S> where S: Serve @@ -201,41 +201,8 @@ impl<'a, S> Drop for Server<'a, S> { } } -/// Start -pub fn serve_async(addr: A, - server: S, - read_timeout: Option) - -> io::Result - where A: ToSocketAddrs, - S: 'static + Serve -{ - let listener = try!(TcpListener::bind(&addr)); - let addr = try!(listener.local_addr()); - info!("serve_async: spinning up server on {:?}", addr); - let (die_tx, die_rx) = channel(); - let join_handle = thread::spawn(move || { - let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads - let shutdown = AtomicBool::new(false); - let server = Server { - server: &server, - listener: listener, - read_timeout: read_timeout, - die_rx: die_rx, - shutdown: &shutdown, - }; - pool.scoped(|scope| { - server.serve(scope); - }); - }); - Ok(ServeHandle { - tx: die_tx, - join_handle: join_handle, - addr: addr.clone(), - }) -} - /// A service provided by a server -pub trait Serve: Send + Sync { +pub trait Serve: Send + Sync + Sized { /// The type of request received by the server type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send; /// The type of reply sent by the server @@ -243,10 +210,49 @@ pub trait Serve: Send + Sync { /// Return a reply for a given request fn serve(&self, request: Self::Request) -> Self::Reply; + + /// spawn + fn spawn(self, addr: A) -> io::Result + where A: ToSocketAddrs, + Self: 'static, + { + self.spawn_with_config(addr, Config::default()) + } + + /// spawn + fn spawn_with_config(self, addr: A, config: Config) -> io::Result + where A: ToSocketAddrs, + Self: 'static, + { + let listener = try!(TcpListener::bind(&addr)); + let addr = try!(listener.local_addr()); + info!("spawn_with_config: spinning up server on {:?}", addr); + let (die_tx, die_rx) = channel(); + let join_handle = thread::spawn(move || { + let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads + let shutdown = AtomicBool::new(false); + let server = Server { + server: &self, + listener: listener, + read_timeout: config.timeout, + die_rx: die_rx, + shutdown: &shutdown, + }; + pool.scoped(|scope| { + server.serve(scope); + }); + }); + Ok(ServeHandle { + tx: die_tx, + join_handle: join_handle, + addr: addr.clone(), + }) + } + } impl Serve for P - where P: Send + Sync + ::std::ops::Deref, + where P: Send + Sync + ::std::ops::Deref, S: Serve { type Request = S::Request;