Merge pull request #20 from tikue/config

Merge tikue/config into master.
This commit is contained in:
shaladdle
2016-02-19 23:16:15 -08:00
9 changed files with 168 additions and 118 deletions

View File

@@ -45,7 +45,7 @@ impl hello_service::Service for HelloService {
}
fn main() {
let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap();
let server_handle = HelloService.spawn("0.0.0.0:0").unwrap();
let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap();
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
drop(client);
@@ -53,14 +53,12 @@ fn main() {
}
```
The `service!` macro expands to a collection of items that collectively form an
rpc service. In the above example, the macro is called within the
`hello_service` module. This module will contain a `Client` type, a `Service`
trait, and a `serve` function. `serve` can be used to start a server listening
on a tcp port. A `Client` (or `AsyncClient`) can connect to such a service. Any
type implementing the `Service` trait can be passed to `serve`. These generated
types are specific to the echo service, and make it easy and ergonomic to write
servers without dealing with sockets or serialization directly. See the
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
above example, the macro is called within the `hello_service` module. This module will contain a
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides `default fn`s for
starting the service: `spawn` and `spawn_with_config`, which start the service listening on a tcp
port. A `Client` (or `AsyncClient`) can connect to such a service. These generated types make it
easy and ergonomic to write servers without dealing with sockets or serialization directly. See the
tarpc_examples package for more sophisticated examples.
## Documentation
@@ -69,6 +67,7 @@ items expanded by a `service!` invocation.
## Additional Features
- Concurrent requests from a single client.
- Any type that `impl`s `serde`'s Serialize` and `Deserialize` can be used in the rpc signatures.
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait
methods as well as on the `Client`'s stub methods.
- Just like regular fns, the return type can be left off when it's `-> ()`.

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.2.0"
version = "0.3.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT"
documentation = "https://google.github.io/tarpc"

View File

@@ -8,7 +8,7 @@
//! Example usage:
//!
//! ```
//! # #[macro_use] extern crate tarpc;
//! #[macro_use] extern crate tarpc;
//! mod my_server {
//! service! {
//! rpc hello(name: String) -> String;
@@ -31,11 +31,8 @@
//!
//! fn main() {
//! let addr = "127.0.0.1:9000";
//! let shutdown = my_server::serve(addr,
//! Server,
//! Some(Duration::from_secs(30)))
//! .unwrap();
//! let client = Client::new(addr, None).unwrap();
//! let shutdown = Server.spawn(addr).unwrap();
//! let client = Client::new(addr).unwrap();
//! assert_eq!(3, client.add(1, 2).unwrap());
//! assert_eq!("Hello, Mom!".to_string(),
//! client.hello("Mom".to_string()).unwrap());
@@ -63,4 +60,4 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs.
pub mod macros;
pub use protocol::{Error, Result, ServeHandle};
pub use protocol::{Config, Error, Result, ServeHandle};

View File

@@ -38,7 +38,10 @@ macro_rules! client_methods {
if let __Reply::$fn_name(reply) = reply {
::std::result::Result::Ok(reply)
} else {
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
panic!("Incorrect reply variant returned from rpc; expected `{}`, \
but got {:?}",
stringify!($fn_name),
reply);
}
}
)*);
@@ -76,7 +79,10 @@ macro_rules! async_client_methods {
if let __Reply::$fn_name(reply) = reply {
reply
} else {
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
panic!("Incorrect reply variant returned from rpc; expected `{}`, but got \
{:?}",
stringify!($fn_name),
reply);
}
}
let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*)));
@@ -212,17 +218,22 @@ macro_rules! impl_deserialize {
/// # }
/// ```
///
/// There are two rpc names reserved for the default fns `spawn` and `spawn_with_config`.
///
/// Attributes can be attached to each rpc. These attributes
/// will then be attached to the generated `Service` trait's
/// corresponding method, as well as to the `Client` stub's rpcs methods.
///
/// The following items are expanded in the enclosing module:
///
/// * `Service` -- the trait defining the RPC service
/// * `Service` -- the trait defining the RPC service. It comes with two default methods for
/// starting the server:
/// 1. `spawn` starts the service in another thread using default configuration.
/// 2. `spawn_with_config` starts the service in another thread using the specified
/// `Config`.
/// * `Client` -- a client that makes synchronous requests to the RPC server
/// * `AsyncClient` -- a client that makes asynchronous requests to the RPC server
/// * `Future` -- a handle for asynchronously retrieving the result of an RPC
/// * `serve` -- the function that starts the RPC server
///
/// **Warning**: In addition to the above items, there are a few expanded items that
/// are considered implementation details. As with the above items, shadowing
@@ -293,15 +304,34 @@ macro_rules! service_inner {
)*
) => {
#[doc="Defines the RPC service"]
pub trait Service: Send + Sync {
pub trait Service: Send + Sync + Sized {
$(
$(#[$attr])*
fn $fn_name(&self, $($arg:$in_),*) -> $out;
)*
#[doc="Spawn a running service."]
fn spawn<A>(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
Self: 'static,
{
self.spawn_with_config(addr, $crate::Config::default())
}
#[doc="Spawn a running service."]
fn spawn_with_config<A>(self, addr: A, config: $crate::Config)
-> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
Self: 'static,
{
let server = ::std::sync::Arc::new(__Server(self));
let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config));
::std::result::Result::Ok(handle)
}
}
impl<P, S> Service for P
where P: Send + Sync + ::std::ops::Deref<Target=S>,
where P: Send + Sync + Sized + 'static + ::std::ops::Deref<Target=S>,
S: Service
{
$(
@@ -334,14 +364,14 @@ macro_rules! service_inner {
impl_serialize!(__Reply, $($fn_name($out))*);
impl_deserialize!(__Reply, $($fn_name($out))*);
/// An asynchronous RPC call
#[doc="An asynchronous RPC call"]
pub struct Future<T> {
future: $crate::protocol::Future<__Reply>,
mapper: fn(__Reply) -> T,
}
impl<T> Future<T> {
/// Block until the result of the RPC call is available
#[doc="Block until the result of the RPC call is available"]
pub fn get(self) -> $crate::Result<T> {
self.future.get().map(self.mapper)
}
@@ -351,12 +381,20 @@ macro_rules! service_inner {
pub struct Client($crate::protocol::Client<__Request, __Reply>);
impl Client {
#[doc="Create a new client that connects to the given address."]
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<Self>
#[doc="Create a new client with default configuration that connects to the given \
address."]
pub fn new<A>(addr: A) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let inner = try!($crate::protocol::Client::new(addr, timeout));
Self::with_config(addr, $crate::Config::default())
}
#[doc="Create a new client with the specified configuration that connects to the \
given address."]
pub fn with_config<A>(addr: A, config: $crate::Config) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let inner = try!($crate::protocol::Client::with_config(addr, config));
::std::result::Result::Ok(Client(inner))
}
@@ -378,12 +416,20 @@ macro_rules! service_inner {
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>);
impl AsyncClient {
#[doc="Create a new asynchronous client with default configuration that connects to \
the given address."]
pub fn new<A>(addr: A) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
Self::with_config(addr, $crate::Config::default())
}
#[doc="Create a new asynchronous client that connects to the given address."]
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
pub fn with_config<A>(addr: A, config: $crate::Config)
-> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let inner = try!($crate::protocol::Client::new(addr, timeout));
let inner = try!($crate::protocol::Client::with_config(addr, config));
::std::result::Result::Ok(AsyncClient(inner))
}
@@ -417,18 +463,6 @@ macro_rules! service_inner {
}
}
}
#[doc="Start a running service."]
pub fn serve<A, S>(addr: A,
service: S,
read_timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
S: 'static + Service
{
let server = ::std::sync::Arc::new(__Server(service));
::std::result::Result::Ok(try!($crate::protocol::serve_async(addr, server, read_timeout)))
}
}
}
@@ -463,11 +497,6 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
extern crate env_logger;
use std::time::Duration;
fn test_timeout() -> Option<Duration> {
Some(Duration::from_secs(5))
}
service! {
rpc add(x: i32, y: i32) -> i32;
@@ -488,8 +517,8 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client = Client::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client = Client::new(handle.local_addr()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
drop(client);
@@ -499,8 +528,8 @@ mod functional_test {
#[test]
fn simple_async() {
let _ = env_logger::init();
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client = AsyncClient::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client = AsyncClient::new(handle.local_addr()).unwrap();
assert_eq!(3, client.add(1, 2).get().unwrap());
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
drop(client);
@@ -509,8 +538,8 @@ mod functional_test {
#[test]
fn try_clone() {
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client1 = Client::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client1 = Client::new(handle.local_addr()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap());
@@ -518,8 +547,8 @@ mod functional_test {
#[test]
fn async_try_clone() {
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client1 = AsyncClient::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client1 = AsyncClient::new(handle.local_addr()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
@@ -528,7 +557,7 @@ mod functional_test {
// Tests that a server can be wrapped in an Arc; no need to run, just compile
#[allow(dead_code)]
fn serve_arc_server() {
let _ = serve("localhost:0", ::std::sync::Arc::new(Server), None);
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
}
#[test]

View File

@@ -12,9 +12,8 @@ use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
use std::time::Duration;
use super::{Deserialize, Error, Packet, Result, Serialize};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply>
@@ -33,10 +32,16 @@ impl<Request, Reply> Client<Request, Reply>
{
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Self::with_config(addr, Config::default())
}
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn with_config<A: ToSocketAddrs>(addr: A, config: Config) -> io::Result<Self> {
let stream = try!(TcpStream::connect(addr));
try!(stream.set_read_timeout(timeout));
try!(stream.set_write_timeout(timeout));
try!(stream.set_read_timeout(config.timeout));
try!(stream.set_write_timeout(config.timeout));
let reader_stream = try!(stream.try_clone());
let writer_stream = try!(stream.try_clone());
let requests = Arc::new(Mutex::new(RpcFutures::new()));

View File

@@ -9,6 +9,7 @@ use serde;
use std::io::{self, Read, Write};
use std::convert;
use std::sync::Arc;
use std::time::Duration;
mod client;
mod server;
@@ -16,7 +17,7 @@ mod packet;
pub use self::packet::Packet;
pub use self::client::{Client, Future};
pub use self::server::{Serve, ServeHandle, serve_async};
pub use self::server::{Serve, ServeHandle};
/// Client errors that can occur during rpc calls
#[derive(Debug, Clone)]
@@ -54,6 +55,13 @@ impl convert::From<io::Error> for Error {
}
}
/// Configuration for client and server.
#[derive(Debug, Default)]
pub struct Config {
/// Request/Response timeout between packet delivery.
pub timeout: Option<Duration>,
}
/// Return type of rpc calls: either the successful return value, or a client error.
pub type Result<T> = ::std::result::Result<T, Error>;
@@ -78,7 +86,7 @@ impl<W: Write> Serialize for W {}
#[cfg(test)]
mod test {
extern crate env_logger;
use super::{Client, Serve, serve_async};
use super::{Client, Config, Serve};
use scoped_pool::Pool;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
@@ -118,8 +126,8 @@ mod test {
fn handle() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr(), None).unwrap();
let serve_handle = server.spawn("localhost:0").unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap();
drop(client);
serve_handle.shutdown();
}
@@ -128,10 +136,10 @@ mod test {
fn simple() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
assert_eq!(0, client.rpc(()).unwrap());
assert_eq!(1, server.count());
assert_eq!(1, client.rpc(()).unwrap());
@@ -171,9 +179,13 @@ mod test {
fn force_shutdown() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server, Some(Duration::new(0, 10))).unwrap();
let serve_handle = server.spawn_with_config("localhost:0",
Config {
timeout: Some(Duration::new(0, 10))
})
.unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
let thread = thread::spawn(move || serve_handle.shutdown());
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
thread.join().unwrap();
@@ -183,9 +195,13 @@ mod test {
fn client_failed_rpc() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server, test_timeout()).unwrap();
let serve_handle = server.spawn_with_config("localhost:0",
Config {
timeout: test_timeout(),
})
.unwrap();
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr, None).unwrap());
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr).unwrap());
client.rpc(()).unwrap();
serve_handle.shutdown();
match client.rpc(()) {
@@ -201,9 +217,9 @@ mod test {
let concurrency = 10;
let pool = Pool::new(concurrency);
let server = Arc::new(BarrierServer::new(concurrency));
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
pool.scoped(|scope| {
for _ in 0..concurrency {
let client = client.try_clone().unwrap();
@@ -221,9 +237,9 @@ mod test {
fn async() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), None).unwrap();
let serve_handle = server.spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
// Drop future immediately; does the reader channel panic when sending?
client.rpc_async(());

View File

@@ -32,7 +32,7 @@ struct MapVisitor<'a, T: 'a> {
state: u8,
}
impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
#[inline]
fn visit<S>(&mut self, serializer: &mut S) -> Result<Option<()>, S::Error>
where S: Serializer
@@ -46,9 +46,7 @@ impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message))))
}
_ => {
Ok(None)
}
_ => Ok(None),
}
}

View File

@@ -12,7 +12,7 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
use super::{Deserialize, Error, Packet, Result, Serialize};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
struct ConnectionHandler<'a, S>
where S: Serve
@@ -201,41 +201,8 @@ impl<'a, S> Drop for Server<'a, S> {
}
}
/// Start
pub fn serve_async<A, S>(addr: A,
server: S,
read_timeout: Option<Duration>)
-> io::Result<ServeHandle>
where A: ToSocketAddrs,
S: 'static + Serve
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
info!("serve_async: spinning up server on {:?}", addr);
let (die_tx, die_rx) = channel();
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &server,
listener: listener,
read_timeout: read_timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
pool.scoped(|scope| {
server.serve(scope);
});
});
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
addr: addr.clone(),
})
}
/// A service provided by a server
pub trait Serve: Send + Sync {
pub trait Serve: Send + Sync + Sized {
/// The type of request received by the server
type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send;
/// The type of reply sent by the server
@@ -243,10 +210,49 @@ pub trait Serve: Send + Sync {
/// Return a reply for a given request
fn serve(&self, request: Self::Request) -> Self::Reply;
/// spawn
fn spawn<A>(self, addr: A) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
Self: 'static,
{
self.spawn_with_config(addr, Config::default())
}
/// spawn
fn spawn_with_config<A>(self, addr: A, config: Config) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
Self: 'static,
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
info!("spawn_with_config: spinning up server on {:?}", addr);
let (die_tx, die_rx) = channel();
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &self,
listener: listener,
read_timeout: config.timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
pool.scoped(|scope| {
server.serve(scope);
});
});
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
addr: addr.clone(),
})
}
}
impl<P, S> Serve for P
where P: Send + Sync + ::std::ops::Deref<Target=S>,
where P: Send + Sync + ::std::ops::Deref<Target = S>,
S: Serve
{
type Request = S::Request;

View File

@@ -37,12 +37,12 @@ mod benchmark {
// Prevents resource exhaustion when benching
lazy_static! {
static ref HANDLE: Arc<Mutex<ServeHandle>> = {
let handle = serve("localhost:0", HelloServer, None).unwrap();
let handle = HelloServer.spawn("localhost:0").unwrap();
Arc::new(Mutex::new(handle))
};
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
let addr = HANDLE.lock().unwrap().local_addr().clone();
let client = AsyncClient::new(addr, None).unwrap();
let client = AsyncClient::new(addr).unwrap();
Arc::new(Mutex::new(client))
};
}