mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
119 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3cf8e440f7 | ||
|
|
2e02f33fc4 | ||
|
|
d8472dcd1c | ||
|
|
2c5846621f | ||
|
|
6a6157948a | ||
|
|
1c18a3c4fe | ||
|
|
e8ec295e85 | ||
|
|
44eec09418 | ||
|
|
fe116a1b6b | ||
|
|
ec4fa8636b | ||
|
|
2d58340d16 | ||
|
|
801f09e9e6 | ||
|
|
6ce3a3d943 | ||
|
|
4d636d2882 | ||
|
|
3693c95a67 | ||
|
|
43a2df4a13 | ||
|
|
166f1523d6 | ||
|
|
1cc8cbcdc3 | ||
|
|
9dafc704e9 | ||
|
|
32e0b0d7f8 | ||
|
|
b87c52758e | ||
|
|
9235e12904 | ||
|
|
265fe56fa6 | ||
|
|
7b5b29a9c3 | ||
|
|
709f4ab1ac | ||
|
|
bbfb4325d2 | ||
|
|
f33cb3bd53 | ||
|
|
6a6832cfbc | ||
|
|
b0495ebc00 | ||
|
|
aec1574824 | ||
|
|
5d27d34bd3 | ||
|
|
fe978f2c56 | ||
|
|
44f472c65c | ||
|
|
e995acd4c9 | ||
|
|
e8fcf0e4de | ||
|
|
9dcd38c012 | ||
|
|
5ac4b710a5 | ||
|
|
2eb0b2cc83 | ||
|
|
72a9f8f70d | ||
|
|
8e5a44b423 | ||
|
|
714541a7a4 | ||
|
|
a1f529f794 | ||
|
|
a8766a9200 | ||
|
|
ef96c87226 | ||
|
|
3543b34f2b | ||
|
|
6273ebefa7 | ||
|
|
9827f75459 | ||
|
|
c398e2389b | ||
|
|
03dc512e25 | ||
|
|
8307c708a3 | ||
|
|
774411c636 | ||
|
|
d5b2f23f74 | ||
|
|
396aec3c2f | ||
|
|
28c6c333e5 | ||
|
|
2d1a77ec10 | ||
|
|
a0e6147482 | ||
|
|
fcdb0d9375 | ||
|
|
4c1d15f8ea | ||
|
|
ece1cc60b9 | ||
|
|
7d8a508379 | ||
|
|
9193357d60 | ||
|
|
b777e0bbf7 | ||
|
|
04624f054d | ||
|
|
f870f832a9 | ||
|
|
dc347021d4 | ||
|
|
5973e54f62 | ||
|
|
e5e5c5975c | ||
|
|
6bb3691a30 | ||
|
|
e2f1511fb3 | ||
|
|
99ba380825 | ||
|
|
39235343d6 | ||
|
|
f3afd080f3 | ||
|
|
043d0a1c21 | ||
|
|
be4caeebe8 | ||
|
|
06a2cab31c | ||
|
|
934c51f4ab | ||
|
|
cc8a8e76b0 | ||
|
|
b9ba10b8a4 | ||
|
|
1ee1f9274a | ||
|
|
7f354be850 | ||
|
|
c9a63c2a5a | ||
|
|
ee1143c709 | ||
|
|
4ed127b39e | ||
|
|
66cd136c6a | ||
|
|
58cbe6f4ea | ||
|
|
250a7fd7b9 | ||
|
|
a44fd808d9 | ||
|
|
65c4d83c88 | ||
|
|
00692fe9a3 | ||
|
|
0968760ef7 | ||
|
|
75b2c00b54 | ||
|
|
ffee124526 | ||
|
|
06a03697c4 | ||
|
|
a675551a31 | ||
|
|
d0e9693263 | ||
|
|
6d23174219 | ||
|
|
a06b583334 | ||
|
|
937e9c2c43 | ||
|
|
54883d6354 | ||
|
|
86b1470832 | ||
|
|
82762583be | ||
|
|
3462451256 | ||
|
|
17d800b8a8 | ||
|
|
403eba201b | ||
|
|
f2328d200e | ||
|
|
51e6bac2dc | ||
|
|
f3fcbbb8d2 | ||
|
|
05acb97f04 | ||
|
|
07c052a1c1 | ||
|
|
34cf0c8172 | ||
|
|
7b196400b8 | ||
|
|
1f30bb9ba6 | ||
|
|
e2756edd72 | ||
|
|
8957d2dac3 | ||
|
|
21e5734ef7 | ||
|
|
dddeca19a1 | ||
|
|
a9b86280b5 | ||
|
|
7dae99d7b5 | ||
|
|
9dd3d55744 |
16
.travis.yml
16
.travis.yml
@@ -6,6 +6,13 @@ rust:
|
||||
- beta
|
||||
- nightly
|
||||
|
||||
addons:
|
||||
apt:
|
||||
packages:
|
||||
- libcurl4-openssl-dev
|
||||
- libelf-dev
|
||||
- libdw-dev
|
||||
|
||||
before_script:
|
||||
- |
|
||||
pip install 'travis-cargo<0.2' --user &&
|
||||
@@ -13,8 +20,13 @@ before_script:
|
||||
|
||||
script:
|
||||
- |
|
||||
(cd tarpc && cargo build) &&
|
||||
(cd tarpc && cargo test)
|
||||
(cd tarpc && travis-cargo build) &&
|
||||
(cd tarpc && travis-cargo test)
|
||||
|
||||
after_success:
|
||||
- (cd tarpc && travis-cargo coveralls --no-sudo)
|
||||
|
||||
env:
|
||||
global:
|
||||
# override the default `--features unstable` used for the nightly branch
|
||||
- TRAVIS_CARGO_NIGHTLY_FEATURE=""
|
||||
|
||||
49
README.md
49
README.md
@@ -6,8 +6,27 @@
|
||||
|
||||
*Disclaimer*: This is not an official Google product.
|
||||
|
||||
tarpc is an RPC framework for rust with a focus on ease of use. Defining and implementing an echo-like server can be done in just a few lines of code:
|
||||
tarpc is an RPC framework for rust with a focus on ease of use. Defining a service can be done in
|
||||
just a few lines of code, and most of the boilerplate of writing a server is taken care of for you.
|
||||
|
||||
[Documentation](https://google.github.io/tarpc)
|
||||
|
||||
## What is an RPC framework?
|
||||
"RPC" stands for "Remote Procedure Call," a function call where the work of producing the return
|
||||
value is being done somewhere else. When an rpc function is invoked, behind the scenes the function
|
||||
contacts some other process somewhere and asks them to compute the function instead. The original
|
||||
function then returns the value produced by that other server.
|
||||
|
||||
[More information](https://www.cs.cf.ac.uk/Dave/C/node33.html)
|
||||
|
||||
## Usage
|
||||
Add to your `Cargo.toml` dependencies:
|
||||
|
||||
```toml
|
||||
tarpc = "0.6"
|
||||
```
|
||||
|
||||
## Example
|
||||
```rust
|
||||
#[macro_use]
|
||||
extern crate tarpc;
|
||||
@@ -17,18 +36,20 @@ mod hello_service {
|
||||
rpc hello(name: String) -> String;
|
||||
}
|
||||
}
|
||||
use hello_service::Service as HelloService;
|
||||
|
||||
struct HelloService;
|
||||
impl hello_service::Service for HelloService {
|
||||
struct HelloServer;
|
||||
impl HelloService for HelloServer {
|
||||
fn hello(&self, name: String) -> String {
|
||||
format!("Hello, {}!", s)
|
||||
format!("Hello, {}!", name)
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap();
|
||||
let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap();
|
||||
assert_eq!("Hello, Mom!".into(), client.hello("Mom".into()).unwrap());
|
||||
let addr = "localhost:10000";
|
||||
let server_handle = HelloServer.spawn(addr).unwrap();
|
||||
let client = hello_service::Client::new(addr).unwrap();
|
||||
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
|
||||
drop(client);
|
||||
server_handle.shutdown();
|
||||
}
|
||||
@@ -36,14 +57,20 @@ fn main() {
|
||||
|
||||
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
|
||||
above example, the macro is called within the `hello_service` module. This module will contain a
|
||||
`Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server
|
||||
listening on a tcp port. A `Client` can connect to such a service. Any type implementing the
|
||||
`Service` trait can be passed to `serve`. These generated types are specific to the echo service,
|
||||
and make it easy and ergonomic to write servers without dealing with sockets or serialization
|
||||
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for
|
||||
starting the service: `spawn` and `spawn_with_config`, which start the service listening over an
|
||||
arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated
|
||||
types make it easy and ergonomic to write servers without dealing with sockets or serialization
|
||||
directly. See the tarpc_examples package for more sophisticated examples.
|
||||
|
||||
## Documentation
|
||||
Use `cargo doc` as you normally would to see the documentation created for all
|
||||
items expanded by a `service!` invocation.
|
||||
|
||||
## Additional Features
|
||||
- Connect over any transport that `impl`s the `Transport` trait.
|
||||
- Concurrent requests from a single client.
|
||||
- Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures.
|
||||
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait
|
||||
methods as well as on the `Client`'s stub methods.
|
||||
- Just like regular fns, the return type can be left off when it's `-> ()`.
|
||||
|
||||
37
RELEASES.md
Normal file
37
RELEASES.md
Normal file
@@ -0,0 +1,37 @@
|
||||
## 0.6 (20216-08-07)
|
||||
|
||||
### Breaking Changes
|
||||
* Updated serde to 0.8. Requires dependents to update as well.
|
||||
|
||||
## 0.5 (2016-04-24)
|
||||
|
||||
### Breaking Changes
|
||||
0.5 adds support for arbitrary transports via the
|
||||
[`Transport`](tarpc/src/transport/mod.rs#L7) trait.
|
||||
Out of the box tarpc provides implementations for:
|
||||
|
||||
* Tcp, for types `impl`ing `ToSocketAddrs`.
|
||||
* Unix sockets via the `UnixTransport` type.
|
||||
|
||||
This was a breaking change: `handler.local_addr()` was renamed
|
||||
`handler.dialer()`.
|
||||
|
||||
## 0.4 (2016-04-02)
|
||||
|
||||
### Breaking Changes
|
||||
* Updated to the latest version of serde, 0.7.0. Because tarpc exposes serde in
|
||||
its API, this forces downstream code to update to the latest version of
|
||||
serde, as well.
|
||||
|
||||
## 0.3 (2016-02-20)
|
||||
|
||||
### Breaking Changes
|
||||
* The timeout arg to `serve` was replaced with a `Config` struct, which
|
||||
currently only contains one field, but will be expanded in the future
|
||||
to allow configuring serialization protocol, and other things.
|
||||
* `serve` was changed to be a default method on the generated `Service` traits,
|
||||
and it was renamed `spawn_with_config`. A second `default fn` was added:
|
||||
`spawn`, which takes no `Config` arg.
|
||||
|
||||
### Other Changes
|
||||
* Expanded items will no longer generate unused warnings.
|
||||
122
hooks/pre-commit
122
hooks/pre-commit
@@ -1,18 +1,41 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright 2016 Google Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
# This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#!/bin/sh
|
||||
#
|
||||
# An example hook script to verify what is about to be committed.
|
||||
# Called by "git commit" with no arguments. The hook should
|
||||
# exit with non-zero status after issuing an appropriate message if
|
||||
# it wants to stop the commit.
|
||||
# Pre-commit hook for the tarpc repository. To use this hook, copy it to .git/hooks in your
|
||||
# repository root.
|
||||
#
|
||||
# To enable this hook, rename this file to "pre-commit".
|
||||
# This precommit checks the following:
|
||||
# 1. All filenames are ascii
|
||||
# 2. There is no bad whitespace
|
||||
# 3. rustfmt is installed
|
||||
# 4. rustfmt is a noop on files that are in the index
|
||||
#
|
||||
# Options:
|
||||
#
|
||||
# - TARPC_SKIP_RUSTFMT, default = 0
|
||||
#
|
||||
# Set this to 1 to skip running rustfmt
|
||||
#
|
||||
# Note that these options are most useful for testing the hooks themselves. Use git commit
|
||||
# --no-verify to skip the pre-commit hook altogether.
|
||||
|
||||
if git rev-parse --verify HEAD >/dev/null 2>&1
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[0;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
PREFIX="${GREEN}[PRECOMMIT]${NC}"
|
||||
FAILURE="${RED}FAILED${NC}"
|
||||
WARNING="${RED}[WARNING]${NC}"
|
||||
SKIPPED="${YELLOW}SKIPPED${NC}"
|
||||
SUCCESS="${GREEN}ok${NC}"
|
||||
|
||||
if git rev-parse --verify HEAD &>/dev/null
|
||||
then
|
||||
against=HEAD
|
||||
else
|
||||
@@ -20,35 +43,70 @@ else
|
||||
against=4b825dc642cb6eb9a060e54bf8d69288fbee4904
|
||||
fi
|
||||
|
||||
# If you want to allow non-ASCII filenames set this variable to true.
|
||||
allownonascii=$(git config --bool hooks.allownonascii)
|
||||
FAILED=0
|
||||
|
||||
# Redirect output to stderr.
|
||||
exec 1>&2
|
||||
|
||||
# Cross platform projects tend to avoid non-ASCII filenames; prevent
|
||||
# them from being added to the repository. We exploit the fact that the
|
||||
# printable range starts at the space character and ends with tilde.
|
||||
if [ "$allownonascii" != "true" ] &&
|
||||
# Note that the use of brackets around a tr range is ok here, (it's
|
||||
# even required, for portability to Solaris 10's /usr/bin/tr), since
|
||||
# the square bracket bytes happen to fall in the designated range.
|
||||
test $(git diff --cached --name-only --diff-filter=A -z $against |
|
||||
LC_ALL=C tr -d '[ -~]\0' | wc -c) != 0
|
||||
printf "${PREFIX} Checking that all filenames are ascii ... "
|
||||
# Note that the use of brackets around a tr range is ok here, (it's
|
||||
# even required, for portability to Solaris 10's /usr/bin/tr), since
|
||||
# the square bracket bytes happen to fall in the designated range.
|
||||
if test $(git diff --cached --name-only --diff-filter=A -z $against | LC_ALL=C tr -d '[ -~]\0' | wc -c) != 0
|
||||
then
|
||||
cat <<\EOF
|
||||
Error: Attempt to add a non-ASCII file name.
|
||||
FAILED=1
|
||||
printf "${FAILURE}\n"
|
||||
else
|
||||
printf "${SUCCESS}\n"
|
||||
fi
|
||||
|
||||
This can cause problems if you want to work with people on other platforms.
|
||||
printf "${PREFIX} Checking for bad whitespace ... "
|
||||
git diff-index --check --cached $against -- &>/dev/null
|
||||
if [ "$?" != 0 ]; then
|
||||
FAILED=1
|
||||
printf "${FAILURE}\n"
|
||||
else
|
||||
printf "${SUCCESS}\n"
|
||||
fi
|
||||
|
||||
To be portable it is advisable to rename the file.
|
||||
|
||||
If you know what you are doing you can disable this check using:
|
||||
|
||||
git config hooks.allownonascii true
|
||||
EOF
|
||||
printf "${PREFIX} Checking for rustfmt ... "
|
||||
command -v rustfmt &>/dev/null
|
||||
if [ $? == 0 ]; then
|
||||
printf "${SUCCESS}\n"
|
||||
else
|
||||
printf "${FAILURE}\n"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# If there are whitespace errors, print the offending file names and fail.
|
||||
exec git diff-index --check --cached $against --
|
||||
printf "${PREFIX} Checking for shasum ... "
|
||||
command -v shasum &>/dev/null
|
||||
if [ $? == 0 ]; then
|
||||
printf "${SUCCESS}\n"
|
||||
else
|
||||
printf "${FAILURE}\n"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Just check that running rustfmt doesn't do anything to the file. I do this instead of
|
||||
# modifying the file because I don't want to mess with the developer's index, which may
|
||||
# not only contain discrete files.
|
||||
printf "${PREFIX} Checking formatting ... "
|
||||
FMTRESULT=0
|
||||
for file in $(git diff --name-only --cached);
|
||||
do
|
||||
if [ ${file: -3} == ".rs" ]; then
|
||||
diff=$(rustfmt --skip-children --write-mode=diff $file)
|
||||
if grep --quiet "^Diff at line" <<< "$diff"; then
|
||||
FMTRESULT=1
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
if [ "${TARPC_SKIP_RUSTFMT}" == 1 ]; then
|
||||
printf "${SKIPPED}\n"$?
|
||||
elif [ ${FMTRESULT} != 0 ]; then
|
||||
FAILED=1
|
||||
printf "${FAILURE}\n"
|
||||
echo "$diff" | sed '/Using rustfmt.*$/d'
|
||||
else
|
||||
printf "${SUCCESS}\n"
|
||||
fi
|
||||
|
||||
exit ${FAILED}
|
||||
|
||||
136
hooks/pre-push
136
hooks/pre-push
@@ -1,39 +1,127 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright 2016 Google Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
# This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#!/bin/sh
|
||||
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
|
||||
# root.
|
||||
#
|
||||
# This hook runs tests to make sure only working code is being pushed. If present, rustup is used
|
||||
# to build and test the code on the appropriate toolchains. The working copy must not contain
|
||||
# uncommitted changes, since the script currently just runs cargo build/test in the working copy.
|
||||
#
|
||||
# Options:
|
||||
#
|
||||
# - TARPC_ALLOW_DIRTY, default = 0
|
||||
#
|
||||
# Setting this variable to 1 will run tests even though there are code changes in the working
|
||||
# copy. Set to 0 by default, since the intent is to test the code that's being pushed, not changes
|
||||
# still in the working copy.
|
||||
#
|
||||
# - TARPC_USE_CURRENT_TOOLCHAIN, default = 0
|
||||
#
|
||||
# Setting this variable to 1 will just run cargo build and cargo test, rather than running
|
||||
# stable/beta/nightly.
|
||||
#
|
||||
# Note that these options are most useful for testing the hooks themselves. Use git push --no-verify
|
||||
# to skip the pre-push hook altogether.
|
||||
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[0;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
PREFIX="${GREEN}[PREPUSH]${NC}"
|
||||
FAILURE="${RED}FAILED${NC}"
|
||||
WARNING="${YELLOW}[WARNING]${NC}"
|
||||
SKIPPED="${YELLOW}SKIPPED${NC}"
|
||||
SUCCESS="${GREEN}ok${NC}"
|
||||
|
||||
printf "${PREFIX} Clean working copy ... "
|
||||
git diff --exit-code &>/dev/null
|
||||
if [ "$?" != 0 ];
|
||||
then
|
||||
echo ${RED}ERROR${NC} You have uncommitted changes please commit or stash them before pushing so that I can run tests!
|
||||
exit 1
|
||||
fi
|
||||
|
||||
printf "${YELLOW}[PRESUBMIT]${NC} Running tests ... "
|
||||
|
||||
TEST_RESULT=0
|
||||
cargo test --manifest-path tarpc/Cargo.toml &>/dev/null
|
||||
if [ "$?" != "0" ];
|
||||
then
|
||||
printf "${RED}FAILED${NC}"
|
||||
TEST_RESULT=1
|
||||
if [ "$?" == 0 ]; then
|
||||
printf "${SUCCESS}\n"
|
||||
else
|
||||
printf "${GREEN}ok${NC}"
|
||||
fi
|
||||
printf "\n"
|
||||
|
||||
RESULT=0
|
||||
if [ "$TEST_RESULT" == "1" ];
|
||||
then
|
||||
RESULT=1
|
||||
if [ "${TARPC_ALLOW_DIRTY}" == "1" ]
|
||||
then
|
||||
printf "${SKIPPED}\n"
|
||||
else
|
||||
printf "${FAILURE}\n"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
exit $RESULT
|
||||
PREPUSH_RESULT=0
|
||||
|
||||
# args:
|
||||
# 1 - cargo command to run (build/test)
|
||||
# 2 - directory name of crate to build
|
||||
# 3 - rust toolchain (nightly/stable/beta)
|
||||
run_cargo() {
|
||||
if [ "$1" == "build" ]; then
|
||||
VERB=Building
|
||||
else
|
||||
VERB=Testing
|
||||
fi
|
||||
if [ "$3" != "" ]; then
|
||||
printf "${PREFIX} $VERB $2 on $3 ... "
|
||||
rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
|
||||
else
|
||||
printf "${PREFIX} $VERB $2 ... "
|
||||
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
|
||||
fi
|
||||
if [ "$?" != "0" ]; then
|
||||
printf "${FAILURE}\n"
|
||||
PREPUSH_RESULT=1
|
||||
else
|
||||
printf "${SUCCESS}\n"
|
||||
fi
|
||||
}
|
||||
|
||||
TOOLCHAIN_RESULT=0
|
||||
check_toolchain() {
|
||||
printf "${PREFIX} Checking for $1 toolchain ... "
|
||||
if [[ $(rustup toolchain list) =~ $1 ]]; then
|
||||
printf "${SUCCESS}\n"
|
||||
else
|
||||
TOOLCHAIN_RESULT=1
|
||||
PREPUSH_RESULT=1
|
||||
printf "${FAILURE}\n"
|
||||
fi
|
||||
}
|
||||
|
||||
printf "${PREFIX} Checking for rustup ... "
|
||||
command -v rustup &>/dev/null
|
||||
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
|
||||
printf "${SUCCESS}\n"
|
||||
|
||||
check_toolchain stable
|
||||
check_toolchain beta
|
||||
check_toolchain nightly
|
||||
if [ ${TOOLCHAIN_RESULT} == 1 ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
run_cargo build tarpc stable
|
||||
run_cargo build tarpc_examples stable
|
||||
|
||||
run_cargo build tarpc beta
|
||||
run_cargo build tarpc_examples beta
|
||||
|
||||
run_cargo build tarpc nightly
|
||||
run_cargo build tarpc_examples nightly
|
||||
|
||||
# We still rely on some nightly stuff for tests
|
||||
run_cargo test tarpc nightly
|
||||
run_cargo test tarpc_examples nightly
|
||||
else
|
||||
printf "${YELLOW}NOT FOUND${NC}\n"
|
||||
printf "${WARNING} Falling back to current toolchain: $(rustc -V)\n"
|
||||
|
||||
run_cargo test tarpc
|
||||
run_cargo test tarpc_examples
|
||||
fi
|
||||
|
||||
exit $PREPUSH_RESULT
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tarpc"
|
||||
version = "0.2.0"
|
||||
version = "0.6.0"
|
||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||
license = "MIT"
|
||||
documentation = "https://google.github.io/tarpc"
|
||||
@@ -11,11 +11,13 @@ readme = "../README.md"
|
||||
description = "An RPC framework for Rust with a focus on ease of use."
|
||||
|
||||
[dependencies]
|
||||
bincode = "^0.4.0"
|
||||
log = "^0.3.5"
|
||||
scoped-pool = "^0.1.4"
|
||||
serde = "^0.6.13"
|
||||
bincode = "0.6"
|
||||
log = "0.3"
|
||||
scoped-pool = "1.0"
|
||||
serde = "0.8"
|
||||
unix_socket = "0.5"
|
||||
|
||||
[dev-dependencies]
|
||||
lazy_static = "^0.1.15"
|
||||
env_logger = "^0.3.2"
|
||||
lazy_static = "0.2"
|
||||
env_logger = "0.3"
|
||||
tempdir = "0.3"
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
# Copyright 2016 Google Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
|
||||
# This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
#!/bin/sh
|
||||
ln -s ../../hooks/pre-commit .git/hooks/pre-commit
|
||||
ln -s ../../hooks/pre-push .git/hooks/pre-push
|
||||
@@ -1 +1,2 @@
|
||||
ideal_width = 100
|
||||
reorder_imports = true
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
//! Example usage:
|
||||
//!
|
||||
//! ```
|
||||
//! # #[macro_use] extern crate tarpc;
|
||||
//! #[macro_use] extern crate tarpc;
|
||||
//! mod my_server {
|
||||
//! service! {
|
||||
//! rpc hello(name: String) -> String;
|
||||
@@ -30,17 +30,13 @@
|
||||
//! }
|
||||
//!
|
||||
//! fn main() {
|
||||
//! let addr = "127.0.0.1:9000";
|
||||
//! let shutdown = my_server::serve(addr,
|
||||
//! Server,
|
||||
//! Some(Duration::from_secs(30)))
|
||||
//! .unwrap();
|
||||
//! let client = Client::new(addr, None).unwrap();
|
||||
//! let serve_handle = Server.spawn("localhost:0").unwrap();
|
||||
//! let client = Client::new(serve_handle.dialer()).unwrap();
|
||||
//! assert_eq!(3, client.add(1, 2).unwrap());
|
||||
//! assert_eq!("Hello, Mom!".to_string(),
|
||||
//! client.hello("Mom".to_string()).unwrap());
|
||||
//! drop(client);
|
||||
//! shutdown.shutdown();
|
||||
//! serve_handle.shutdown();
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
@@ -51,6 +47,7 @@ extern crate bincode;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate scoped_pool;
|
||||
extern crate unix_socket;
|
||||
|
||||
macro_rules! pos {
|
||||
() => (concat!(file!(), ":", line!()))
|
||||
@@ -63,4 +60,7 @@ pub mod protocol;
|
||||
/// Provides the macro used for constructing rpc services and client stubs.
|
||||
pub mod macros;
|
||||
|
||||
pub use protocol::{Error, Result, ServeHandle};
|
||||
/// Provides transport traits and implementations.
|
||||
pub mod transport;
|
||||
|
||||
pub use protocol::{Config, Error, Result, ServeHandle};
|
||||
|
||||
@@ -8,7 +8,7 @@ pub mod serde {
|
||||
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
/// Deserialization re-exports required by macros. Not for general use.
|
||||
pub mod de {
|
||||
pub use serde::de::{EnumVisitor, Error, Visitor, VariantVisitor};
|
||||
pub use serde::de::{EnumVisitor, Error, VariantVisitor, Visitor};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ macro_rules! client_methods {
|
||||
{ $(#[$attr:meta])* }
|
||||
$fn_name:ident( ($($arg:ident,)*) : ($($in_:ty,)*) ) -> $out:ty
|
||||
) => (
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
|
||||
let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*))));
|
||||
@@ -32,13 +33,17 @@ macro_rules! client_methods {
|
||||
{ $(#[$attr:meta])* }
|
||||
$fn_name:ident( ($( $arg:ident,)*) : ($($in_:ty, )*) ) -> $out:ty
|
||||
)*) => ( $(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
|
||||
let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*))));
|
||||
if let __Reply::$fn_name(reply) = reply {
|
||||
::std::result::Result::Ok(reply)
|
||||
} else {
|
||||
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
|
||||
panic!("Incorrect reply variant returned from rpc; expected `{}`, \
|
||||
but got {:?}",
|
||||
stringify!($fn_name),
|
||||
reply);
|
||||
}
|
||||
}
|
||||
)*);
|
||||
@@ -53,6 +58,7 @@ macro_rules! async_client_methods {
|
||||
{ $(#[$attr:meta])* }
|
||||
$fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty
|
||||
) => (
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> {
|
||||
fn mapper(reply: __Reply) -> $out {
|
||||
@@ -70,13 +76,17 @@ macro_rules! async_client_methods {
|
||||
{ $(#[$attr:meta])* }
|
||||
$fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty
|
||||
)*) => ( $(
|
||||
#[allow(unused)]
|
||||
$(#[$attr])*
|
||||
pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> {
|
||||
fn mapper(reply: __Reply) -> $out {
|
||||
if let __Reply::$fn_name(reply) = reply {
|
||||
reply
|
||||
} else {
|
||||
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
|
||||
panic!("Incorrect reply variant returned from rpc; expected `{}`, but got \
|
||||
{:?}",
|
||||
stringify!($fn_name),
|
||||
reply);
|
||||
}
|
||||
}
|
||||
let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*)));
|
||||
@@ -100,7 +110,7 @@ macro_rules! impl_serialize {
|
||||
match *self {
|
||||
$(
|
||||
$impler::$name(ref field) =>
|
||||
$crate::macros::serde::Serializer::visit_newtype_variant(
|
||||
$crate::macros::serde::Serializer::serialize_newtype_variant(
|
||||
serializer,
|
||||
stringify!($impler),
|
||||
$n,
|
||||
@@ -130,7 +140,7 @@ macro_rules! impl_deserialize {
|
||||
-> ::std::result::Result<$impler, D::Error>
|
||||
where D: $crate::macros::serde::Deserializer
|
||||
{
|
||||
#[allow(non_camel_case_types)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
enum __Field {
|
||||
$($name),*
|
||||
}
|
||||
@@ -144,6 +154,7 @@ macro_rules! impl_deserialize {
|
||||
impl $crate::macros::serde::de::Visitor for __FieldVisitor {
|
||||
type Value = __Field;
|
||||
|
||||
#[inline]
|
||||
fn visit_usize<E>(&mut self, value: usize)
|
||||
-> ::std::result::Result<__Field, E>
|
||||
where E: $crate::macros::serde::de::Error,
|
||||
@@ -154,11 +165,12 @@ macro_rules! impl_deserialize {
|
||||
}
|
||||
)*
|
||||
return ::std::result::Result::Err(
|
||||
$crate::macros::serde::de::Error::syntax("expected a field")
|
||||
$crate::macros::serde::de::Error::custom(
|
||||
format!("No variants have a value of {}!", value))
|
||||
);
|
||||
}
|
||||
}
|
||||
deserializer.visit_struct_field(__FieldVisitor)
|
||||
deserializer.deserialize_struct_field(__FieldVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +178,7 @@ macro_rules! impl_deserialize {
|
||||
impl $crate::macros::serde::de::EnumVisitor for __Visitor {
|
||||
type Value = $impler;
|
||||
|
||||
#[inline]
|
||||
fn visit<__V>(&mut self, mut visitor: __V)
|
||||
-> ::std::result::Result<$impler, __V::Error>
|
||||
where __V: $crate::macros::serde::de::VariantVisitor
|
||||
@@ -185,7 +198,7 @@ macro_rules! impl_deserialize {
|
||||
stringify!($name)
|
||||
),*
|
||||
];
|
||||
deserializer.visit_enum(stringify!($impler), VARIANTS, __Visitor)
|
||||
deserializer.deserialize_enum(stringify!($impler), VARIANTS, __Visitor)
|
||||
}
|
||||
}
|
||||
);
|
||||
@@ -210,17 +223,22 @@ macro_rules! impl_deserialize {
|
||||
/// # }
|
||||
/// ```
|
||||
///
|
||||
/// There are two rpc names reserved for the default fns `spawn` and `spawn_with_config`.
|
||||
///
|
||||
/// Attributes can be attached to each rpc. These attributes
|
||||
/// will then be attached to the generated `Service` trait's
|
||||
/// corresponding method, as well as to the `Client` stub's rpcs methods.
|
||||
///
|
||||
/// The following items are expanded in the enclosing module:
|
||||
///
|
||||
/// * `Service` -- the trait defining the RPC service
|
||||
/// * `Service` -- the trait defining the RPC service. It comes with two default methods for
|
||||
/// starting the server:
|
||||
/// 1. `spawn` starts the service in another thread using default configuration.
|
||||
/// 2. `spawn_with_config` starts the service in another thread using the specified
|
||||
/// `Config`.
|
||||
/// * `Client` -- a client that makes synchronous requests to the RPC server
|
||||
/// * `AsyncClient` -- a client that makes asynchronous requests to the RPC server
|
||||
/// * `Future` -- a handle for asynchronously retrieving the result of an RPC
|
||||
/// * `serve` -- the function that starts the RPC server
|
||||
///
|
||||
/// **Warning**: In addition to the above items, there are a few expanded items that
|
||||
/// are considered implementation details. As with the above items, shadowing
|
||||
@@ -232,19 +250,21 @@ macro_rules! impl_deserialize {
|
||||
/// * `__Reply` -- an implementation detail
|
||||
#[macro_export]
|
||||
macro_rules! service {
|
||||
// Entry point
|
||||
(
|
||||
$( $tokens:tt )*
|
||||
$(
|
||||
$(#[$attr:meta])*
|
||||
rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) $(-> $out:ty)*;
|
||||
)*
|
||||
) => {
|
||||
service_inner! {{
|
||||
$( $tokens )*
|
||||
service! {{
|
||||
$(
|
||||
$(#[$attr])*
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) $(-> $out)*;
|
||||
)*
|
||||
}}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[macro_export]
|
||||
macro_rules! service_inner {
|
||||
// Pattern for when the next rpc has an implicit unit return type
|
||||
};
|
||||
// Pattern for when the next rpc has an implicit unit return type
|
||||
(
|
||||
{
|
||||
$(#[$attr:meta])*
|
||||
@@ -254,7 +274,7 @@ macro_rules! service_inner {
|
||||
}
|
||||
$( $expanded:tt )*
|
||||
) => {
|
||||
service_inner! {
|
||||
service! {
|
||||
{ $( $unexpanded )* }
|
||||
|
||||
$( $expanded )*
|
||||
@@ -263,7 +283,7 @@ macro_rules! service_inner {
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> ();
|
||||
}
|
||||
};
|
||||
// Pattern for when the next rpc has an explicit return type
|
||||
// Pattern for when the next rpc has an explicit return type
|
||||
(
|
||||
{
|
||||
$(#[$attr:meta])*
|
||||
@@ -273,7 +293,7 @@ macro_rules! service_inner {
|
||||
}
|
||||
$( $expanded:tt )*
|
||||
) => {
|
||||
service_inner! {
|
||||
service! {
|
||||
{ $( $unexpanded )* }
|
||||
|
||||
$( $expanded )*
|
||||
@@ -282,7 +302,7 @@ macro_rules! service_inner {
|
||||
rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
|
||||
}
|
||||
};
|
||||
// Pattern when all return types have been expanded
|
||||
// Pattern for when all return types have been expanded
|
||||
(
|
||||
{ } // none left to expand
|
||||
$(
|
||||
@@ -291,15 +311,43 @@ macro_rules! service_inner {
|
||||
)*
|
||||
) => {
|
||||
#[doc="Defines the RPC service"]
|
||||
pub trait Service: Send + Sync {
|
||||
pub trait Service: Send + Sync + Sized {
|
||||
$(
|
||||
$(#[$attr])*
|
||||
fn $fn_name(&self, $($arg:$in_),*) -> $out;
|
||||
)*
|
||||
|
||||
#[doc="Spawn a running service."]
|
||||
fn spawn<T>(self,
|
||||
transport: T)
|
||||
-> $crate::Result<
|
||||
$crate::protocol::ServeHandle<
|
||||
<T::Listener as $crate::transport::Listener>::Dialer>>
|
||||
where T: $crate::transport::Transport,
|
||||
Self: 'static,
|
||||
{
|
||||
self.spawn_with_config(transport, $crate::Config::default())
|
||||
}
|
||||
|
||||
#[doc="Spawn a running service."]
|
||||
fn spawn_with_config<T>(self,
|
||||
transport: T,
|
||||
config: $crate::Config)
|
||||
-> $crate::Result<
|
||||
$crate::protocol::ServeHandle<
|
||||
<T::Listener as $crate::transport::Listener>::Dialer>>
|
||||
where T: $crate::transport::Transport,
|
||||
Self: 'static,
|
||||
{
|
||||
let server = __Server(self);
|
||||
let result = $crate::protocol::Serve::spawn_with_config(server, transport, config);
|
||||
let handle = try!(result);
|
||||
::std::result::Result::Ok(handle)
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, S> Service for P
|
||||
where P: Send + Sync + ::std::ops::Deref<Target=S>,
|
||||
where P: Send + Sync + Sized + 'static + ::std::ops::Deref<Target=S>,
|
||||
S: Service
|
||||
{
|
||||
$(
|
||||
@@ -310,7 +358,7 @@ macro_rules! service_inner {
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __Request {
|
||||
$(
|
||||
@@ -321,7 +369,7 @@ macro_rules! service_inner {
|
||||
impl_serialize!(__Request, $($fn_name(($($in_),*)))*);
|
||||
impl_deserialize!(__Request, $($fn_name(($($in_),*)))*);
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
#[allow(non_camel_case_types, unused)]
|
||||
#[derive(Debug)]
|
||||
enum __Reply {
|
||||
$(
|
||||
@@ -332,29 +380,46 @@ macro_rules! service_inner {
|
||||
impl_serialize!(__Reply, $($fn_name($out))*);
|
||||
impl_deserialize!(__Reply, $($fn_name($out))*);
|
||||
|
||||
/// An asynchronous RPC call
|
||||
#[allow(unused)]
|
||||
#[doc="An asynchronous RPC call"]
|
||||
pub struct Future<T> {
|
||||
future: $crate::protocol::Future<__Reply>,
|
||||
mapper: fn(__Reply) -> T,
|
||||
}
|
||||
|
||||
impl<T> Future<T> {
|
||||
/// Block until the result of the RPC call is available
|
||||
#[allow(unused)]
|
||||
#[doc="Block until the result of the RPC call is available"]
|
||||
pub fn get(self) -> $crate::Result<T> {
|
||||
self.future.get().map(self.mapper)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="The client stub that makes RPC calls to the server."]
|
||||
pub struct Client($crate::protocol::Client<__Request, __Reply>);
|
||||
pub struct Client<S = ::std::net::TcpStream>(
|
||||
$crate::protocol::Client<__Request, __Reply, S>
|
||||
) where S: $crate::transport::Stream;
|
||||
|
||||
impl Client {
|
||||
#[doc="Create a new client that connects to the given address."]
|
||||
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
|
||||
-> $crate::Result<Self>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
impl<S> Client<S>
|
||||
where S: $crate::transport::Stream
|
||||
{
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new client with default configuration that connects to the given \
|
||||
address."]
|
||||
pub fn new<D>(dialer: D) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
let inner = try!($crate::protocol::Client::new(addr, timeout));
|
||||
Self::with_config(dialer, $crate::Config::default())
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new client with the specified configuration that connects to the \
|
||||
given address."]
|
||||
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
let inner = try!($crate::protocol::Client::with_config(dialer, config));
|
||||
::std::result::Result::Ok(Client(inner))
|
||||
}
|
||||
|
||||
@@ -365,6 +430,7 @@ macro_rules! service_inner {
|
||||
)*
|
||||
);
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \
|
||||
clone fails."]
|
||||
pub fn try_clone(&self) -> ::std::io::Result<Self> {
|
||||
@@ -372,16 +438,29 @@ macro_rules! service_inner {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="The client stub that makes asynchronous RPC calls to the server."]
|
||||
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>);
|
||||
pub struct AsyncClient<S = ::std::net::TcpStream>(
|
||||
$crate::protocol::Client<__Request, __Reply, S>
|
||||
) where S: $crate::transport::Stream;
|
||||
|
||||
impl AsyncClient {
|
||||
#[doc="Create a new asynchronous client that connects to the given address."]
|
||||
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
|
||||
-> $crate::Result<Self>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
impl<S> AsyncClient<S>
|
||||
where S: $crate::transport::Stream {
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new asynchronous client with default configuration that connects to \
|
||||
the given address."]
|
||||
pub fn new<D>(dialer: D) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
let inner = try!($crate::protocol::Client::new(addr, timeout));
|
||||
Self::with_config(dialer, $crate::Config::default())
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="Create a new asynchronous client that connects to the given address."]
|
||||
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
|
||||
where D: $crate::transport::Dialer<Stream=S>,
|
||||
{
|
||||
let inner = try!($crate::protocol::Client::with_config(dialer, config));
|
||||
::std::result::Result::Ok(AsyncClient(inner))
|
||||
}
|
||||
|
||||
@@ -392,6 +471,7 @@ macro_rules! service_inner {
|
||||
)*
|
||||
);
|
||||
|
||||
#[allow(unused)]
|
||||
#[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \
|
||||
clone fails."]
|
||||
pub fn try_clone(&self) -> ::std::io::Result<Self> {
|
||||
@@ -399,7 +479,9 @@ macro_rules! service_inner {
|
||||
}
|
||||
}
|
||||
|
||||
struct __Server<S: 'static + Service>(S);
|
||||
#[allow(unused)]
|
||||
struct __Server<S>(S)
|
||||
where S: 'static + Service;
|
||||
|
||||
impl<S> $crate::protocol::Serve for __Server<S>
|
||||
where S: 'static + Service
|
||||
@@ -415,22 +497,11 @@ macro_rules! service_inner {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc="Start a running service."]
|
||||
pub fn serve<A, S>(addr: A,
|
||||
service: S,
|
||||
read_timeout: ::std::option::Option<::std::time::Duration>)
|
||||
-> $crate::Result<$crate::protocol::ServeHandle>
|
||||
where A: ::std::net::ToSocketAddrs,
|
||||
S: 'static + Service
|
||||
{
|
||||
let server = ::std::sync::Arc::new(__Server(service));
|
||||
::std::result::Result::Ok(try!($crate::protocol::serve_async(addr, server, read_timeout)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // because we're just testing that the macro expansion compiles
|
||||
#[allow(dead_code)]
|
||||
// because we're just testing that the macro expansion compiles
|
||||
#[cfg(test)]
|
||||
mod syntax_test {
|
||||
// Tests a service definition with a fn that takes no args
|
||||
@@ -461,14 +532,12 @@ mod syntax_test {
|
||||
#[cfg(test)]
|
||||
mod functional_test {
|
||||
extern crate env_logger;
|
||||
use std::time::Duration;
|
||||
|
||||
fn test_timeout() -> Option<Duration> {
|
||||
Some(Duration::from_secs(5))
|
||||
}
|
||||
extern crate tempdir;
|
||||
use transport::unix::UnixTransport;
|
||||
|
||||
service! {
|
||||
rpc add(x: i32, y: i32) -> i32;
|
||||
rpc hey(name: String) -> String;
|
||||
}
|
||||
|
||||
struct Server;
|
||||
@@ -477,14 +546,18 @@ mod functional_test {
|
||||
fn add(&self, x: i32, y: i32) -> i32 {
|
||||
x + y
|
||||
}
|
||||
fn hey(&self, name: String) -> String {
|
||||
format!("Hey, {}.", name)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
|
||||
let client = Client::new(handle.local_addr(), None).unwrap();
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client = Client::new(handle.dialer()).unwrap();
|
||||
assert_eq!(3, client.add(1, 2).unwrap());
|
||||
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
|
||||
drop(client);
|
||||
handle.shutdown();
|
||||
}
|
||||
@@ -492,17 +565,18 @@ mod functional_test {
|
||||
#[test]
|
||||
fn simple_async() {
|
||||
let _ = env_logger::init();
|
||||
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
|
||||
let client = AsyncClient::new(handle.local_addr(), None).unwrap();
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client = AsyncClient::new(handle.dialer()).unwrap();
|
||||
assert_eq!(3, client.add(1, 2).get().unwrap());
|
||||
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
|
||||
drop(client);
|
||||
handle.shutdown();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_clone() {
|
||||
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
|
||||
let client1 = Client::new(handle.local_addr(), None).unwrap();
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client1 = Client::new(handle.dialer()).unwrap();
|
||||
let client2 = client1.try_clone().unwrap();
|
||||
assert_eq!(3, client1.add(1, 2).unwrap());
|
||||
assert_eq!(3, client2.add(1, 2).unwrap());
|
||||
@@ -510,8 +584,20 @@ mod functional_test {
|
||||
|
||||
#[test]
|
||||
fn async_try_clone() {
|
||||
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
|
||||
let client1 = AsyncClient::new(handle.local_addr(), None).unwrap();
|
||||
let handle = Server.spawn("localhost:0").unwrap();
|
||||
let client1 = AsyncClient::new(handle.dialer()).unwrap();
|
||||
let client2 = client1.try_clone().unwrap();
|
||||
assert_eq!(3, client1.add(1, 2).get().unwrap());
|
||||
assert_eq!(3, client2.add(1, 2).get().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn async_try_clone_unix() {
|
||||
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
|
||||
let temp_file = temp_dir.path()
|
||||
.join("async_try_clone_unix.tmp");
|
||||
let handle = Server.spawn(UnixTransport(temp_file)).unwrap();
|
||||
let client1 = AsyncClient::new(handle.dialer()).unwrap();
|
||||
let client2 = client1.try_clone().unwrap();
|
||||
assert_eq!(3, client1.add(1, 2).get().unwrap());
|
||||
assert_eq!(3, client2.add(1, 2).get().unwrap());
|
||||
@@ -520,7 +606,13 @@ mod functional_test {
|
||||
// Tests that a server can be wrapped in an Arc; no need to run, just compile
|
||||
#[allow(dead_code)]
|
||||
fn serve_arc_server() {
|
||||
let _ = serve("localhost:0", ::std::sync::Arc::new(Server), None);
|
||||
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
|
||||
}
|
||||
|
||||
// Tests that a tcp client can be created from &str
|
||||
#[allow(dead_code)]
|
||||
fn test_client_str() {
|
||||
let _ = Client::new("localhost:0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -5,38 +5,49 @@
|
||||
|
||||
use serde;
|
||||
use std::fmt;
|
||||
use std::io::{self, BufReader, BufWriter, Read};
|
||||
use std::io::{self, BufReader, BufWriter};
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::mpsc::{Receiver, Sender, channel};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::{Serialize, Deserialize, Error, Packet, Result};
|
||||
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
||||
use transport::{Dialer, Stream};
|
||||
|
||||
/// A client stub that connects to a server to run rpcs.
|
||||
pub struct Client<Request, Reply>
|
||||
where Request: serde::ser::Serialize
|
||||
pub struct Client<Request, Reply, S>
|
||||
where Request: serde::ser::Serialize,
|
||||
S: Stream
|
||||
{
|
||||
// The guard is in an option so it can be joined in the drop fn
|
||||
reader_guard: Arc<Option<thread::JoinHandle<()>>>,
|
||||
outbound: Sender<(Request, Sender<Result<Reply>>)>,
|
||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||
shutdown: TcpStream,
|
||||
shutdown: S,
|
||||
}
|
||||
|
||||
impl<Request, Reply> Client<Request, Reply>
|
||||
impl<Request, Reply, S> Client<Request, Reply, S>
|
||||
where Request: serde::ser::Serialize + Send + 'static,
|
||||
Reply: serde::de::Deserialize + Send + 'static
|
||||
Reply: serde::de::Deserialize + Send + 'static,
|
||||
S: Stream
|
||||
{
|
||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||
/// for both reads and writes.
|
||||
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
|
||||
let stream = try!(TcpStream::connect(addr));
|
||||
try!(stream.set_read_timeout(timeout));
|
||||
try!(stream.set_write_timeout(timeout));
|
||||
pub fn new<D>(dialer: D) -> io::Result<Self>
|
||||
where D: Dialer<Stream = S>
|
||||
{
|
||||
Self::with_config(dialer, Config::default())
|
||||
}
|
||||
|
||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||
/// for both reads and writes.
|
||||
pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
|
||||
where D: Dialer<Stream = S>
|
||||
{
|
||||
let stream = try!(dialer.dial());
|
||||
try!(stream.set_read_timeout(config.timeout));
|
||||
try!(stream.set_write_timeout(config.timeout));
|
||||
let reader_stream = try!(stream.try_clone());
|
||||
let writer_stream = try!(stream.try_clone());
|
||||
let requests = Arc::new(Mutex::new(RpcFutures::new()));
|
||||
@@ -54,7 +65,7 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
}
|
||||
|
||||
/// Clones the Client so that it can be shared across threads.
|
||||
pub fn try_clone(&self) -> io::Result<Client<Request, Reply>> {
|
||||
pub fn try_clone(&self) -> io::Result<Self> {
|
||||
Ok(Client {
|
||||
reader_guard: self.reader_guard.clone(),
|
||||
outbound: self.outbound.clone(),
|
||||
@@ -92,15 +103,17 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
}
|
||||
}
|
||||
|
||||
impl<Request, Reply> Drop for Client<Request, Reply>
|
||||
where Request: serde::ser::Serialize
|
||||
impl<Request, Reply, S> Drop for Client<Request, Reply, S>
|
||||
where Request: serde::ser::Serialize,
|
||||
S: Stream
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
debug!("Dropping Client.");
|
||||
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
|
||||
debug!("Attempting to shut down writer and reader threads.");
|
||||
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) {
|
||||
warn!("Client: couldn't shutdown writer and reader threads: {:?}", e);
|
||||
if let Err(e) = self.shutdown.shutdown() {
|
||||
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
|
||||
e);
|
||||
} else {
|
||||
// We only join if we know the TcpStream was shut down. Otherwise we might never
|
||||
// finish.
|
||||
@@ -118,14 +131,15 @@ impl<Request, Reply> Drop for Client<Request, Reply>
|
||||
/// An asynchronous RPC call
|
||||
pub struct Future<T> {
|
||||
rx: Receiver<Result<T>>,
|
||||
requests: Arc<Mutex<RpcFutures<T>>>
|
||||
requests: Arc<Mutex<RpcFutures<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Future<T> {
|
||||
/// Block until the result of the RPC call is available
|
||||
pub fn get(self) -> Result<T> {
|
||||
let requests = self.requests;
|
||||
self.rx.recv()
|
||||
self.rx
|
||||
.recv()
|
||||
.map_err(|_| requests.lock().expect(pos!()).get_error())
|
||||
.and_then(|reply| reply)
|
||||
}
|
||||
@@ -164,7 +178,8 @@ impl<Reply> RpcFutures<Reply> {
|
||||
info!("Reader: could not complete reply: {:?}", e);
|
||||
}
|
||||
} else {
|
||||
warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id);
|
||||
warn!("RpcFutures: expected sender for id {} but got None!",
|
||||
packet.rpc_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,11 +192,12 @@ impl<Reply> RpcFutures<Reply> {
|
||||
}
|
||||
}
|
||||
|
||||
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||
stream: TcpStream)
|
||||
fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||
stream: S)
|
||||
where Request: serde::Serialize,
|
||||
Reply: serde::Deserialize,
|
||||
S: Stream
|
||||
{
|
||||
let mut next_id = 0;
|
||||
let mut stream = BufWriter::new(stream);
|
||||
@@ -221,8 +237,8 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||
{
|
||||
// Clone the err so we can log it if sending fails
|
||||
if let Err(e2) = tx.send(Err(e.clone())) {
|
||||
debug!("Error encountered while trying to send an error. \
|
||||
Initial error: {:?}; Send error: {:?}",
|
||||
debug!("Error encountered while trying to send an error. Initial error: {:?}; Send \
|
||||
error: {:?}",
|
||||
e,
|
||||
e2);
|
||||
}
|
||||
@@ -230,8 +246,9 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||
|
||||
}
|
||||
|
||||
fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream)
|
||||
where Reply: serde::Deserialize
|
||||
fn read<Reply, S>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: S)
|
||||
where Reply: serde::Deserialize,
|
||||
S: Stream
|
||||
{
|
||||
let mut stream = BufReader::new(stream);
|
||||
loop {
|
||||
|
||||
@@ -6,9 +6,11 @@
|
||||
use bincode::{self, SizeLimit};
|
||||
use bincode::serde::{deserialize_from, serialize_into};
|
||||
use serde;
|
||||
use serde::de::value::Error::EndOfStream;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::convert;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
mod client;
|
||||
mod server;
|
||||
@@ -16,7 +18,7 @@ mod packet;
|
||||
|
||||
pub use self::packet::Packet;
|
||||
pub use self::client::{Client, Future};
|
||||
pub use self::server::{Serve, ServeHandle, serve_async};
|
||||
pub use self::server::{Serve, ServeHandle};
|
||||
|
||||
/// Client errors that can occur during rpc calls
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -39,10 +41,14 @@ impl convert::From<bincode::serde::SerializeError> for Error {
|
||||
impl convert::From<bincode::serde::DeserializeError> for Error {
|
||||
fn from(err: bincode::serde::DeserializeError) -> Error {
|
||||
match err {
|
||||
bincode::serde::DeserializeError::IoError(ref err)
|
||||
if err.kind() == io::ErrorKind::ConnectionReset => Error::ConnectionBroken,
|
||||
bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken,
|
||||
bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)),
|
||||
bincode::serde::DeserializeError::Serde(EndOfStream) => Error::ConnectionBroken,
|
||||
bincode::serde::DeserializeError::IoError(err) => {
|
||||
match err.kind() {
|
||||
io::ErrorKind::ConnectionReset |
|
||||
io::ErrorKind::UnexpectedEof => Error::ConnectionBroken,
|
||||
_ => Error::Io(Arc::new(err)),
|
||||
}
|
||||
}
|
||||
err => panic!("Unexpected error during deserialization: {:?}", err),
|
||||
}
|
||||
}
|
||||
@@ -54,13 +60,19 @@ impl convert::From<io::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration for client and server.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Config {
|
||||
/// Request/Response timeout between packet delivery.
|
||||
pub timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Return type of rpc calls: either the successful return value, or a client error.
|
||||
pub type Result<T> = ::std::result::Result<T, Error>;
|
||||
|
||||
trait Deserialize: Read + Sized {
|
||||
fn deserialize<T: serde::Deserialize>(&mut self) -> Result<T> {
|
||||
deserialize_from(self, SizeLimit::Infinite)
|
||||
.map_err(Error::from)
|
||||
deserialize_from(self, SizeLimit::Infinite).map_err(Error::from)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,8 +91,9 @@ impl<W: Write> Serialize for W {}
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
extern crate env_logger;
|
||||
use super::{Client, Serve, serve_async};
|
||||
use super::{Client, Config, Serve};
|
||||
use scoped_pool::Pool;
|
||||
use std::net::TcpStream;
|
||||
use std::sync::{Arc, Barrier, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
@@ -119,8 +132,8 @@ mod test {
|
||||
fn handle() {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
|
||||
let client: Client<(), u64> = Client::new(serve_handle.local_addr(), None).unwrap();
|
||||
let serve_handle = server.spawn("localhost:0").unwrap();
|
||||
let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap();
|
||||
drop(client);
|
||||
serve_handle.shutdown();
|
||||
}
|
||||
@@ -129,10 +142,9 @@ mod test {
|
||||
fn simple() {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
||||
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
|
||||
let client: Client<(), u64> = Client::new(addr, None).unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
assert_eq!(0, client.rpc(()).unwrap());
|
||||
assert_eq!(1, server.count());
|
||||
assert_eq!(1, client.rpc(()).unwrap());
|
||||
@@ -172,9 +184,10 @@ mod test {
|
||||
fn force_shutdown() {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = serve_async("localhost:0", server, Some(Duration::new(0, 10))).unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Client<(), u64> = Client::new(addr, None).unwrap();
|
||||
let serve_handle = server.spawn_with_config("localhost:0",
|
||||
Config { timeout: Some(Duration::new(0, 10)) })
|
||||
.unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
let thread = thread::spawn(move || serve_handle.shutdown());
|
||||
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
|
||||
thread.join().unwrap();
|
||||
@@ -184,9 +197,10 @@ mod test {
|
||||
fn client_failed_rpc() {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = serve_async("localhost:0", server, test_timeout()).unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr, None).unwrap());
|
||||
let serve_handle =
|
||||
server.spawn_with_config("localhost:0", Config { timeout: test_timeout() })
|
||||
.unwrap();
|
||||
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
|
||||
client.rpc(()).unwrap();
|
||||
serve_handle.shutdown();
|
||||
match client.rpc(()) {
|
||||
@@ -202,13 +216,14 @@ mod test {
|
||||
let concurrency = 10;
|
||||
let pool = Pool::new(concurrency);
|
||||
let server = Arc::new(BarrierServer::new(concurrency));
|
||||
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Client<(), u64> = Client::new(addr, None).unwrap();
|
||||
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
pool.scoped(|scope| {
|
||||
for _ in 0..concurrency {
|
||||
let client = client.try_clone().unwrap();
|
||||
scope.execute(move || { client.rpc(()).unwrap(); });
|
||||
scope.execute(move || {
|
||||
client.rpc(()).unwrap();
|
||||
});
|
||||
}
|
||||
});
|
||||
assert_eq!(concurrency as u64, server.count());
|
||||
@@ -220,9 +235,8 @@ mod test {
|
||||
fn async() {
|
||||
let _ = env_logger::init();
|
||||
let server = Arc::new(Server::new());
|
||||
let serve_handle = serve_async("localhost:0", server.clone(), None).unwrap();
|
||||
let addr = serve_handle.local_addr().clone();
|
||||
let client: Client<(), u64> = Client::new(addr, None).unwrap();
|
||||
let serve_handle = server.spawn("localhost:0").unwrap();
|
||||
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||
|
||||
// Drop future immediately; does the reader channel panic when sending?
|
||||
client.rpc_async(());
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer, de, ser};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
|
||||
use std::marker::PhantomData;
|
||||
|
||||
/// Packet shared between client and server.
|
||||
@@ -19,44 +19,20 @@ impl<T: Serialize> Serialize for Packet<T> {
|
||||
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
|
||||
where S: Serializer
|
||||
{
|
||||
serializer.visit_struct(PACKET, MapVisitor {
|
||||
value: self,
|
||||
state: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct MapVisitor<'a, T: 'a> {
|
||||
value: &'a Packet<T>,
|
||||
state: u8,
|
||||
}
|
||||
|
||||
impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
|
||||
fn visit<S>(&mut self, serializer: &mut S) -> Result<Option<()>, S::Error>
|
||||
where S: Serializer
|
||||
{
|
||||
match self.state {
|
||||
0 => {
|
||||
self.state += 1;
|
||||
Ok(Some(try!(serializer.visit_struct_elt(RPC_ID, &self.value.rpc_id))))
|
||||
}
|
||||
1 => {
|
||||
self.state += 1;
|
||||
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message))))
|
||||
}
|
||||
_ => {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
let mut state = try!(serializer.serialize_struct(PACKET, 2));
|
||||
try!(serializer.serialize_struct_elt(&mut state, RPC_ID, &self.rpc_id));
|
||||
try!(serializer.serialize_struct_elt(&mut state, MESSAGE, &self.message));
|
||||
serializer.serialize_struct_end(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Deserialize> Deserialize for Packet<T> {
|
||||
#[inline]
|
||||
fn deserialize<D>(deserializer: &mut D) -> Result<Self, D::Error>
|
||||
where D: Deserializer
|
||||
{
|
||||
const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE];
|
||||
deserializer.visit_struct(PACKET, FIELDS, Visitor(PhantomData))
|
||||
deserializer.deserialize_struct(PACKET, FIELDS, Visitor(PhantomData))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +41,7 @@ struct Visitor<T>(PhantomData<T>);
|
||||
impl<T: Deserialize> de::Visitor for Visitor<T> {
|
||||
type Value = Packet<T>;
|
||||
|
||||
#[inline]
|
||||
fn visit_seq<V>(&mut self, mut visitor: V) -> Result<Packet<T>, V::Error>
|
||||
where V: de::SeqVisitor
|
||||
{
|
||||
@@ -91,7 +68,10 @@ fn serde() {
|
||||
use bincode;
|
||||
let _ = env_logger::init();
|
||||
|
||||
let packet = Packet { rpc_id: 1, message: () };
|
||||
let packet = Packet {
|
||||
rpc_id: 1,
|
||||
message: (),
|
||||
};
|
||||
let ser = bincode::serde::serialize(&packet, bincode::SizeLimit::Infinite).unwrap();
|
||||
let de = bincode::serde::deserialize(&ser);
|
||||
assert_eq!(packet, de.unwrap());
|
||||
|
||||
@@ -7,24 +7,27 @@ use serde;
|
||||
use scoped_pool::{Pool, Scope};
|
||||
use std::fmt;
|
||||
use std::io::{self, BufReader, BufWriter};
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
|
||||
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::thread::{self, JoinHandle};
|
||||
use super::{Deserialize, Error, Packet, Result, Serialize};
|
||||
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
||||
use transport::{Dialer, Listener, Stream, Transport};
|
||||
use transport::tcp::TcpDialer;
|
||||
|
||||
struct ConnectionHandler<'a, S>
|
||||
where S: Serve
|
||||
struct ConnectionHandler<'a, S, St>
|
||||
where S: Serve,
|
||||
St: Stream
|
||||
{
|
||||
read_stream: BufReader<TcpStream>,
|
||||
write_stream: BufWriter<TcpStream>,
|
||||
read_stream: BufReader<St>,
|
||||
write_stream: BufWriter<St>,
|
||||
server: S,
|
||||
shutdown: &'a AtomicBool,
|
||||
}
|
||||
|
||||
impl<'a, S> ConnectionHandler<'a, S>
|
||||
where S: Serve
|
||||
impl<'a, S, St> ConnectionHandler<'a, S, St>
|
||||
where S: Serve,
|
||||
St: Stream
|
||||
{
|
||||
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
|
||||
let ConnectionHandler {
|
||||
@@ -38,13 +41,13 @@ impl<'a, S> ConnectionHandler<'a, S>
|
||||
scope.execute(move || Self::write(rx, write_stream));
|
||||
loop {
|
||||
match read_stream.deserialize() {
|
||||
Ok(Packet { rpc_id, message, }) => {
|
||||
Ok(Packet { rpc_id, message }) => {
|
||||
let tx = tx.clone();
|
||||
scope.execute(move || {
|
||||
let reply = server.serve(message);
|
||||
let reply_packet = Packet {
|
||||
rpc_id: rpc_id,
|
||||
message: reply
|
||||
message: reply,
|
||||
};
|
||||
tx.send(reply_packet).expect(pos!());
|
||||
});
|
||||
@@ -55,8 +58,8 @@ impl<'a, S> ConnectionHandler<'a, S>
|
||||
}
|
||||
Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => {
|
||||
if !shutdown.load(Ordering::SeqCst) {
|
||||
info!("ConnectionHandler: read timed out ({:?}). Server not \
|
||||
shutdown, so retrying read.",
|
||||
info!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \
|
||||
retrying read.",
|
||||
err);
|
||||
continue;
|
||||
} else {
|
||||
@@ -78,12 +81,13 @@ impl<'a, S> ConnectionHandler<'a, S>
|
||||
|
||||
fn timed_out(error_kind: io::ErrorKind) -> bool {
|
||||
match error_kind {
|
||||
io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => true,
|
||||
io::ErrorKind::TimedOut |
|
||||
io::ErrorKind::WouldBlock => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<TcpStream>) {
|
||||
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<St>) {
|
||||
loop {
|
||||
match rx.recv() {
|
||||
Err(e) => {
|
||||
@@ -101,21 +105,25 @@ impl<'a, S> ConnectionHandler<'a, S>
|
||||
}
|
||||
|
||||
/// Provides methods for blocking until the server completes,
|
||||
pub struct ServeHandle {
|
||||
pub struct ServeHandle<D = TcpDialer>
|
||||
where D: Dialer
|
||||
{
|
||||
tx: Sender<()>,
|
||||
join_handle: JoinHandle<()>,
|
||||
addr: SocketAddr,
|
||||
dialer: D,
|
||||
}
|
||||
|
||||
impl ServeHandle {
|
||||
impl<D> ServeHandle<D>
|
||||
where D: Dialer
|
||||
{
|
||||
/// Block until the server completes
|
||||
pub fn wait(self) {
|
||||
self.join_handle.join().expect(pos!());
|
||||
}
|
||||
|
||||
/// Returns the address the server is bound to
|
||||
pub fn local_addr(&self) -> &SocketAddr {
|
||||
&self.addr
|
||||
/// Returns the dialer to the server.
|
||||
pub fn dialer(&self) -> &D {
|
||||
&self.dialer
|
||||
}
|
||||
|
||||
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not
|
||||
@@ -123,7 +131,7 @@ impl ServeHandle {
|
||||
pub fn shutdown(self) {
|
||||
info!("ServeHandle: attempting to shut down the server.");
|
||||
self.tx.send(()).expect(pos!());
|
||||
if let Ok(_) = TcpStream::connect(self.addr) {
|
||||
if let Ok(_) = self.dialer.dial() {
|
||||
self.join_handle.join().expect(pos!());
|
||||
} else {
|
||||
warn!("ServeHandle: best effort shutdown of serve thread failed");
|
||||
@@ -131,18 +139,23 @@ impl ServeHandle {
|
||||
}
|
||||
}
|
||||
|
||||
struct Server<'a, S: 'a> {
|
||||
struct Server<'a, S: 'a, L>
|
||||
where L: Listener
|
||||
{
|
||||
server: &'a S,
|
||||
listener: TcpListener,
|
||||
listener: L,
|
||||
read_timeout: Option<Duration>,
|
||||
die_rx: Receiver<()>,
|
||||
shutdown: &'a AtomicBool,
|
||||
}
|
||||
|
||||
impl<'a, S: 'a> Server<'a, S>
|
||||
where S: Serve + 'static
|
||||
impl<'a, S, L> Server<'a, S, L>
|
||||
where S: Serve + 'static,
|
||||
L: Listener
|
||||
{
|
||||
fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b {
|
||||
fn serve<'b>(self, scope: &Scope<'b>)
|
||||
where 'a: 'b
|
||||
{
|
||||
for conn in self.listener.incoming() {
|
||||
match self.die_rx.try_recv() {
|
||||
Ok(_) => {
|
||||
@@ -169,7 +182,7 @@ impl<'a, S: 'a> Server<'a, S>
|
||||
let read_conn = match conn.try_clone() {
|
||||
Err(err) => {
|
||||
error!("serve: could not clone tcp stream; possibly out of file descriptors? \
|
||||
Err: {:?}",
|
||||
Err: {:?}",
|
||||
err);
|
||||
continue;
|
||||
}
|
||||
@@ -192,48 +205,17 @@ impl<'a, S: 'a> Server<'a, S>
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S> Drop for Server<'a, S> {
|
||||
impl<'a, S, L> Drop for Server<'a, S, L>
|
||||
where L: Listener
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
debug!("Shutting down connection handlers.");
|
||||
self.shutdown.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// Start
|
||||
pub fn serve_async<A, S>(addr: A,
|
||||
server: S,
|
||||
read_timeout: Option<Duration>)
|
||||
-> io::Result<ServeHandle>
|
||||
where A: ToSocketAddrs,
|
||||
S: 'static + Serve
|
||||
{
|
||||
let listener = try!(TcpListener::bind(&addr));
|
||||
let addr = try!(listener.local_addr());
|
||||
info!("serve_async: spinning up server on {:?}", addr);
|
||||
let (die_tx, die_rx) = channel();
|
||||
let join_handle = thread::spawn(move || {
|
||||
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
|
||||
let shutdown = AtomicBool::new(false);
|
||||
let server = Server {
|
||||
server: &server,
|
||||
listener: listener,
|
||||
read_timeout: read_timeout,
|
||||
die_rx: die_rx,
|
||||
shutdown: &shutdown,
|
||||
};
|
||||
pool.scoped(|scope| {
|
||||
server.serve(scope);
|
||||
});
|
||||
});
|
||||
Ok(ServeHandle {
|
||||
tx: die_tx,
|
||||
join_handle: join_handle,
|
||||
addr: addr.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// A service provided by a server
|
||||
pub trait Serve: Send + Sync {
|
||||
pub trait Serve: Send + Sync + Sized {
|
||||
/// The type of request received by the server
|
||||
type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send;
|
||||
/// The type of reply sent by the server
|
||||
@@ -241,10 +223,52 @@ pub trait Serve: Send + Sync {
|
||||
|
||||
/// Return a reply for a given request
|
||||
fn serve(&self, request: Self::Request) -> Self::Reply;
|
||||
|
||||
/// spawn
|
||||
fn spawn<T>(self, transport: T) -> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
|
||||
where T: Transport,
|
||||
Self: 'static
|
||||
{
|
||||
self.spawn_with_config(transport, Config::default())
|
||||
}
|
||||
|
||||
/// spawn
|
||||
fn spawn_with_config<T>(self,
|
||||
transport: T,
|
||||
config: Config)
|
||||
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
|
||||
where T: Transport,
|
||||
Self: 'static
|
||||
{
|
||||
let listener = try!(transport.bind());
|
||||
let dialer = try!(listener.dialer());
|
||||
info!("spawn_with_config: spinning up server.");
|
||||
let (die_tx, die_rx) = channel();
|
||||
let timeout = config.timeout;
|
||||
let join_handle = thread::spawn(move || {
|
||||
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
|
||||
let shutdown = AtomicBool::new(false);
|
||||
let server = Server {
|
||||
server: &self,
|
||||
listener: listener,
|
||||
read_timeout: timeout,
|
||||
die_rx: die_rx,
|
||||
shutdown: &shutdown,
|
||||
};
|
||||
pool.scoped(|scope| {
|
||||
server.serve(scope);
|
||||
});
|
||||
});
|
||||
Ok(ServeHandle {
|
||||
tx: die_tx,
|
||||
join_handle: join_handle,
|
||||
dialer: dialer,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<P, S> Serve for P
|
||||
where P: Send + Sync + ::std::ops::Deref<Target=S>,
|
||||
where P: Send + Sync + ::std::ops::Deref<Target = S>,
|
||||
S: Serve
|
||||
{
|
||||
type Request = S::Request;
|
||||
|
||||
91
tarpc/src/transport/mod.rs
Normal file
91
tarpc/src/transport/mod.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use std::io::{self, Read, Write};
|
||||
use std::time::Duration;
|
||||
|
||||
/// A factory for creating a listener on a given address.
|
||||
/// For TCP, an address might be an IPv4 address; for Unix sockets, it
|
||||
/// is just a file name.
|
||||
pub trait Transport {
|
||||
/// The type of listener that binds to the given address.
|
||||
type Listener: Listener;
|
||||
/// Return a listener on the given address, and a dialer to that address.
|
||||
fn bind(&self) -> io::Result<Self::Listener>;
|
||||
}
|
||||
|
||||
/// Accepts incoming connections from dialers.
|
||||
pub trait Listener: Send + 'static {
|
||||
/// The type of address being listened on.
|
||||
type Dialer: Dialer;
|
||||
/// The type of stream this listener accepts.
|
||||
type Stream: Stream;
|
||||
/// Accept an incoming stream.
|
||||
fn accept(&self) -> io::Result<Self::Stream>;
|
||||
/// Returns the local address being listened on.
|
||||
fn dialer(&self) -> io::Result<Self::Dialer>;
|
||||
/// Iterate over incoming connections.
|
||||
fn incoming(&self) -> Incoming<Self> {
|
||||
Incoming { listener: self }
|
||||
}
|
||||
}
|
||||
|
||||
/// A cloneable Reader/Writer.
|
||||
pub trait Stream: Read + Write + Send + Sized + 'static {
|
||||
/// Creates a new independently owned handle to the Stream.
|
||||
///
|
||||
/// The returned Stream should reference the same stream that this
|
||||
/// object references. Both handles should read and write the same
|
||||
/// stream of data, and options set on one stream should be propagated
|
||||
/// to the other stream.
|
||||
fn try_clone(&self) -> io::Result<Self>;
|
||||
/// Sets a read timeout.
|
||||
///
|
||||
/// If the value specified is `None`, then read calls will block indefinitely.
|
||||
/// It is an error to pass the zero `Duration` to this method.
|
||||
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
|
||||
/// Sets a write timeout.
|
||||
///
|
||||
/// If the value specified is `None`, then write calls will block indefinitely.
|
||||
/// It is an error to pass the zero `Duration` to this method.
|
||||
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
|
||||
/// Shuts down both ends of the stream.
|
||||
///
|
||||
/// Implementations should cause all pending and future I/O on the specified
|
||||
/// portions to return immediately with an appropriate value.
|
||||
fn shutdown(&self) -> io::Result<()>;
|
||||
}
|
||||
|
||||
/// A `Stream` factory.
|
||||
pub trait Dialer {
|
||||
/// The type of `Stream` this can create.
|
||||
type Stream: Stream;
|
||||
/// Open a stream.
|
||||
fn dial(&self) -> io::Result<Self::Stream>;
|
||||
}
|
||||
|
||||
impl<P, D: ?Sized> Dialer for P
|
||||
where P: ::std::ops::Deref<Target = D>,
|
||||
D: Dialer + 'static
|
||||
{
|
||||
type Stream = D::Stream;
|
||||
|
||||
fn dial(&self) -> io::Result<Self::Stream> {
|
||||
(**self).dial()
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates over incoming connections.
|
||||
pub struct Incoming<'a, L: Listener + ?Sized + 'a> {
|
||||
listener: &'a L,
|
||||
}
|
||||
|
||||
impl<'a, L: Listener> Iterator for Incoming<'a, L> {
|
||||
type Item = io::Result<L::Stream>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
Some(self.listener.accept())
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides a TCP transport.
|
||||
pub mod tcp;
|
||||
/// Provides a unix socket transport.
|
||||
pub mod unix;
|
||||
77
tarpc/src/transport/tcp.rs
Normal file
77
tarpc/src/transport/tcp.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use std::io;
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
|
||||
use std::time::Duration;
|
||||
|
||||
/// A transport for TCP.
|
||||
#[derive(Debug)]
|
||||
pub struct TcpTransport<A: ToSocketAddrs>(pub A);
|
||||
|
||||
impl<A: ToSocketAddrs> super::Transport for TcpTransport<A> {
|
||||
type Listener = TcpListener;
|
||||
|
||||
fn bind(&self) -> io::Result<TcpListener> {
|
||||
TcpListener::bind(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: ToSocketAddrs> super::Transport for A {
|
||||
type Listener = TcpListener;
|
||||
|
||||
fn bind(&self) -> io::Result<TcpListener> {
|
||||
TcpListener::bind(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Listener for TcpListener {
|
||||
type Dialer = TcpDialer<SocketAddr>;
|
||||
|
||||
type Stream = TcpStream;
|
||||
|
||||
fn accept(&self) -> io::Result<TcpStream> {
|
||||
self.accept().map(|(stream, _)| stream)
|
||||
}
|
||||
|
||||
fn dialer(&self) -> io::Result<TcpDialer<SocketAddr>> {
|
||||
self.local_addr().map(|addr| TcpDialer(addr))
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Stream for TcpStream {
|
||||
fn try_clone(&self) -> io::Result<Self> {
|
||||
self.try_clone()
|
||||
}
|
||||
|
||||
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
|
||||
self.set_read_timeout(dur)
|
||||
}
|
||||
|
||||
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
|
||||
self.set_write_timeout(dur)
|
||||
}
|
||||
|
||||
fn shutdown(&self) -> io::Result<()> {
|
||||
self.shutdown(::std::net::Shutdown::Both)
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to a socket address.
|
||||
#[derive(Debug)]
|
||||
pub struct TcpDialer<A = SocketAddr>(pub A) where A: ToSocketAddrs;
|
||||
|
||||
impl<A> super::Dialer for TcpDialer<A>
|
||||
where A: ToSocketAddrs
|
||||
{
|
||||
type Stream = TcpStream;
|
||||
|
||||
fn dial(&self) -> io::Result<TcpStream> {
|
||||
TcpStream::connect(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Dialer for str {
|
||||
type Stream = TcpStream;
|
||||
|
||||
fn dial(&self) -> io::Result<TcpStream> {
|
||||
TcpStream::connect(self)
|
||||
}
|
||||
}
|
||||
72
tarpc/src/transport/unix.rs
Normal file
72
tarpc/src/transport/unix.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
use unix_socket::{UnixListener, UnixStream};
|
||||
|
||||
/// A transport for unix sockets.
|
||||
#[derive(Debug)]
|
||||
pub struct UnixTransport<P>(pub P) where P: AsRef<Path>;
|
||||
|
||||
impl<P> super::Transport for UnixTransport<P>
|
||||
where P: AsRef<Path>
|
||||
{
|
||||
type Listener = UnixListener;
|
||||
|
||||
fn bind(&self) -> io::Result<UnixListener> {
|
||||
UnixListener::bind(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to a unix socket address.
|
||||
#[derive(Debug)]
|
||||
pub struct UnixDialer<P>(pub P) where P: AsRef<Path>;
|
||||
|
||||
impl<P> super::Dialer for UnixDialer<P>
|
||||
where P: AsRef<Path>
|
||||
{
|
||||
type Stream = UnixStream;
|
||||
|
||||
fn dial(&self) -> io::Result<UnixStream> {
|
||||
UnixStream::connect(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Listener for UnixListener {
|
||||
type Stream = UnixStream;
|
||||
|
||||
type Dialer = UnixDialer<PathBuf>;
|
||||
|
||||
fn accept(&self) -> io::Result<UnixStream> {
|
||||
self.accept().map(|(stream, _)| stream)
|
||||
}
|
||||
|
||||
fn dialer(&self) -> io::Result<UnixDialer<PathBuf>> {
|
||||
self.local_addr().and_then(|addr| {
|
||||
match addr.as_pathname() {
|
||||
Some(path) => Ok(UnixDialer(path.to_owned())),
|
||||
None => {
|
||||
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
|
||||
"Couldn't get a path to bound unix socket"))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Stream for UnixStream {
|
||||
fn try_clone(&self) -> io::Result<Self> {
|
||||
self.try_clone()
|
||||
}
|
||||
|
||||
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
|
||||
self.set_read_timeout(timeout)
|
||||
}
|
||||
|
||||
fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
|
||||
self.set_write_timeout(timeout)
|
||||
}
|
||||
|
||||
fn shutdown(&self) -> io::Result<()> {
|
||||
self.shutdown(::std::net::Shutdown::Both)
|
||||
}
|
||||
}
|
||||
@@ -5,5 +5,5 @@ authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.k
|
||||
|
||||
[dev-dependencies]
|
||||
tarpc = { path = "../tarpc" }
|
||||
lazy_static = "^0.1.15"
|
||||
env_logger = "^0.3.2"
|
||||
lazy_static = "0.2"
|
||||
env_logger = "0.3"
|
||||
|
||||
@@ -37,12 +37,13 @@ mod benchmark {
|
||||
// Prevents resource exhaustion when benching
|
||||
lazy_static! {
|
||||
static ref HANDLE: Arc<Mutex<ServeHandle>> = {
|
||||
let handle = serve("localhost:0", HelloServer, None).unwrap();
|
||||
let handle = HelloServer.spawn("localhost:0").unwrap();
|
||||
Arc::new(Mutex::new(handle))
|
||||
};
|
||||
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
|
||||
let addr = HANDLE.lock().unwrap().local_addr().clone();
|
||||
let client = AsyncClient::new(addr, None).unwrap();
|
||||
let lock = HANDLE.lock().unwrap();
|
||||
let dialer = lock.dialer();
|
||||
let client = AsyncClient::new(dialer).unwrap();
|
||||
Arc::new(Mutex::new(client))
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user