44 Commits

Author SHA1 Message Date
Tim Kuehn
2e02f33fc4 Merge branch 'master' of github.com:google/tarpc 2016-07-28 20:49:12 -07:00
Tim Kuehn
d8472dcd1c Update README to reflect latest version. 2016-07-28 20:48:07 -07:00
Tim
2c5846621f Update dependency versions (#43)
* Update dependency versions

* Bump minor version.
2016-07-28 20:46:30 -07:00
Tim Kuehn
6a6157948a Bump minor version. 2016-07-28 20:37:51 -07:00
Tim Kuehn
1c18a3c4fe Update dependency versions 2016-07-28 20:25:46 -07:00
shaladdle
e8ec295e85 Merge pull request #35 from tikue/missing-debug
Add missing Debug impls.
2016-04-24 21:19:42 -07:00
Tim Kuehn
44eec09418 Add missing Debug impls. 2016-04-24 21:06:42 -07:00
shaladdle
fe116a1b6b Merge pull request #34 from tikue/macro-cleanup
Minor macro implementation cleanup.
2016-04-24 20:36:08 -07:00
Tim Kuehn
ec4fa8636b Minor macro implementation cleanup.
* Fold service into service_inner.
* Rename service_inner => service.
2016-04-24 20:02:46 -07:00
shaladdle
2d58340d16 Merge pull request #33 from tikue/bump-version
Bump version to v0.5.0
2016-04-24 19:37:04 -07:00
Tim Kuehn
801f09e9e6 Bump version to v0.5.0 2016-04-24 19:18:42 -07:00
shaladdle
6ce3a3d943 Merge pull request #27 from tikue/listener
[Breaking] Add support for arbitrary transports.
2016-04-24 18:39:27 -07:00
Tim Kuehn
4d636d2882 Merge master into listener. 2016-04-24 18:28:30 -07:00
shaladdle
3693c95a67 Merge pull request #32 from tikue/update-releases
Updates to docs and pre-push
2016-04-24 17:57:28 -07:00
Tim
43a2df4a13 Make version of serde explicit in release notes 2016-04-24 17:56:41 -07:00
Tim Kuehn
166f1523d6 Update version in README 2016-04-24 17:52:08 -07:00
Tim Kuehn
1cc8cbcdc3 Update pre-push to use rustup in lieu of multirust, because rustup is #thefuture. 2016-04-24 17:48:29 -07:00
Tim Kuehn
9dafc704e9 Update RELEASES.md 2016-04-24 17:19:53 -07:00
Tim Kuehn
709f4ab1ac Add spaces between items in impls. 2016-03-16 21:46:14 -07:00
Tim Kuehn
bbfb4325d2 Simplify readme example 2016-03-16 20:49:38 -07:00
Tim Kuehn
f33cb3bd53 Add a line between impl and struct 2016-03-16 20:46:23 -07:00
Tim Kuehn
6a6832cfbc Generify doc comment 2016-03-16 20:45:55 -07:00
Tim Kuehn
b0495ebc00 Cargo fmt 2016-03-16 20:43:36 -07:00
Tim Kuehn
aec1574824 Add a line in between struct and impl 2016-03-16 20:43:22 -07:00
Tim Kuehn
5d27d34bd3 Add a documentation note on addresses 2016-03-16 20:36:54 -07:00
Tim Kuehn
e995acd4c9 Merge branch 'master' into listener 2016-02-27 14:11:16 -08:00
Tim Kuehn
9dcd38c012 Merge branch 'master' into listener 2016-02-25 23:30:22 -08:00
Tim Kuehn
5ac4b710a5 Simplify lib.rs example 2016-02-25 23:30:00 -08:00
Tim Kuehn
72a9f8f70d Update deps versions. 2016-02-25 22:49:22 -08:00
Tim Kuehn
8e5a44b423 Update README to list arbitrary transports as a feature. 2016-02-25 22:28:39 -08:00
Tim Kuehn
714541a7a4 Don't unwrap in Listener::dialer 2016-02-25 01:05:38 -08:00
Tim Kuehn
a1f529f794 Reformat some generic bounds 2016-02-25 00:58:48 -08:00
Tim Kuehn
6273ebefa7 rustfmt 2016-02-25 00:04:35 -08:00
Tim Kuehn
9827f75459 Fix examples 2016-02-24 23:33:03 -08:00
Tim Kuehn
c398e2389b Why were we wrapping the service in an arc? 2016-02-24 23:25:50 -08:00
Tim Kuehn
03dc512e25 Remove Addr associated type of Dialer.
Also, make spawn() take a Dialer, but impl Dialer for str, defaulting to TCP transport.
2016-02-24 21:59:21 -08:00
Tim Kuehn
8307c708a3 Better documentation for Stream.
Basically copied from TcpStream verbatim.
2016-02-24 20:32:15 -08:00
Tim Kuehn
774411c636 Create temp file using tempdir in test 2016-02-24 20:26:49 -08:00
Tim Kuehn
d5b2f23f74 Move generic bounds to where clause 2016-02-23 08:26:56 -08:00
Tim Kuehn
396aec3c2f Add a test 2016-02-23 01:53:20 -08:00
Tim Kuehn
28c6c333e5 Reorgnize modules 2016-02-23 01:13:11 -08:00
Tim Kuehn
2d1a77ec10 Merge branch 'master' of github.com:google/tarpc into listener 2016-02-23 00:09:53 -08:00
Tim Kuehn
a0e6147482 Make Tcp* default types 2016-02-23 00:07:03 -08:00
Tim Kuehn
fcdb0d9375 Add support for arbitrary transports.
Quite a bit of machinery added:
 * Listener trait
 * Dialer trait
 * Stream trait
 * Transport trait
2016-02-22 23:50:34 -08:00
15 changed files with 480 additions and 182 deletions

View File

@@ -23,7 +23,7 @@ function then returns the value produced by that other server.
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.3.0"
tarpc = "0.5.1"
```
## Example
@@ -46,8 +46,9 @@ impl HelloService for HelloServer {
}
fn main() {
let server_handle = HelloServer.spawn("0.0.0.0:0").unwrap();
let client = hello_service::Client::new(server_handle.local_addr()).unwrap();
let addr = "localhost:10000";
let server_handle = HelloServer.spawn(addr).unwrap();
let client = hello_service::Client::new(addr).unwrap();
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
drop(client);
server_handle.shutdown();
@@ -56,17 +57,18 @@ fn main() {
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
above example, the macro is called within the `hello_service` module. This module will contain a
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides `default fn`s for
starting the service: `spawn` and `spawn_with_config`, which start the service listening on a tcp
port. A `Client` (or `AsyncClient`) can connect to such a service. These generated types make it
easy and ergonomic to write servers without dealing with sockets or serialization directly. See the
tarpc_examples package for more sophisticated examples.
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for
starting the service: `spawn` and `spawn_with_config`, which start the service listening over an
arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated
types make it easy and ergonomic to write servers without dealing with sockets or serialization
directly. See the tarpc_examples package for more sophisticated examples.
## Documentation
Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation.
## Additional Features
- Connect over any transport that `impl`s the `Transport` trait.
- Concurrent requests from a single client.
- Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures.
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait

View File

@@ -1,4 +1,24 @@
## 1.3 (2016-02-20)
## 0.5 (2016-04-24)
### Breaking Changes
0.5 adds support for arbitrary transports via the
[`Transport`](tarpc/src/transport/mod.rs#L7) trait.
Out of the box tarpc provides implementations for:
* Tcp, for types `impl`ing `ToSocketAddrs`.
* Unix sockets via the `UnixTransport` type.
This was a breaking change: `handler.local_addr()` was renamed
`handler.dialer()`.
## 0.4 (2016-04-02)
### Breaking Changes
* Updated to the latest version of serde, 0.7.0. Because tarpc exposes serde in
its API, this forces downstream code to update to the latest version of
serde, as well.
## 0.3 (2016-02-20)
### Breaking Changes
* The timeout arg to `serve` was replaced with a `Config` struct, which

View File

@@ -8,7 +8,7 @@
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
# root.
#
# This hook runs tests to make sure only working code is being pushed. If present, multirust is used
# This hook runs tests to make sure only working code is being pushed. If present, rustup is used
# to build and test the code on the appropriate toolchains. The working copy must not contain
# uncommitted changes, since the script currently just runs cargo build/test in the working copy.
#
@@ -67,7 +67,7 @@ run_cargo() {
fi
if [ "$3" != "" ]; then
printf "${PREFIX} $VERB $2 on $3 ... "
multirust run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
else
printf "${PREFIX} $VERB $2 ... "
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
@@ -83,7 +83,7 @@ run_cargo() {
TOOLCHAIN_RESULT=0
check_toolchain() {
printf "${PREFIX} Checking for $1 toolchain ... "
if [[ $(multirust list-toolchain) =~ $1 ]]; then
if [[ $(rustup toolchain list) =~ $1 ]]; then
printf "${SUCCESS}\n"
else
TOOLCHAIN_RESULT=1
@@ -92,8 +92,8 @@ check_toolchain() {
fi
}
printf "${PREFIX} Checking for multirust ... "
command -v multirust &>/dev/null
printf "${PREFIX} Checking for rustup ... "
command -v rustup &>/dev/null
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
printf "${SUCCESS}\n"

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.4.0"
version = "0.5.1"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT"
documentation = "https://google.github.io/tarpc"
@@ -11,11 +11,13 @@ readme = "../README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[dependencies]
bincode = "^0.5"
log = "^0.3"
scoped-pool = "^0.1"
serde = "^0.7"
bincode = "0.6"
log = "0.3"
scoped-pool = "1.0"
serde = "0.8"
unix_socket = "0.5"
[dev-dependencies]
lazy_static = "^0.1"
env_logger = "^0.3"
lazy_static = "0.2"
env_logger = "0.3"
tempdir = "0.3"

View File

@@ -30,14 +30,13 @@
//! }
//!
//! fn main() {
//! let addr = "127.0.0.1:9000";
//! let shutdown = Server.spawn(addr).unwrap();
//! let client = Client::new(addr).unwrap();
//! let serve_handle = Server.spawn("localhost:0").unwrap();
//! let client = Client::new(serve_handle.dialer()).unwrap();
//! assert_eq!(3, client.add(1, 2).unwrap());
//! assert_eq!("Hello, Mom!".to_string(),
//! client.hello("Mom".to_string()).unwrap());
//! drop(client);
//! shutdown.shutdown();
//! serve_handle.shutdown();
//! }
//! ```
@@ -48,6 +47,7 @@ extern crate bincode;
#[macro_use]
extern crate log;
extern crate scoped_pool;
extern crate unix_socket;
macro_rules! pos {
() => (concat!(file!(), ":", line!()))
@@ -60,4 +60,7 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs.
pub mod macros;
/// Provides transport traits and implementations.
pub mod transport;
pub use protocol::{Config, Error, Result, ServeHandle};

View File

@@ -250,19 +250,21 @@ macro_rules! impl_deserialize {
/// * `__Reply` -- an implementation detail
#[macro_export]
macro_rules! service {
// Entry point
(
$( $tokens:tt )*
$(
$(#[$attr:meta])*
rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) $(-> $out:ty)*;
)*
) => {
service_inner! {{
$( $tokens )*
service! {{
$(
$(#[$attr])*
rpc $fn_name( $( $arg : $in_ ),* ) $(-> $out)*;
)*
}}
}
}
#[doc(hidden)]
#[macro_export]
macro_rules! service_inner {
// Pattern for when the next rpc has an implicit unit return type
};
// Pattern for when the next rpc has an implicit unit return type
(
{
$(#[$attr:meta])*
@@ -272,7 +274,7 @@ macro_rules! service_inner {
}
$( $expanded:tt )*
) => {
service_inner! {
service! {
{ $( $unexpanded )* }
$( $expanded )*
@@ -281,7 +283,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> ();
}
};
// Pattern for when the next rpc has an explicit return type
// Pattern for when the next rpc has an explicit return type
(
{
$(#[$attr:meta])*
@@ -291,7 +293,7 @@ macro_rules! service_inner {
}
$( $expanded:tt )*
) => {
service_inner! {
service! {
{ $( $unexpanded )* }
$( $expanded )*
@@ -300,7 +302,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
}
};
// Pattern when all return types have been expanded
// Pattern for when all return types have been expanded
(
{ } // none left to expand
$(
@@ -316,21 +318,30 @@ macro_rules! service_inner {
)*
#[doc="Spawn a running service."]
fn spawn<A>(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
fn spawn<T>(self,
transport: T)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static,
{
self.spawn_with_config(addr, $crate::Config::default())
self.spawn_with_config(transport, $crate::Config::default())
}
#[doc="Spawn a running service."]
fn spawn_with_config<A>(self, addr: A, config: $crate::Config)
-> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
fn spawn_with_config<T>(self,
transport: T,
config: $crate::Config)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static,
{
let server = ::std::sync::Arc::new(__Server(self));
let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config));
let server = __Server(self);
let result = $crate::protocol::Serve::spawn_with_config(server, transport, config);
let handle = try!(result);
::std::result::Result::Ok(handle)
}
}
@@ -386,25 +397,29 @@ macro_rules! service_inner {
#[allow(unused)]
#[doc="The client stub that makes RPC calls to the server."]
pub struct Client($crate::protocol::Client<__Request, __Reply>);
pub struct Client<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl Client {
impl<S> Client<S>
where S: $crate::transport::Stream
{
#[allow(unused)]
#[doc="Create a new client with default configuration that connects to the given \
address."]
pub fn new<A>(addr: A) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
pub fn new<D>(dialer: D) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
Self::with_config(addr, $crate::Config::default())
Self::with_config(dialer, $crate::Config::default())
}
#[allow(unused)]
#[doc="Create a new client with the specified configuration that connects to the \
given address."]
pub fn with_config<A>(addr: A, config: $crate::Config) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
let inner = try!($crate::protocol::Client::with_config(addr, config));
let inner = try!($crate::protocol::Client::with_config(dialer, config));
::std::result::Result::Ok(Client(inner))
}
@@ -425,25 +440,27 @@ macro_rules! service_inner {
#[allow(unused)]
#[doc="The client stub that makes asynchronous RPC calls to the server."]
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>);
pub struct AsyncClient<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl AsyncClient {
impl<S> AsyncClient<S>
where S: $crate::transport::Stream {
#[allow(unused)]
#[doc="Create a new asynchronous client with default configuration that connects to \
the given address."]
pub fn new<A>(addr: A) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
pub fn new<D>(dialer: D) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
Self::with_config(addr, $crate::Config::default())
Self::with_config(dialer, $crate::Config::default())
}
#[allow(unused)]
#[doc="Create a new asynchronous client that connects to the given address."]
pub fn with_config<A>(addr: A, config: $crate::Config)
-> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
let inner = try!($crate::protocol::Client::with_config(addr, config));
let inner = try!($crate::protocol::Client::with_config(dialer, config));
::std::result::Result::Ok(AsyncClient(inner))
}
@@ -463,7 +480,8 @@ macro_rules! service_inner {
}
#[allow(unused)]
struct __Server<S: 'static + Service>(S);
struct __Server<S>(S)
where S: 'static + Service;
impl<S> $crate::protocol::Serve for __Server<S>
where S: 'static + Service
@@ -482,7 +500,8 @@ macro_rules! service_inner {
}
}
#[allow(dead_code)] // because we're just testing that the macro expansion compiles
#[allow(dead_code)]
// because we're just testing that the macro expansion compiles
#[cfg(test)]
mod syntax_test {
// Tests a service definition with a fn that takes no args
@@ -513,6 +532,8 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
extern crate env_logger;
extern crate tempdir;
use transport::unix::UnixTransport;
service! {
rpc add(x: i32, y: i32) -> i32;
@@ -534,7 +555,7 @@ mod functional_test {
fn simple() {
let _ = env_logger::init();
let handle = Server.spawn("localhost:0").unwrap();
let client = Client::new(handle.local_addr()).unwrap();
let client = Client::new(handle.dialer()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
drop(client);
@@ -545,7 +566,7 @@ mod functional_test {
fn simple_async() {
let _ = env_logger::init();
let handle = Server.spawn("localhost:0").unwrap();
let client = AsyncClient::new(handle.local_addr()).unwrap();
let client = AsyncClient::new(handle.dialer()).unwrap();
assert_eq!(3, client.add(1, 2).get().unwrap());
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
drop(client);
@@ -555,7 +576,7 @@ mod functional_test {
#[test]
fn try_clone() {
let handle = Server.spawn("localhost:0").unwrap();
let client1 = Client::new(handle.local_addr()).unwrap();
let client1 = Client::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap());
@@ -564,7 +585,19 @@ mod functional_test {
#[test]
fn async_try_clone() {
let handle = Server.spawn("localhost:0").unwrap();
let client1 = AsyncClient::new(handle.local_addr()).unwrap();
let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
}
#[test]
fn async_try_clone_unix() {
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
let temp_file = temp_dir.path()
.join("async_try_clone_unix.tmp");
let handle = Server.spawn(UnixTransport(temp_file)).unwrap();
let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
@@ -576,6 +609,12 @@ mod functional_test {
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
}
// Tests that a tcp client can be created from &str
#[allow(dead_code)]
fn test_client_str() {
let _ = Client::new("localhost:0");
}
#[test]
fn serde() {
use bincode;

View File

@@ -5,41 +5,47 @@
use serde;
use std::fmt;
use std::io::{self, BufReader, BufWriter, Read};
use std::io::{self, BufReader, BufWriter};
use std::collections::HashMap;
use std::mem;
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
use transport::{Dialer, Stream};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply>
where Request: serde::ser::Serialize
pub struct Client<Request, Reply, S>
where Request: serde::ser::Serialize,
S: Stream
{
// The guard is in an option so it can be joined in the drop fn
reader_guard: Arc<Option<thread::JoinHandle<()>>>,
outbound: Sender<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
shutdown: TcpStream,
shutdown: S,
}
impl<Request, Reply> Client<Request, Reply>
impl<Request, Reply, S> Client<Request, Reply, S>
where Request: serde::ser::Serialize + Send + 'static,
Reply: serde::de::Deserialize + Send + 'static
Reply: serde::de::Deserialize + Send + 'static,
S: Stream
{
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Self::with_config(addr, Config::default())
pub fn new<D>(dialer: D) -> io::Result<Self>
where D: Dialer<Stream = S>
{
Self::with_config(dialer, Config::default())
}
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn with_config<A: ToSocketAddrs>(addr: A, config: Config) -> io::Result<Self> {
let stream = try!(TcpStream::connect(addr));
pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
where D: Dialer<Stream = S>
{
let stream = try!(dialer.dial());
try!(stream.set_read_timeout(config.timeout));
try!(stream.set_write_timeout(config.timeout));
let reader_stream = try!(stream.try_clone());
@@ -59,7 +65,7 @@ impl<Request, Reply> Client<Request, Reply>
}
/// Clones the Client so that it can be shared across threads.
pub fn try_clone(&self) -> io::Result<Client<Request, Reply>> {
pub fn try_clone(&self) -> io::Result<Self> {
Ok(Client {
reader_guard: self.reader_guard.clone(),
outbound: self.outbound.clone(),
@@ -97,14 +103,15 @@ impl<Request, Reply> Client<Request, Reply>
}
}
impl<Request, Reply> Drop for Client<Request, Reply>
where Request: serde::ser::Serialize
impl<Request, Reply, S> Drop for Client<Request, Reply, S>
where Request: serde::ser::Serialize,
S: Stream
{
fn drop(&mut self) {
debug!("Dropping Client.");
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
debug!("Attempting to shut down writer and reader threads.");
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) {
if let Err(e) = self.shutdown.shutdown() {
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
e);
} else {
@@ -112,9 +119,9 @@ impl<Request, Reply> Drop for Client<Request, Reply>
// finish.
debug!("Joining writer and reader.");
reader_guard.take()
.expect(pos!())
.join()
.expect(pos!());
.expect(pos!())
.join()
.expect(pos!());
debug!("Successfully joined writer and reader.");
}
}
@@ -185,11 +192,12 @@ impl<Reply> RpcFutures<Reply> {
}
}
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream)
fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: S)
where Request: serde::Serialize,
Reply: serde::Deserialize
Reply: serde::Deserialize,
S: Stream
{
let mut next_id = 0;
let mut stream = BufWriter::new(stream);
@@ -238,8 +246,9 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
}
fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream)
where Reply: serde::Deserialize
fn read<Reply, S>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: S)
where Reply: serde::Deserialize,
S: Stream
{
let mut stream = BufReader::new(stream);
loop {

View File

@@ -93,6 +93,7 @@ mod test {
extern crate env_logger;
use super::{Client, Config, Serve};
use scoped_pool::Pool;
use std::net::TcpStream;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;
@@ -132,7 +133,7 @@ mod test {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.spawn("localhost:0").unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap();
let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap();
drop(client);
serve_handle.shutdown();
}
@@ -142,9 +143,8 @@ mod test {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
let client: Client<(), u64> = Client::new(addr).unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
assert_eq!(0, client.rpc(()).unwrap());
assert_eq!(1, server.count());
assert_eq!(1, client.rpc(()).unwrap());
@@ -185,10 +185,9 @@ mod test {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.spawn_with_config("localhost:0",
Config { timeout: Some(Duration::new(0, 10)) })
.unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr).unwrap();
Config { timeout: Some(Duration::new(0, 10)) })
.unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
let thread = thread::spawn(move || serve_handle.shutdown());
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
thread.join().unwrap();
@@ -198,11 +197,10 @@ mod test {
fn client_failed_rpc() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.spawn_with_config("localhost:0",
Config { timeout: test_timeout() })
.unwrap();
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr).unwrap());
let serve_handle =
server.spawn_with_config("localhost:0", Config { timeout: test_timeout() })
.unwrap();
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
client.rpc(()).unwrap();
serve_handle.shutdown();
match client.rpc(()) {
@@ -219,8 +217,7 @@ mod test {
let pool = Pool::new(concurrency);
let server = Arc::new(BarrierServer::new(concurrency));
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr).unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
pool.scoped(|scope| {
for _ in 0..concurrency {
let client = client.try_clone().unwrap();
@@ -239,8 +236,7 @@ mod test {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = server.spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr).unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
// Drop future immediately; does the reader channel panic when sending?
client.rpc_async(());

View File

@@ -1,4 +1,4 @@
use serde::{Deserialize, Deserializer, Serialize, Serializer, de, ser};
use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
use std::marker::PhantomData;
/// Packet shared between client and server.
@@ -19,40 +19,10 @@ impl<T: Serialize> Serialize for Packet<T> {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer
{
serializer.serialize_struct(PACKET,
MapVisitor {
value: self,
state: 0,
})
}
}
struct MapVisitor<'a, T: 'a> {
value: &'a Packet<T>,
state: u8,
}
impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
#[inline]
fn visit<S>(&mut self, serializer: &mut S) -> Result<Option<()>, S::Error>
where S: Serializer
{
match self.state {
0 => {
self.state += 1;
Ok(Some(try!(serializer.serialize_struct_elt(RPC_ID, &self.value.rpc_id))))
}
1 => {
self.state += 1;
Ok(Some(try!(serializer.serialize_struct_elt(MESSAGE, &self.value.message))))
}
_ => Ok(None),
}
}
#[inline]
fn len(&self) -> Option<usize> {
Some(2)
let mut state = try!(serializer.serialize_struct(PACKET, 2));
try!(serializer.serialize_struct_elt(&mut state, RPC_ID, &self.rpc_id));
try!(serializer.serialize_struct_elt(&mut state, MESSAGE, &self.message));
serializer.serialize_struct_end(state)
}
}

View File

@@ -7,24 +7,27 @@ use serde;
use scoped_pool::{Pool, Scope};
use std::fmt;
use std::io::{self, BufReader, BufWriter};
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
use transport::{Dialer, Listener, Stream, Transport};
use transport::tcp::TcpDialer;
struct ConnectionHandler<'a, S>
where S: Serve
struct ConnectionHandler<'a, S, St>
where S: Serve,
St: Stream
{
read_stream: BufReader<TcpStream>,
write_stream: BufWriter<TcpStream>,
read_stream: BufReader<St>,
write_stream: BufWriter<St>,
server: S,
shutdown: &'a AtomicBool,
}
impl<'a, S> ConnectionHandler<'a, S>
where S: Serve
impl<'a, S, St> ConnectionHandler<'a, S, St>
where S: Serve,
St: Stream
{
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
let ConnectionHandler {
@@ -38,7 +41,7 @@ impl<'a, S> ConnectionHandler<'a, S>
scope.execute(move || Self::write(rx, write_stream));
loop {
match read_stream.deserialize() {
Ok(Packet { rpc_id, message, }) => {
Ok(Packet { rpc_id, message }) => {
let tx = tx.clone();
scope.execute(move || {
let reply = server.serve(message);
@@ -78,12 +81,13 @@ impl<'a, S> ConnectionHandler<'a, S>
fn timed_out(error_kind: io::ErrorKind) -> bool {
match error_kind {
io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => true,
io::ErrorKind::TimedOut |
io::ErrorKind::WouldBlock => true,
_ => false,
}
}
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<TcpStream>) {
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<St>) {
loop {
match rx.recv() {
Err(e) => {
@@ -101,21 +105,25 @@ impl<'a, S> ConnectionHandler<'a, S>
}
/// Provides methods for blocking until the server completes,
pub struct ServeHandle {
pub struct ServeHandle<D = TcpDialer>
where D: Dialer
{
tx: Sender<()>,
join_handle: JoinHandle<()>,
addr: SocketAddr,
dialer: D,
}
impl ServeHandle {
impl<D> ServeHandle<D>
where D: Dialer
{
/// Block until the server completes
pub fn wait(self) {
self.join_handle.join().expect(pos!());
}
/// Returns the address the server is bound to
pub fn local_addr(&self) -> &SocketAddr {
&self.addr
/// Returns the dialer to the server.
pub fn dialer(&self) -> &D {
&self.dialer
}
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not
@@ -123,7 +131,7 @@ impl ServeHandle {
pub fn shutdown(self) {
info!("ServeHandle: attempting to shut down the server.");
self.tx.send(()).expect(pos!());
if let Ok(_) = TcpStream::connect(self.addr) {
if let Ok(_) = self.dialer.dial() {
self.join_handle.join().expect(pos!());
} else {
warn!("ServeHandle: best effort shutdown of serve thread failed");
@@ -131,16 +139,19 @@ impl ServeHandle {
}
}
struct Server<'a, S: 'a> {
struct Server<'a, S: 'a, L>
where L: Listener
{
server: &'a S,
listener: TcpListener,
listener: L,
read_timeout: Option<Duration>,
die_rx: Receiver<()>,
shutdown: &'a AtomicBool,
}
impl<'a, S: 'a> Server<'a, S>
where S: Serve + 'static
impl<'a, S, L> Server<'a, S, L>
where S: Serve + 'static,
L: Listener
{
fn serve<'b>(self, scope: &Scope<'b>)
where 'a: 'b
@@ -194,7 +205,9 @@ impl<'a, S: 'a> Server<'a, S>
}
}
impl<'a, S> Drop for Server<'a, S> {
impl<'a, S, L> Drop for Server<'a, S, L>
where L: Listener
{
fn drop(&mut self) {
debug!("Shutting down connection handlers.");
self.shutdown.store(true, Ordering::SeqCst);
@@ -212,29 +225,33 @@ pub trait Serve: Send + Sync + Sized {
fn serve(&self, request: Self::Request) -> Self::Reply;
/// spawn
fn spawn<A>(self, addr: A) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
fn spawn<T>(self, transport: T) -> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static
{
self.spawn_with_config(addr, Config::default())
self.spawn_with_config(transport, Config::default())
}
/// spawn
fn spawn_with_config<A>(self, addr: A, config: Config) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
fn spawn_with_config<T>(self,
transport: T,
config: Config)
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
info!("spawn_with_config: spinning up server on {:?}", addr);
let listener = try!(transport.bind());
let dialer = try!(listener.dialer());
info!("spawn_with_config: spinning up server.");
let (die_tx, die_rx) = channel();
let timeout = config.timeout;
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &self,
listener: listener,
read_timeout: config.timeout,
read_timeout: timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
@@ -245,10 +262,9 @@ pub trait Serve: Send + Sync + Sized {
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
addr: addr.clone(),
dialer: dialer,
})
}
}
impl<P, S> Serve for P

View File

@@ -0,0 +1,91 @@
use std::io::{self, Read, Write};
use std::time::Duration;
/// A factory for creating a listener on a given address.
/// For TCP, an address might be an IPv4 address; for Unix sockets, it
/// is just a file name.
pub trait Transport {
/// The type of listener that binds to the given address.
type Listener: Listener;
/// Return a listener on the given address, and a dialer to that address.
fn bind(&self) -> io::Result<Self::Listener>;
}
/// Accepts incoming connections from dialers.
pub trait Listener: Send + 'static {
/// The type of address being listened on.
type Dialer: Dialer;
/// The type of stream this listener accepts.
type Stream: Stream;
/// Accept an incoming stream.
fn accept(&self) -> io::Result<Self::Stream>;
/// Returns the local address being listened on.
fn dialer(&self) -> io::Result<Self::Dialer>;
/// Iterate over incoming connections.
fn incoming(&self) -> Incoming<Self> {
Incoming { listener: self }
}
}
/// A cloneable Reader/Writer.
pub trait Stream: Read + Write + Send + Sized + 'static {
/// Creates a new independently owned handle to the Stream.
///
/// The returned Stream should reference the same stream that this
/// object references. Both handles should read and write the same
/// stream of data, and options set on one stream should be propagated
/// to the other stream.
fn try_clone(&self) -> io::Result<Self>;
/// Sets a read timeout.
///
/// If the value specified is `None`, then read calls will block indefinitely.
/// It is an error to pass the zero `Duration` to this method.
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
/// Sets a write timeout.
///
/// If the value specified is `None`, then write calls will block indefinitely.
/// It is an error to pass the zero `Duration` to this method.
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
/// Shuts down both ends of the stream.
///
/// Implementations should cause all pending and future I/O on the specified
/// portions to return immediately with an appropriate value.
fn shutdown(&self) -> io::Result<()>;
}
/// A `Stream` factory.
pub trait Dialer {
/// The type of `Stream` this can create.
type Stream: Stream;
/// Open a stream.
fn dial(&self) -> io::Result<Self::Stream>;
}
impl<P, D: ?Sized> Dialer for P
where P: ::std::ops::Deref<Target = D>,
D: Dialer + 'static
{
type Stream = D::Stream;
fn dial(&self) -> io::Result<Self::Stream> {
(**self).dial()
}
}
/// Iterates over incoming connections.
pub struct Incoming<'a, L: Listener + ?Sized + 'a> {
listener: &'a L,
}
impl<'a, L: Listener> Iterator for Incoming<'a, L> {
type Item = io::Result<L::Stream>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.listener.accept())
}
}
/// Provides a TCP transport.
pub mod tcp;
/// Provides a unix socket transport.
pub mod unix;

View File

@@ -0,0 +1,77 @@
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::time::Duration;
/// A transport for TCP.
#[derive(Debug)]
pub struct TcpTransport<A: ToSocketAddrs>(pub A);
impl<A: ToSocketAddrs> super::Transport for TcpTransport<A> {
type Listener = TcpListener;
fn bind(&self) -> io::Result<TcpListener> {
TcpListener::bind(&self.0)
}
}
impl<A: ToSocketAddrs> super::Transport for A {
type Listener = TcpListener;
fn bind(&self) -> io::Result<TcpListener> {
TcpListener::bind(self)
}
}
impl super::Listener for TcpListener {
type Dialer = TcpDialer<SocketAddr>;
type Stream = TcpStream;
fn accept(&self) -> io::Result<TcpStream> {
self.accept().map(|(stream, _)| stream)
}
fn dialer(&self) -> io::Result<TcpDialer<SocketAddr>> {
self.local_addr().map(|addr| TcpDialer(addr))
}
}
impl super::Stream for TcpStream {
fn try_clone(&self) -> io::Result<Self> {
self.try_clone()
}
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(dur)
}
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.set_write_timeout(dur)
}
fn shutdown(&self) -> io::Result<()> {
self.shutdown(::std::net::Shutdown::Both)
}
}
/// Connects to a socket address.
#[derive(Debug)]
pub struct TcpDialer<A = SocketAddr>(pub A) where A: ToSocketAddrs;
impl<A> super::Dialer for TcpDialer<A>
where A: ToSocketAddrs
{
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(&self.0)
}
}
impl super::Dialer for str {
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(self)
}
}

View File

@@ -0,0 +1,72 @@
use std::io;
use std::path::{Path, PathBuf};
use std::time::Duration;
use unix_socket::{UnixListener, UnixStream};
/// A transport for unix sockets.
#[derive(Debug)]
pub struct UnixTransport<P>(pub P) where P: AsRef<Path>;
impl<P> super::Transport for UnixTransport<P>
where P: AsRef<Path>
{
type Listener = UnixListener;
fn bind(&self) -> io::Result<UnixListener> {
UnixListener::bind(&self.0)
}
}
/// Connects to a unix socket address.
#[derive(Debug)]
pub struct UnixDialer<P>(pub P) where P: AsRef<Path>;
impl<P> super::Dialer for UnixDialer<P>
where P: AsRef<Path>
{
type Stream = UnixStream;
fn dial(&self) -> io::Result<UnixStream> {
UnixStream::connect(&self.0)
}
}
impl super::Listener for UnixListener {
type Stream = UnixStream;
type Dialer = UnixDialer<PathBuf>;
fn accept(&self) -> io::Result<UnixStream> {
self.accept().map(|(stream, _)| stream)
}
fn dialer(&self) -> io::Result<UnixDialer<PathBuf>> {
self.local_addr().and_then(|addr| {
match addr.as_pathname() {
Some(path) => Ok(UnixDialer(path.to_owned())),
None => {
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
"Couldn't get a path to bound unix socket"))
}
}
})
}
}
impl super::Stream for UnixStream {
fn try_clone(&self) -> io::Result<Self> {
self.try_clone()
}
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(timeout)
}
fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.set_write_timeout(timeout)
}
fn shutdown(&self) -> io::Result<()> {
self.shutdown(::std::net::Shutdown::Both)
}
}

View File

@@ -5,5 +5,5 @@ authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.k
[dev-dependencies]
tarpc = { path = "../tarpc" }
lazy_static = "^0.1.15"
env_logger = "^0.3.2"
lazy_static = "0.2"
env_logger = "0.3"

View File

@@ -41,8 +41,9 @@ mod benchmark {
Arc::new(Mutex::new(handle))
};
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
let addr = HANDLE.lock().unwrap().local_addr().clone();
let client = AsyncClient::new(addr).unwrap();
let lock = HANDLE.lock().unwrap();
let dialer = lock.dialer();
let client = AsyncClient::new(dialer).unwrap();
Arc::new(Mutex::new(client))
};
}