59 Commits

Author SHA1 Message Date
Tim Kuehn
9193357d60 Fix the README example. It broke again. :( 2016-02-20 01:29:22 -08:00
shaladdle
b777e0bbf7 Merge pull request #24 from tikue/release-notes
Add release notes
2016-02-20 01:15:49 -08:00
Tim Kuehn
04624f054d Forgot the file 2016-02-20 01:11:02 -08:00
Tim Kuehn
f870f832a9 Rename RELEASE_NOTES.md => RELEASES.md to conform to rust-lang/rust. 2016-02-20 01:07:51 -08:00
Tim Kuehn
dc347021d4 Rework a line 2016-02-20 01:05:43 -08:00
Tim Kuehn
5973e54f62 Fix markdown 2016-02-20 01:04:40 -08:00
Tim Kuehn
e5e5c5975c Add release notes 2016-02-20 01:02:13 -08:00
shaladdle
6bb3691a30 Merge pull request #23 from tikue/deps
Update deps versions
2016-02-20 00:53:27 -08:00
Tim Kuehn
e2f1511fb3 Update deps versions 2016-02-19 23:53:27 -08:00
Tim Kuehn
99ba380825 Update readme version 2016-02-19 23:45:01 -08:00
Tim
39235343d6 Merge pull request #22 from shaladdle/allowunused
Allow most things to be unused
2016-02-19 23:40:42 -08:00
Adam Wright
f3afd080f3 Allow most things to be unused 2016-02-19 23:28:34 -08:00
Tim
043d0a1c21 Merge pull request #21 from shaladdle/bashpls
Merge shaladdle/bashpls into master.
2016-02-19 23:23:20 -08:00
shaladdle
be4caeebe8 Merge pull request #20 from tikue/config
Merge tikue/config into master.
2016-02-19 23:16:15 -08:00
Adam Wright
06a2cab31c Move /bin/bash to the top so it works 2016-02-19 23:08:50 -08:00
Tim Kuehn
934c51f4ab Bunch version number 2016-02-19 22:57:47 -08:00
Tim Kuehn
cc8a8e76b0 Remove a bunch of derived traits from Config for backwards compatibility reasons. 2016-02-19 22:51:18 -08:00
Tim Kuehn
b9ba10b8a4 Fix tarpc_examples 2016-02-19 22:42:36 -08:00
Tim Kuehn
1ee1f9274a Fix some log statements 2016-02-19 22:33:13 -08:00
Tim Kuehn
7f354be850 Merge branch 'master' of github.com:google/tarpc into config 2016-02-19 22:32:16 -08:00
Tim Kuehn
c9a63c2a5a Don't derive Hash for Config because Duration is only Hashable on nightly 2016-02-19 22:06:29 -08:00
Tim
ee1143c709 Merge pull request #14 from shaladdle/hooks
Merge shaladdle/hooks into master.
2016-02-19 21:46:00 -08:00
Adam Wright
4ed127b39e NOT FOUND instead of FAILED 2016-02-19 21:31:11 -08:00
Tim Kuehn
66cd136c6a Add a note about serde in the readme 2016-02-18 21:27:22 -08:00
Tim Kuehn
58cbe6f4ea Update README and service macro doc comment 2016-02-18 21:23:42 -08:00
Tim Kuehn
250a7fd7b9 Move 'static bound from trait to spawn fns 2016-02-18 20:48:25 -08:00
Tim Kuehn
a44fd808d9 Merge branch 'master' of github.com:google/tarpc into config 2016-02-18 20:42:33 -08:00
Tim Kuehn
65c4d83c88 [breaking] Renamings and refactorings.
* Add a Config struct. `Client::new(addr, duration)` => `Client::new(addr, config)`
* `serve(addr, server, config)` => `server.spawn_with_config(addr, config)`
* Also added `server.spawn(addr)`
2016-02-18 20:40:08 -08:00
shaladdle
00692fe9a3 Merge pull request #19 from tikue/readme
Make a note of AsyncClient and `cargo doc` in the readme
2016-02-18 15:35:56 -08:00
Tim Kuehn
0968760ef7 Make a note of AsyncClient and cargo doc in the readme 2016-02-18 15:28:41 -08:00
shaladdle
75b2c00b54 Merge pull request #17 from tikue/readme
Define RPC in readme
2016-02-18 14:59:43 -08:00
Tim Kuehn
ffee124526 List dependency info in readme 2016-02-18 08:49:20 -08:00
Tim Kuehn
06a03697c4 Merge branch 'master' of github.com:google/tarpc into readme 2016-02-18 08:43:35 -08:00
Tim Kuehn
a675551a31 Define RPC in README 2016-02-18 08:43:31 -08:00
Adam Wright
d0e9693263 Be consistent with function declarations 2016-02-18 01:40:44 -08:00
Adam Wright
6d23174219 Build on nightly too, as per review comment 2016-02-18 01:36:39 -08:00
Adam Wright
a06b583334 Also check existence of shasum 2016-02-18 01:27:07 -08:00
shaladdle
937e9c2c43 Merge pull request #15 from tikue/readme
Make a line in the readme a bit cleaner
2016-02-18 01:24:11 -08:00
Tim Kuehn
54883d6354 Make a line in the readme a bit cleaner 2016-02-18 01:20:20 -08:00
Tim
86b1470832 Merge pull request #13 from gsquire/readme-fix
Fix README code example
2016-02-18 01:17:52 -08:00
Adam Wright
82762583be Change colors a bit, only exit when check_toolchain fails.. 2016-02-18 01:06:31 -08:00
Adam Wright
3462451256 Check for multirust, rustfmt, clean up some things 2016-02-18 01:06:31 -08:00
Adam Wright
17d800b8a8 Update comments 2016-02-18 01:06:31 -08:00
Adam Wright
403eba201b Group builds/tests by toolchain for speed 2016-02-18 01:06:31 -08:00
Adam Wright
f2328d200e pre-commit doesn't modify files, just checks foramtting 2016-02-18 01:06:31 -08:00
Garrett Squire
51e6bac2dc fix the example in the README 2016-02-17 23:25:33 -08:00
shaladdle
f3fcbbb8d2 Merge pull request #12 from tikue/second-test
Add a second rpc to test service.
2016-02-17 18:08:23 -08:00
Tim Kuehn
05acb97f04 Add a second rpc to test service.
To better exercise the serialization parts -- namely, the enum tags.
2016-02-16 22:17:06 -08:00
Adam Wright
07c052a1c1 Individually format crates, use -q 2016-02-15 18:54:15 -08:00
Adam Wright
34cf0c8172 Updated hook to print messages like prepush 2016-02-15 18:30:26 -08:00
Adam Wright
7b196400b8 Revamp push hook
1. Create environment variables for options
3. Use multirust to test different toolchains. If multirust isn't
   installed, use current toolchain.
2016-02-15 17:28:16 -08:00
Adam Wright
1f30bb9ba6 Remove the install hooks script 2016-02-15 13:58:42 -08:00
Adam Wright
e2756edd72 Test all crates and format on pre-commit 2016-02-15 13:58:04 -08:00
shaladdle
8957d2dac3 Merge pull request #11 from tikue/rustfmt
rustfmt the things
2016-02-15 13:36:16 -08:00
shaladdle
21e5734ef7 Merge pull request #10 from tikue/inlines
Add some missing #[inline]s.
2016-02-15 13:33:11 -08:00
Tim Kuehn
dddeca19a1 .travis.yml: don't set unstable feature when building on nightly 2016-02-15 12:29:13 -08:00
Tim Kuehn
a9b86280b5 Add coveralls deps 2016-02-15 12:17:21 -08:00
Tim Kuehn
7dae99d7b5 rustfmt the things 2016-02-15 02:34:20 -08:00
Tim Kuehn
9dd3d55744 Add some missing #[inline]s.
And add the optional fn len method to Packet's MapVisitor. Also add a link to documentation in the readme.
2016-02-15 02:20:27 -08:00
15 changed files with 498 additions and 218 deletions

View File

@@ -6,6 +6,13 @@ rust:
- beta
- nightly
addons:
apt:
packages:
- libcurl4-openssl-dev
- libelf-dev
- libdw-dev
before_script:
- |
pip install 'travis-cargo<0.2' --user &&
@@ -13,8 +20,13 @@ before_script:
script:
- |
(cd tarpc && cargo build) &&
(cd tarpc && cargo test)
(cd tarpc && travis-cargo build) &&
(cd tarpc && travis-cargo test)
after_success:
- (cd tarpc && travis-cargo coveralls --no-sudo)
env:
global:
# override the default `--features unstable` used for the nightly branch
- TRAVIS_CARGO_NIGHTLY_FEATURE=""

View File

@@ -6,8 +6,27 @@
*Disclaimer*: This is not an official Google product.
tarpc is an RPC framework for rust with a focus on ease of use. Defining and implementing an echo-like server can be done in just a few lines of code:
tarpc is an RPC framework for rust with a focus on ease of use. Defining a service can be done in
just a few lines of code, and most of the boilerplate of writing a server is taken care of for you.
[Documentation](https://google.github.io/tarpc)
## What is an RPC framework?
"RPC" stands for "Remote Procedure Call," a function call where the work of producing the return
value is being done somewhere else. When an rpc function is invoked, behind the scenes the function
contacts some other process somewhere and asks them to compute the function instead. The original
function then returns the value produced by that other server.
[More information](https://www.cs.cf.ac.uk/Dave/C/node33.html)
## Usage
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.3.0"
```
## Example
```rust
#[macro_use]
extern crate tarpc;
@@ -17,18 +36,19 @@ mod hello_service {
rpc hello(name: String) -> String;
}
}
use hello_service::Service as HelloService;
struct HelloService;
impl hello_service::Service for HelloService {
struct HelloServer;
impl HelloService for HelloServer {
fn hello(&self, name: String) -> String {
format!("Hello, {}!", s)
format!("Hello, {}!", name)
}
}
fn main() {
let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap();
let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap();
assert_eq!("Hello, Mom!".into(), client.hello("Mom".into()).unwrap());
let server_handle = HelloServer.spawn("0.0.0.0:0").unwrap();
let client = hello_service::Client::new(server_handle.local_addr()).unwrap();
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
drop(client);
server_handle.shutdown();
}
@@ -36,14 +56,19 @@ fn main() {
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
above example, the macro is called within the `hello_service` module. This module will contain a
`Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server
listening on a tcp port. A `Client` can connect to such a service. Any type implementing the
`Service` trait can be passed to `serve`. These generated types are specific to the echo service,
and make it easy and ergonomic to write servers without dealing with sockets or serialization
directly. See the tarpc_examples package for more sophisticated examples.
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides `default fn`s for
starting the service: `spawn` and `spawn_with_config`, which start the service listening on a tcp
port. A `Client` (or `AsyncClient`) can connect to such a service. These generated types make it
easy and ergonomic to write servers without dealing with sockets or serialization directly. See the
tarpc_examples package for more sophisticated examples.
## Documentation
Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation.
## Additional Features
- Concurrent requests from a single client.
- Any type that `impl`s `serde`'s Serialize` and `Deserialize` can be used in the rpc signatures.
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait
methods as well as on the `Client`'s stub methods.
- Just like regular fns, the return type can be left off when it's `-> ()`.

12
RELEASES.md Normal file
View File

@@ -0,0 +1,12 @@
## 1.3 (2016-02-20)
### Breaking Changes
* The timeout arg to `serve` was replaced with a `Config` struct, which
currently only contains one field, but will be expanded in the future
to allow configuring serialization protocol, and other things.
* `serve` was changed to be a default method on the generated `Service` traits,
and it was renamed `spawn_with_config`. A second `default fn` was added:
`spawn`, which takes no `Config` arg.
### Other Changes
* Expanded items will no longer generate unused warnings.

View File

@@ -1,18 +1,41 @@
#!/bin/bash
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
# This file may not be copied, modified, or distributed except according to those terms.
#!/bin/sh
#
# An example hook script to verify what is about to be committed.
# Called by "git commit" with no arguments. The hook should
# exit with non-zero status after issuing an appropriate message if
# it wants to stop the commit.
# Pre-commit hook for the tarpc repository. To use this hook, copy it to .git/hooks in your
# repository root.
#
# To enable this hook, rename this file to "pre-commit".
# This precommit checks the following:
# 1. All filenames are ascii
# 2. There is no bad whitespace
# 3. rustfmt is installed
# 4. rustfmt is a noop on files that are in the index
#
# Options:
#
# - TARPC_SKIP_RUSTFMT, default = 0
#
# Set this to 1 to skip running rustfmt
#
# Note that these options are most useful for testing the hooks themselves. Use git commit
# --no-verify to skip the pre-commit hook altogether.
if git rev-parse --verify HEAD >/dev/null 2>&1
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
PREFIX="${GREEN}[PRECOMMIT]${NC}"
FAILURE="${RED}FAILED${NC}"
WARNING="${RED}[WARNING]${NC}"
SKIPPED="${YELLOW}SKIPPED${NC}"
SUCCESS="${GREEN}ok${NC}"
if git rev-parse --verify HEAD &>/dev/null
then
against=HEAD
else
@@ -20,35 +43,70 @@ else
against=4b825dc642cb6eb9a060e54bf8d69288fbee4904
fi
# If you want to allow non-ASCII filenames set this variable to true.
allownonascii=$(git config --bool hooks.allownonascii)
FAILED=0
# Redirect output to stderr.
exec 1>&2
# Cross platform projects tend to avoid non-ASCII filenames; prevent
# them from being added to the repository. We exploit the fact that the
# printable range starts at the space character and ends with tilde.
if [ "$allownonascii" != "true" ] &&
# Note that the use of brackets around a tr range is ok here, (it's
# even required, for portability to Solaris 10's /usr/bin/tr), since
# the square bracket bytes happen to fall in the designated range.
test $(git diff --cached --name-only --diff-filter=A -z $against |
LC_ALL=C tr -d '[ -~]\0' | wc -c) != 0
printf "${PREFIX} Checking that all filenames are ascii ... "
# Note that the use of brackets around a tr range is ok here, (it's
# even required, for portability to Solaris 10's /usr/bin/tr), since
# the square bracket bytes happen to fall in the designated range.
if test $(git diff --cached --name-only --diff-filter=A -z $against | LC_ALL=C tr -d '[ -~]\0' | wc -c) != 0
then
cat <<\EOF
Error: Attempt to add a non-ASCII file name.
FAILED=1
printf "${FAILURE}\n"
else
printf "${SUCCESS}\n"
fi
This can cause problems if you want to work with people on other platforms.
printf "${PREFIX} Checking for bad whitespace ... "
git diff-index --check --cached $against -- &>/dev/null
if [ "$?" != 0 ]; then
FAILED=1
printf "${FAILURE}\n"
else
printf "${SUCCESS}\n"
fi
To be portable it is advisable to rename the file.
If you know what you are doing you can disable this check using:
git config hooks.allownonascii true
EOF
printf "${PREFIX} Checking for rustfmt ... "
command -v rustfmt &>/dev/null
if [ $? == 0 ]; then
printf "${SUCCESS}\n"
else
printf "${FAILURE}\n"
exit 1
fi
# If there are whitespace errors, print the offending file names and fail.
exec git diff-index --check --cached $against --
printf "${PREFIX} Checking for shasum ... "
command -v shasum &>/dev/null
if [ $? == 0 ]; then
printf "${SUCCESS}\n"
else
printf "${FAILURE}\n"
exit 1
fi
# Just check that running rustfmt doesn't do anything to the file. I do this instead of
# modifying the file because I don't want to mess with the developer's index, which may
# not only contain discrete files.
printf "${PREFIX} Checking formatting ... "
FMTRESULT=0
for file in $(git diff --name-only --cached);
do
if [ ${file: -3} == ".rs" ]; then
HASH=$(shasum $file)
NEW_HASH=$(rustfmt --write-mode=display $file | shasum)
if [ "${HASH}" != "${NEW_HASH}" ]; then
FMTRESULT=1
fi
fi
done
if [ "${TARPC_SKIP_RUSTFMT}" == 1 ]; then
printf "${SKIPPED}\n"$?
elif [ ${FMTRESULT} != 0 ]; then
FAILED=1
printf "${FAILURE}\n"
else
printf "${SUCCESS}\n"
fi
exit ${FAILED}

View File

@@ -1,39 +1,127 @@
#!/bin/bash
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
# This file may not be copied, modified, or distributed except according to those terms.
#!/bin/sh
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
# root.
#
# This hook runs tests to make sure only working code is being pushed. If present, multirust is used
# to build and test the code on the appropriate toolchains. The working copy must not contain
# uncommitted changes, since the script currently just runs cargo build/test in the working copy.
#
# Options:
#
# - TARPC_ALLOW_DIRTY, default = 0
#
# Setting this variable to 1 will run tests even though there are code changes in the working
# copy. Set to 0 by default, since the intent is to test the code that's being pushed, not changes
# still in the working copy.
#
# - TARPC_USE_CURRENT_TOOLCHAIN, default = 0
#
# Setting this variable to 1 will just run cargo build and cargo test, rather than running
# stable/beta/nightly.
#
# Note that these options are most useful for testing the hooks themselves. Use git push --no-verify
# to skip the pre-push hook altogether.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
PREFIX="${GREEN}[PREPUSH]${NC}"
FAILURE="${RED}FAILED${NC}"
WARNING="${YELLOW}[WARNING]${NC}"
SKIPPED="${YELLOW}SKIPPED${NC}"
SUCCESS="${GREEN}ok${NC}"
printf "${PREFIX} Clean working copy ... "
git diff --exit-code &>/dev/null
if [ "$?" != 0 ];
then
echo ${RED}ERROR${NC} You have uncommitted changes please commit or stash them before pushing so that I can run tests!
exit 1
fi
printf "${YELLOW}[PRESUBMIT]${NC} Running tests ... "
TEST_RESULT=0
cargo test --manifest-path tarpc/Cargo.toml &>/dev/null
if [ "$?" != "0" ];
then
printf "${RED}FAILED${NC}"
TEST_RESULT=1
if [ "$?" == 0 ]; then
printf "${SUCCESS}\n"
else
printf "${GREEN}ok${NC}"
fi
printf "\n"
RESULT=0
if [ "$TEST_RESULT" == "1" ];
then
RESULT=1
if [ "${TARPC_ALLOW_DIRTY}" == "1" ]
then
printf "${SKIPPED}\n"
else
printf "${FAILURE}\n"
exit 1
fi
fi
exit $RESULT
PREPUSH_RESULT=0
# args:
# 1 - cargo command to run (build/test)
# 2 - directory name of crate to build
# 3 - rust toolchain (nightly/stable/beta)
run_cargo() {
if [ "$1" == "build" ]; then
VERB=Building
else
VERB=Testing
fi
if [ "$3" != "" ]; then
printf "${PREFIX} $VERB $2 on $3 ... "
multirust run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
else
printf "${PREFIX} $VERB $2 ... "
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
fi
if [ "$?" != "0" ]; then
printf "${FAILURE}\n"
PREPUSH_RESULT=1
else
printf "${SUCCESS}\n"
fi
}
TOOLCHAIN_RESULT=0
check_toolchain() {
printf "${PREFIX} Checking for $1 toolchain ... "
if [[ $(multirust list-toolchain) =~ $1 ]]; then
printf "${SUCCESS}\n"
else
TOOLCHAIN_RESULT=1
PREPUSH_RESULT=1
printf "${FAILURE}\n"
fi
}
printf "${PREFIX} Checking for multirust ... "
command -v multirust &>/dev/null
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
printf "${SUCCESS}\n"
check_toolchain stable
check_toolchain beta
check_toolchain nightly
if [ ${TOOLCHAIN_RESULT} == 1 ]; then
exit 1
fi
run_cargo build tarpc stable
run_cargo build tarpc_examples stable
run_cargo build tarpc beta
run_cargo build tarpc_examples beta
run_cargo build tarpc nightly
run_cargo build tarpc_examples nightly
# We still rely on some nightly stuff for tests
run_cargo test tarpc nightly
run_cargo test tarpc_examples nightly
else
printf "${YELLOW}NOT FOUND${NC}\n"
printf "${WARNING} Falling back to current toolchain: $(rustc -V)\n"
run_cargo test tarpc
run_cargo test tarpc_examples
fi
exit $PREPUSH_RESULT

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.2.0"
version = "0.3.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT"
documentation = "https://google.github.io/tarpc"
@@ -13,8 +13,8 @@ description = "An RPC framework for Rust with a focus on ease of use."
[dependencies]
bincode = "^0.4.0"
log = "^0.3.5"
scoped-pool = "^0.1.4"
serde = "^0.6.13"
scoped-pool = "^0.1.5"
serde = "^0.6.14"
[dev-dependencies]
lazy_static = "^0.1.15"

View File

@@ -1,8 +0,0 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
# This file may not be copied, modified, or distributed except according to those terms.
#!/bin/sh
ln -s ../../hooks/pre-commit .git/hooks/pre-commit
ln -s ../../hooks/pre-push .git/hooks/pre-push

View File

@@ -1 +1,2 @@
ideal_width = 100
reorder_imports = true

View File

@@ -8,7 +8,7 @@
//! Example usage:
//!
//! ```
//! # #[macro_use] extern crate tarpc;
//! #[macro_use] extern crate tarpc;
//! mod my_server {
//! service! {
//! rpc hello(name: String) -> String;
@@ -31,11 +31,8 @@
//!
//! fn main() {
//! let addr = "127.0.0.1:9000";
//! let shutdown = my_server::serve(addr,
//! Server,
//! Some(Duration::from_secs(30)))
//! .unwrap();
//! let client = Client::new(addr, None).unwrap();
//! let shutdown = Server.spawn(addr).unwrap();
//! let client = Client::new(addr).unwrap();
//! assert_eq!(3, client.add(1, 2).unwrap());
//! assert_eq!("Hello, Mom!".to_string(),
//! client.hello("Mom".to_string()).unwrap());
@@ -63,4 +60,4 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs.
pub mod macros;
pub use protocol::{Error, Result, ServeHandle};
pub use protocol::{Config, Error, Result, ServeHandle};

View File

@@ -8,7 +8,7 @@ pub mod serde {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Deserialization re-exports required by macros. Not for general use.
pub mod de {
pub use serde::de::{EnumVisitor, Error, Visitor, VariantVisitor};
pub use serde::de::{EnumVisitor, Error, VariantVisitor, Visitor};
}
}
@@ -21,6 +21,7 @@ macro_rules! client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($($arg:ident,)*) : ($($in_:ty,)*) ) -> $out:ty
) => (
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*))));
@@ -32,13 +33,17 @@ macro_rules! client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($( $arg:ident,)*) : ($($in_:ty, )*) ) -> $out:ty
)*) => ( $(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*))));
if let __Reply::$fn_name(reply) = reply {
::std::result::Result::Ok(reply)
} else {
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
panic!("Incorrect reply variant returned from rpc; expected `{}`, \
but got {:?}",
stringify!($fn_name),
reply);
}
}
)*);
@@ -53,6 +58,7 @@ macro_rules! async_client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty
) => (
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> {
fn mapper(reply: __Reply) -> $out {
@@ -70,13 +76,17 @@ macro_rules! async_client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty
)*) => ( $(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> {
fn mapper(reply: __Reply) -> $out {
if let __Reply::$fn_name(reply) = reply {
reply
} else {
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
panic!("Incorrect reply variant returned from rpc; expected `{}`, but got \
{:?}",
stringify!($fn_name),
reply);
}
}
let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*)));
@@ -130,7 +140,7 @@ macro_rules! impl_deserialize {
-> ::std::result::Result<$impler, D::Error>
where D: $crate::macros::serde::Deserializer
{
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, unused)]
enum __Field {
$($name),*
}
@@ -144,6 +154,7 @@ macro_rules! impl_deserialize {
impl $crate::macros::serde::de::Visitor for __FieldVisitor {
type Value = __Field;
#[inline]
fn visit_usize<E>(&mut self, value: usize)
-> ::std::result::Result<__Field, E>
where E: $crate::macros::serde::de::Error,
@@ -166,6 +177,7 @@ macro_rules! impl_deserialize {
impl $crate::macros::serde::de::EnumVisitor for __Visitor {
type Value = $impler;
#[inline]
fn visit<__V>(&mut self, mut visitor: __V)
-> ::std::result::Result<$impler, __V::Error>
where __V: $crate::macros::serde::de::VariantVisitor
@@ -210,17 +222,22 @@ macro_rules! impl_deserialize {
/// # }
/// ```
///
/// There are two rpc names reserved for the default fns `spawn` and `spawn_with_config`.
///
/// Attributes can be attached to each rpc. These attributes
/// will then be attached to the generated `Service` trait's
/// corresponding method, as well as to the `Client` stub's rpcs methods.
///
/// The following items are expanded in the enclosing module:
///
/// * `Service` -- the trait defining the RPC service
/// * `Service` -- the trait defining the RPC service. It comes with two default methods for
/// starting the server:
/// 1. `spawn` starts the service in another thread using default configuration.
/// 2. `spawn_with_config` starts the service in another thread using the specified
/// `Config`.
/// * `Client` -- a client that makes synchronous requests to the RPC server
/// * `AsyncClient` -- a client that makes asynchronous requests to the RPC server
/// * `Future` -- a handle for asynchronously retrieving the result of an RPC
/// * `serve` -- the function that starts the RPC server
///
/// **Warning**: In addition to the above items, there are a few expanded items that
/// are considered implementation details. As with the above items, shadowing
@@ -291,15 +308,34 @@ macro_rules! service_inner {
)*
) => {
#[doc="Defines the RPC service"]
pub trait Service: Send + Sync {
pub trait Service: Send + Sync + Sized {
$(
$(#[$attr])*
fn $fn_name(&self, $($arg:$in_),*) -> $out;
)*
#[doc="Spawn a running service."]
fn spawn<A>(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
Self: 'static,
{
self.spawn_with_config(addr, $crate::Config::default())
}
#[doc="Spawn a running service."]
fn spawn_with_config<A>(self, addr: A, config: $crate::Config)
-> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
Self: 'static,
{
let server = ::std::sync::Arc::new(__Server(self));
let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config));
::std::result::Result::Ok(handle)
}
}
impl<P, S> Service for P
where P: Send + Sync + ::std::ops::Deref<Target=S>,
where P: Send + Sync + Sized + 'static + ::std::ops::Deref<Target=S>,
S: Service
{
$(
@@ -310,7 +346,7 @@ macro_rules! service_inner {
)*
}
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __Request {
$(
@@ -321,7 +357,7 @@ macro_rules! service_inner {
impl_serialize!(__Request, $($fn_name(($($in_),*)))*);
impl_deserialize!(__Request, $($fn_name(($($in_),*)))*);
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __Reply {
$(
@@ -332,29 +368,42 @@ macro_rules! service_inner {
impl_serialize!(__Reply, $($fn_name($out))*);
impl_deserialize!(__Reply, $($fn_name($out))*);
/// An asynchronous RPC call
#[allow(unused)]
#[doc="An asynchronous RPC call"]
pub struct Future<T> {
future: $crate::protocol::Future<__Reply>,
mapper: fn(__Reply) -> T,
}
impl<T> Future<T> {
/// Block until the result of the RPC call is available
#[allow(unused)]
#[doc="Block until the result of the RPC call is available"]
pub fn get(self) -> $crate::Result<T> {
self.future.get().map(self.mapper)
}
}
#[allow(unused)]
#[doc="The client stub that makes RPC calls to the server."]
pub struct Client($crate::protocol::Client<__Request, __Reply>);
impl Client {
#[doc="Create a new client that connects to the given address."]
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<Self>
#[allow(unused)]
#[doc="Create a new client with default configuration that connects to the given \
address."]
pub fn new<A>(addr: A) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let inner = try!($crate::protocol::Client::new(addr, timeout));
Self::with_config(addr, $crate::Config::default())
}
#[allow(unused)]
#[doc="Create a new client with the specified configuration that connects to the \
given address."]
pub fn with_config<A>(addr: A, config: $crate::Config) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let inner = try!($crate::protocol::Client::with_config(addr, config));
::std::result::Result::Ok(Client(inner))
}
@@ -365,6 +414,7 @@ macro_rules! service_inner {
)*
);
#[allow(unused)]
#[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \
clone fails."]
pub fn try_clone(&self) -> ::std::io::Result<Self> {
@@ -372,16 +422,27 @@ macro_rules! service_inner {
}
}
#[allow(unused)]
#[doc="The client stub that makes asynchronous RPC calls to the server."]
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>);
impl AsyncClient {
#[allow(unused)]
#[doc="Create a new asynchronous client with default configuration that connects to \
the given address."]
pub fn new<A>(addr: A) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
Self::with_config(addr, $crate::Config::default())
}
#[allow(unused)]
#[doc="Create a new asynchronous client that connects to the given address."]
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
pub fn with_config<A>(addr: A, config: $crate::Config)
-> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
{
let inner = try!($crate::protocol::Client::new(addr, timeout));
let inner = try!($crate::protocol::Client::with_config(addr, config));
::std::result::Result::Ok(AsyncClient(inner))
}
@@ -392,6 +453,7 @@ macro_rules! service_inner {
)*
);
#[allow(unused)]
#[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \
clone fails."]
pub fn try_clone(&self) -> ::std::io::Result<Self> {
@@ -399,6 +461,7 @@ macro_rules! service_inner {
}
}
#[allow(unused)]
struct __Server<S: 'static + Service>(S);
impl<S> $crate::protocol::Serve for __Server<S>
@@ -415,18 +478,6 @@ macro_rules! service_inner {
}
}
}
#[doc="Start a running service."]
pub fn serve<A, S>(addr: A,
service: S,
read_timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
S: 'static + Service
{
let server = ::std::sync::Arc::new(__Server(service));
::std::result::Result::Ok(try!($crate::protocol::serve_async(addr, server, read_timeout)))
}
}
}
@@ -461,14 +512,10 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
extern crate env_logger;
use std::time::Duration;
fn test_timeout() -> Option<Duration> {
Some(Duration::from_secs(5))
}
service! {
rpc add(x: i32, y: i32) -> i32;
rpc hey(name: String) -> String;
}
struct Server;
@@ -477,14 +524,18 @@ mod functional_test {
fn add(&self, x: i32, y: i32) -> i32 {
x + y
}
fn hey(&self, name: String) -> String {
format!("Hey, {}.", name)
}
}
#[test]
fn simple() {
let _ = env_logger::init();
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
let client = Client::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client = Client::new(handle.local_addr()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
drop(client);
handle.shutdown();
}
@@ -492,17 +543,18 @@ mod functional_test {
#[test]
fn simple_async() {
let _ = env_logger::init();
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client = AsyncClient::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client = AsyncClient::new(handle.local_addr()).unwrap();
assert_eq!(3, client.add(1, 2).get().unwrap());
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
drop(client);
handle.shutdown();
}
#[test]
fn try_clone() {
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
let client1 = Client::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client1 = Client::new(handle.local_addr()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap());
@@ -510,8 +562,8 @@ mod functional_test {
#[test]
fn async_try_clone() {
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client1 = AsyncClient::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client1 = AsyncClient::new(handle.local_addr()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
@@ -520,7 +572,7 @@ mod functional_test {
// Tests that a server can be wrapped in an Arc; no need to run, just compile
#[allow(dead_code)]
fn serve_arc_server() {
let _ = serve("localhost:0", ::std::sync::Arc::new(Server), None);
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
}
#[test]

View File

@@ -12,9 +12,8 @@ use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
use std::time::Duration;
use super::{Serialize, Deserialize, Error, Packet, Result};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply>
@@ -33,10 +32,16 @@ impl<Request, Reply> Client<Request, Reply>
{
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Self::with_config(addr, Config::default())
}
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn with_config<A: ToSocketAddrs>(addr: A, config: Config) -> io::Result<Self> {
let stream = try!(TcpStream::connect(addr));
try!(stream.set_read_timeout(timeout));
try!(stream.set_write_timeout(timeout));
try!(stream.set_read_timeout(config.timeout));
try!(stream.set_write_timeout(config.timeout));
let reader_stream = try!(stream.try_clone());
let writer_stream = try!(stream.try_clone());
let requests = Arc::new(Mutex::new(RpcFutures::new()));
@@ -100,15 +105,16 @@ impl<Request, Reply> Drop for Client<Request, Reply>
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
debug!("Attempting to shut down writer and reader threads.");
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) {
warn!("Client: couldn't shutdown writer and reader threads: {:?}", e);
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
e);
} else {
// We only join if we know the TcpStream was shut down. Otherwise we might never
// finish.
debug!("Joining writer and reader.");
reader_guard.take()
.expect(pos!())
.join()
.expect(pos!());
.expect(pos!())
.join()
.expect(pos!());
debug!("Successfully joined writer and reader.");
}
}
@@ -118,14 +124,15 @@ impl<Request, Reply> Drop for Client<Request, Reply>
/// An asynchronous RPC call
pub struct Future<T> {
rx: Receiver<Result<T>>,
requests: Arc<Mutex<RpcFutures<T>>>
requests: Arc<Mutex<RpcFutures<T>>>,
}
impl<T> Future<T> {
/// Block until the result of the RPC call is available
pub fn get(self) -> Result<T> {
let requests = self.requests;
self.rx.recv()
self.rx
.recv()
.map_err(|_| requests.lock().expect(pos!()).get_error())
.and_then(|reply| reply)
}
@@ -164,7 +171,8 @@ impl<Reply> RpcFutures<Reply> {
info!("Reader: could not complete reply: {:?}", e);
}
} else {
warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id);
warn!("RpcFutures: expected sender for id {} but got None!",
packet.rpc_id);
}
}
@@ -178,10 +186,10 @@ impl<Reply> RpcFutures<Reply> {
}
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream)
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream)
where Request: serde::Serialize,
Reply: serde::Deserialize,
Reply: serde::Deserialize
{
let mut next_id = 0;
let mut stream = BufWriter::new(stream);
@@ -221,8 +229,8 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
{
// Clone the err so we can log it if sending fails
if let Err(e2) = tx.send(Err(e.clone())) {
debug!("Error encountered while trying to send an error. \
Initial error: {:?}; Send error: {:?}",
debug!("Error encountered while trying to send an error. Initial error: {:?}; Send \
error: {:?}",
e,
e2);
}

View File

@@ -9,6 +9,7 @@ use serde;
use std::io::{self, Read, Write};
use std::convert;
use std::sync::Arc;
use std::time::Duration;
mod client;
mod server;
@@ -16,7 +17,7 @@ mod packet;
pub use self::packet::Packet;
pub use self::client::{Client, Future};
pub use self::server::{Serve, ServeHandle, serve_async};
pub use self::server::{Serve, ServeHandle};
/// Client errors that can occur during rpc calls
#[derive(Debug, Clone)]
@@ -54,13 +55,19 @@ impl convert::From<io::Error> for Error {
}
}
/// Configuration for client and server.
#[derive(Debug, Default)]
pub struct Config {
/// Request/Response timeout between packet delivery.
pub timeout: Option<Duration>,
}
/// Return type of rpc calls: either the successful return value, or a client error.
pub type Result<T> = ::std::result::Result<T, Error>;
trait Deserialize: Read + Sized {
fn deserialize<T: serde::Deserialize>(&mut self) -> Result<T> {
deserialize_from(self, SizeLimit::Infinite)
.map_err(Error::from)
deserialize_from(self, SizeLimit::Infinite).map_err(Error::from)
}
}
@@ -79,7 +86,7 @@ impl<W: Write> Serialize for W {}
#[cfg(test)]
mod test {
extern crate env_logger;
use super::{Client, Serve, serve_async};
use super::{Client, Config, Serve};
use scoped_pool::Pool;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
@@ -119,8 +126,8 @@ mod test {
fn handle() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr(), None).unwrap();
let serve_handle = server.spawn("localhost:0").unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap();
drop(client);
serve_handle.shutdown();
}
@@ -129,10 +136,10 @@ mod test {
fn simple() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
assert_eq!(0, client.rpc(()).unwrap());
assert_eq!(1, server.count());
assert_eq!(1, client.rpc(()).unwrap());
@@ -172,9 +179,13 @@ mod test {
fn force_shutdown() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server, Some(Duration::new(0, 10))).unwrap();
let serve_handle = server.spawn_with_config("localhost:0",
Config {
timeout: Some(Duration::new(0, 10))
})
.unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
let thread = thread::spawn(move || serve_handle.shutdown());
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
thread.join().unwrap();
@@ -184,9 +195,13 @@ mod test {
fn client_failed_rpc() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server, test_timeout()).unwrap();
let serve_handle = server.spawn_with_config("localhost:0",
Config {
timeout: test_timeout(),
})
.unwrap();
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr, None).unwrap());
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr).unwrap());
client.rpc(()).unwrap();
serve_handle.shutdown();
match client.rpc(()) {
@@ -202,13 +217,15 @@ mod test {
let concurrency = 10;
let pool = Pool::new(concurrency);
let server = Arc::new(BarrierServer::new(concurrency));
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
pool.scoped(|scope| {
for _ in 0..concurrency {
let client = client.try_clone().unwrap();
scope.execute(move || { client.rpc(()).unwrap(); });
scope.execute(move || {
client.rpc(()).unwrap();
});
}
});
assert_eq!(concurrency as u64, server.count());
@@ -220,9 +237,9 @@ mod test {
fn async() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), None).unwrap();
let serve_handle = server.spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
// Drop future immediately; does the reader channel panic when sending?
client.rpc_async(());

View File

@@ -19,10 +19,11 @@ impl<T: Serialize> Serialize for Packet<T> {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer
{
serializer.visit_struct(PACKET, MapVisitor {
value: self,
state: 0,
})
serializer.visit_struct(PACKET,
MapVisitor {
value: self,
state: 0,
})
}
}
@@ -31,7 +32,8 @@ struct MapVisitor<'a, T: 'a> {
state: u8,
}
impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
#[inline]
fn visit<S>(&mut self, serializer: &mut S) -> Result<Option<()>, S::Error>
where S: Serializer
{
@@ -44,14 +46,18 @@ impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message))))
}
_ => {
Ok(None)
}
_ => Ok(None),
}
}
#[inline]
fn len(&self) -> Option<usize> {
Some(2)
}
}
impl<T: Deserialize> Deserialize for Packet<T> {
#[inline]
fn deserialize<D>(deserializer: &mut D) -> Result<Self, D::Error>
where D: Deserializer
{
@@ -65,6 +71,7 @@ struct Visitor<T>(PhantomData<T>);
impl<T: Deserialize> de::Visitor for Visitor<T> {
type Value = Packet<T>;
#[inline]
fn visit_seq<V>(&mut self, mut visitor: V) -> Result<Packet<T>, V::Error>
where V: de::SeqVisitor
{
@@ -91,7 +98,10 @@ fn serde() {
use bincode;
let _ = env_logger::init();
let packet = Packet { rpc_id: 1, message: () };
let packet = Packet {
rpc_id: 1,
message: (),
};
let ser = bincode::serde::serialize(&packet, bincode::SizeLimit::Infinite).unwrap();
let de = bincode::serde::deserialize(&ser);
assert_eq!(packet, de.unwrap());

View File

@@ -12,7 +12,7 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
use super::{Deserialize, Error, Packet, Result, Serialize};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
struct ConnectionHandler<'a, S>
where S: Serve
@@ -44,7 +44,7 @@ impl<'a, S> ConnectionHandler<'a, S>
let reply = server.serve(message);
let reply_packet = Packet {
rpc_id: rpc_id,
message: reply
message: reply,
};
tx.send(reply_packet).expect(pos!());
});
@@ -55,8 +55,8 @@ impl<'a, S> ConnectionHandler<'a, S>
}
Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => {
if !shutdown.load(Ordering::SeqCst) {
info!("ConnectionHandler: read timed out ({:?}). Server not \
shutdown, so retrying read.",
info!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \
retrying read.",
err);
continue;
} else {
@@ -142,7 +142,9 @@ struct Server<'a, S: 'a> {
impl<'a, S: 'a> Server<'a, S>
where S: Serve + 'static
{
fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b {
fn serve<'b>(self, scope: &Scope<'b>)
where 'a: 'b
{
for conn in self.listener.incoming() {
match self.die_rx.try_recv() {
Ok(_) => {
@@ -169,7 +171,7 @@ impl<'a, S: 'a> Server<'a, S>
let read_conn = match conn.try_clone() {
Err(err) => {
error!("serve: could not clone tcp stream; possibly out of file descriptors? \
Err: {:?}",
Err: {:?}",
err);
continue;
}
@@ -199,41 +201,8 @@ impl<'a, S> Drop for Server<'a, S> {
}
}
/// Start
pub fn serve_async<A, S>(addr: A,
server: S,
read_timeout: Option<Duration>)
-> io::Result<ServeHandle>
where A: ToSocketAddrs,
S: 'static + Serve
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
info!("serve_async: spinning up server on {:?}", addr);
let (die_tx, die_rx) = channel();
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &server,
listener: listener,
read_timeout: read_timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
pool.scoped(|scope| {
server.serve(scope);
});
});
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
addr: addr.clone(),
})
}
/// A service provided by a server
pub trait Serve: Send + Sync {
pub trait Serve: Send + Sync + Sized {
/// The type of request received by the server
type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send;
/// The type of reply sent by the server
@@ -241,10 +210,49 @@ pub trait Serve: Send + Sync {
/// Return a reply for a given request
fn serve(&self, request: Self::Request) -> Self::Reply;
/// spawn
fn spawn<A>(self, addr: A) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
Self: 'static,
{
self.spawn_with_config(addr, Config::default())
}
/// spawn
fn spawn_with_config<A>(self, addr: A, config: Config) -> io::Result<ServeHandle>
where A: ToSocketAddrs,
Self: 'static,
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
info!("spawn_with_config: spinning up server on {:?}", addr);
let (die_tx, die_rx) = channel();
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &self,
listener: listener,
read_timeout: config.timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
pool.scoped(|scope| {
server.serve(scope);
});
});
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
addr: addr.clone(),
})
}
}
impl<P, S> Serve for P
where P: Send + Sync + ::std::ops::Deref<Target=S>,
where P: Send + Sync + ::std::ops::Deref<Target = S>,
S: Serve
{
type Request = S::Request;

View File

@@ -37,12 +37,12 @@ mod benchmark {
// Prevents resource exhaustion when benching
lazy_static! {
static ref HANDLE: Arc<Mutex<ServeHandle>> = {
let handle = serve("localhost:0", HelloServer, None).unwrap();
let handle = HelloServer.spawn("localhost:0").unwrap();
Arc::new(Mutex::new(handle))
};
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
let addr = HANDLE.lock().unwrap().local_addr().clone();
let client = AsyncClient::new(addr, None).unwrap();
let client = AsyncClient::new(addr).unwrap();
Arc::new(Mutex::new(client))
};
}