119 Commits

Author SHA1 Message Date
Tim Kuehn
3cf8e440f7 Bump minor version. 2016-08-07 13:02:04 -07:00
Tim Kuehn
2e02f33fc4 Merge branch 'master' of github.com:google/tarpc 2016-07-28 20:49:12 -07:00
Tim Kuehn
d8472dcd1c Update README to reflect latest version. 2016-07-28 20:48:07 -07:00
Tim
2c5846621f Update dependency versions (#43)
* Update dependency versions

* Bump minor version.
2016-07-28 20:46:30 -07:00
Tim Kuehn
6a6157948a Bump minor version. 2016-07-28 20:37:51 -07:00
Tim Kuehn
1c18a3c4fe Update dependency versions 2016-07-28 20:25:46 -07:00
shaladdle
e8ec295e85 Merge pull request #35 from tikue/missing-debug
Add missing Debug impls.
2016-04-24 21:19:42 -07:00
Tim Kuehn
44eec09418 Add missing Debug impls. 2016-04-24 21:06:42 -07:00
shaladdle
fe116a1b6b Merge pull request #34 from tikue/macro-cleanup
Minor macro implementation cleanup.
2016-04-24 20:36:08 -07:00
Tim Kuehn
ec4fa8636b Minor macro implementation cleanup.
* Fold service into service_inner.
* Rename service_inner => service.
2016-04-24 20:02:46 -07:00
shaladdle
2d58340d16 Merge pull request #33 from tikue/bump-version
Bump version to v0.5.0
2016-04-24 19:37:04 -07:00
Tim Kuehn
801f09e9e6 Bump version to v0.5.0 2016-04-24 19:18:42 -07:00
shaladdle
6ce3a3d943 Merge pull request #27 from tikue/listener
[Breaking] Add support for arbitrary transports.
2016-04-24 18:39:27 -07:00
Tim Kuehn
4d636d2882 Merge master into listener. 2016-04-24 18:28:30 -07:00
shaladdle
3693c95a67 Merge pull request #32 from tikue/update-releases
Updates to docs and pre-push
2016-04-24 17:57:28 -07:00
Tim
43a2df4a13 Make version of serde explicit in release notes 2016-04-24 17:56:41 -07:00
Tim Kuehn
166f1523d6 Update version in README 2016-04-24 17:52:08 -07:00
Tim Kuehn
1cc8cbcdc3 Update pre-push to use rustup in lieu of multirust, because rustup is #thefuture. 2016-04-24 17:48:29 -07:00
Tim Kuehn
9dafc704e9 Update RELEASES.md 2016-04-24 17:19:53 -07:00
shaladdle
32e0b0d7f8 Bump version to 0.4.0 2016-04-02 16:56:20 -07:00
shaladdle
b87c52758e Merge pull request #30 from tikue/update-serde
Update serde to 0.7
2016-04-02 15:57:18 -07:00
Tim Kuehn
9235e12904 Handle Serde(EndOfStream) error as ConnectionBroken 2016-04-02 15:33:56 -07:00
Tim Kuehn
265fe56fa6 Merge update-serde into master 2016-04-02 15:23:37 -07:00
Tim Kuehn
7b5b29a9c3 Update to serde 0.7 2016-04-02 15:18:24 -07:00
Tim Kuehn
709f4ab1ac Add spaces between items in impls. 2016-03-16 21:46:14 -07:00
Tim Kuehn
bbfb4325d2 Simplify readme example 2016-03-16 20:49:38 -07:00
Tim Kuehn
f33cb3bd53 Add a line between impl and struct 2016-03-16 20:46:23 -07:00
Tim Kuehn
6a6832cfbc Generify doc comment 2016-03-16 20:45:55 -07:00
Tim Kuehn
b0495ebc00 Cargo fmt 2016-03-16 20:43:36 -07:00
Tim Kuehn
aec1574824 Add a line in between struct and impl 2016-03-16 20:43:22 -07:00
Tim Kuehn
5d27d34bd3 Add a documentation note on addresses 2016-03-16 20:36:54 -07:00
shaladdle
fe978f2c56 Merge pull request #29 from tikue/rm-lazy-static
Remove unused dep
2016-03-02 21:36:24 -08:00
Tim Kuehn
44f472c65c Remove unused dep 2016-02-27 22:32:53 -08:00
Tim Kuehn
e995acd4c9 Merge branch 'master' into listener 2016-02-27 14:11:16 -08:00
Tim Kuehn
e8fcf0e4de Fix issue with grep exit status 2016-02-27 02:30:39 -08:00
Tim Kuehn
9dcd38c012 Merge branch 'master' into listener 2016-02-25 23:30:22 -08:00
Tim Kuehn
5ac4b710a5 Simplify lib.rs example 2016-02-25 23:30:00 -08:00
shaladdle
2eb0b2cc83 Merge pull request #28 from tikue/fix-pre-commit
Fix formatting pre-commit check
2016-02-25 23:24:18 -08:00
Tim Kuehn
72a9f8f70d Update deps versions. 2016-02-25 22:49:22 -08:00
Tim Kuehn
8e5a44b423 Update README to list arbitrary transports as a feature. 2016-02-25 22:28:39 -08:00
Tim Kuehn
714541a7a4 Don't unwrap in Listener::dialer 2016-02-25 01:05:38 -08:00
Tim Kuehn
a1f529f794 Reformat some generic bounds 2016-02-25 00:58:48 -08:00
Tim Kuehn
a8766a9200 Use rustfmt --write-mode=diff in lieu of hashes 2016-02-25 00:50:46 -08:00
Tim Kuehn
ef96c87226 Skip children when rustfmting in pre-commit 2016-02-25 00:50:37 -08:00
Tim Kuehn
3543b34f2b Fix formatting check.
* shasum suffixes the checksum with '- filename' so pipe in the text instead.
* rustfmt prefixes the formatting with 'Using rustfmt config file filename' so pipe in the text instead.
2016-02-25 00:50:29 -08:00
Tim Kuehn
6273ebefa7 rustfmt 2016-02-25 00:04:35 -08:00
Tim Kuehn
9827f75459 Fix examples 2016-02-24 23:33:03 -08:00
Tim Kuehn
c398e2389b Why were we wrapping the service in an arc? 2016-02-24 23:25:50 -08:00
Tim Kuehn
03dc512e25 Remove Addr associated type of Dialer.
Also, make spawn() take a Dialer, but impl Dialer for str, defaulting to TCP transport.
2016-02-24 21:59:21 -08:00
Tim Kuehn
8307c708a3 Better documentation for Stream.
Basically copied from TcpStream verbatim.
2016-02-24 20:32:15 -08:00
Tim Kuehn
774411c636 Create temp file using tempdir in test 2016-02-24 20:26:49 -08:00
Tim Kuehn
d5b2f23f74 Move generic bounds to where clause 2016-02-23 08:26:56 -08:00
Tim Kuehn
396aec3c2f Add a test 2016-02-23 01:53:20 -08:00
Tim Kuehn
28c6c333e5 Reorgnize modules 2016-02-23 01:13:11 -08:00
Tim Kuehn
2d1a77ec10 Merge branch 'master' of github.com:google/tarpc into listener 2016-02-23 00:09:53 -08:00
Tim Kuehn
a0e6147482 Make Tcp* default types 2016-02-23 00:07:03 -08:00
Tim Kuehn
fcdb0d9375 Add support for arbitrary transports.
Quite a bit of machinery added:
 * Listener trait
 * Dialer trait
 * Stream trait
 * Transport trait
2016-02-22 23:50:34 -08:00
Tim
4c1d15f8ea Merge pull request #26 from Bowbaq/master
Fix your typo
2016-02-20 18:50:25 -08:00
Maxime Bury
ece1cc60b9 Fix your typo 2016-02-20 18:43:31 -08:00
shaladdle
7d8a508379 Merge pull request #25 from tikue/fix-readme
Fix the README example. It broke again. :(
2016-02-20 01:33:29 -08:00
Tim Kuehn
9193357d60 Fix the README example. It broke again. :( 2016-02-20 01:29:22 -08:00
shaladdle
b777e0bbf7 Merge pull request #24 from tikue/release-notes
Add release notes
2016-02-20 01:15:49 -08:00
Tim Kuehn
04624f054d Forgot the file 2016-02-20 01:11:02 -08:00
Tim Kuehn
f870f832a9 Rename RELEASE_NOTES.md => RELEASES.md to conform to rust-lang/rust. 2016-02-20 01:07:51 -08:00
Tim Kuehn
dc347021d4 Rework a line 2016-02-20 01:05:43 -08:00
Tim Kuehn
5973e54f62 Fix markdown 2016-02-20 01:04:40 -08:00
Tim Kuehn
e5e5c5975c Add release notes 2016-02-20 01:02:13 -08:00
shaladdle
6bb3691a30 Merge pull request #23 from tikue/deps
Update deps versions
2016-02-20 00:53:27 -08:00
Tim Kuehn
e2f1511fb3 Update deps versions 2016-02-19 23:53:27 -08:00
Tim Kuehn
99ba380825 Update readme version 2016-02-19 23:45:01 -08:00
Tim
39235343d6 Merge pull request #22 from shaladdle/allowunused
Allow most things to be unused
2016-02-19 23:40:42 -08:00
Adam Wright
f3afd080f3 Allow most things to be unused 2016-02-19 23:28:34 -08:00
Tim
043d0a1c21 Merge pull request #21 from shaladdle/bashpls
Merge shaladdle/bashpls into master.
2016-02-19 23:23:20 -08:00
shaladdle
be4caeebe8 Merge pull request #20 from tikue/config
Merge tikue/config into master.
2016-02-19 23:16:15 -08:00
Adam Wright
06a2cab31c Move /bin/bash to the top so it works 2016-02-19 23:08:50 -08:00
Tim Kuehn
934c51f4ab Bunch version number 2016-02-19 22:57:47 -08:00
Tim Kuehn
cc8a8e76b0 Remove a bunch of derived traits from Config for backwards compatibility reasons. 2016-02-19 22:51:18 -08:00
Tim Kuehn
b9ba10b8a4 Fix tarpc_examples 2016-02-19 22:42:36 -08:00
Tim Kuehn
1ee1f9274a Fix some log statements 2016-02-19 22:33:13 -08:00
Tim Kuehn
7f354be850 Merge branch 'master' of github.com:google/tarpc into config 2016-02-19 22:32:16 -08:00
Tim Kuehn
c9a63c2a5a Don't derive Hash for Config because Duration is only Hashable on nightly 2016-02-19 22:06:29 -08:00
Tim
ee1143c709 Merge pull request #14 from shaladdle/hooks
Merge shaladdle/hooks into master.
2016-02-19 21:46:00 -08:00
Adam Wright
4ed127b39e NOT FOUND instead of FAILED 2016-02-19 21:31:11 -08:00
Tim Kuehn
66cd136c6a Add a note about serde in the readme 2016-02-18 21:27:22 -08:00
Tim Kuehn
58cbe6f4ea Update README and service macro doc comment 2016-02-18 21:23:42 -08:00
Tim Kuehn
250a7fd7b9 Move 'static bound from trait to spawn fns 2016-02-18 20:48:25 -08:00
Tim Kuehn
a44fd808d9 Merge branch 'master' of github.com:google/tarpc into config 2016-02-18 20:42:33 -08:00
Tim Kuehn
65c4d83c88 [breaking] Renamings and refactorings.
* Add a Config struct. `Client::new(addr, duration)` => `Client::new(addr, config)`
* `serve(addr, server, config)` => `server.spawn_with_config(addr, config)`
* Also added `server.spawn(addr)`
2016-02-18 20:40:08 -08:00
shaladdle
00692fe9a3 Merge pull request #19 from tikue/readme
Make a note of AsyncClient and `cargo doc` in the readme
2016-02-18 15:35:56 -08:00
Tim Kuehn
0968760ef7 Make a note of AsyncClient and cargo doc in the readme 2016-02-18 15:28:41 -08:00
shaladdle
75b2c00b54 Merge pull request #17 from tikue/readme
Define RPC in readme
2016-02-18 14:59:43 -08:00
Tim Kuehn
ffee124526 List dependency info in readme 2016-02-18 08:49:20 -08:00
Tim Kuehn
06a03697c4 Merge branch 'master' of github.com:google/tarpc into readme 2016-02-18 08:43:35 -08:00
Tim Kuehn
a675551a31 Define RPC in README 2016-02-18 08:43:31 -08:00
Adam Wright
d0e9693263 Be consistent with function declarations 2016-02-18 01:40:44 -08:00
Adam Wright
6d23174219 Build on nightly too, as per review comment 2016-02-18 01:36:39 -08:00
Adam Wright
a06b583334 Also check existence of shasum 2016-02-18 01:27:07 -08:00
shaladdle
937e9c2c43 Merge pull request #15 from tikue/readme
Make a line in the readme a bit cleaner
2016-02-18 01:24:11 -08:00
Tim Kuehn
54883d6354 Make a line in the readme a bit cleaner 2016-02-18 01:20:20 -08:00
Tim
86b1470832 Merge pull request #13 from gsquire/readme-fix
Fix README code example
2016-02-18 01:17:52 -08:00
Adam Wright
82762583be Change colors a bit, only exit when check_toolchain fails.. 2016-02-18 01:06:31 -08:00
Adam Wright
3462451256 Check for multirust, rustfmt, clean up some things 2016-02-18 01:06:31 -08:00
Adam Wright
17d800b8a8 Update comments 2016-02-18 01:06:31 -08:00
Adam Wright
403eba201b Group builds/tests by toolchain for speed 2016-02-18 01:06:31 -08:00
Adam Wright
f2328d200e pre-commit doesn't modify files, just checks foramtting 2016-02-18 01:06:31 -08:00
Garrett Squire
51e6bac2dc fix the example in the README 2016-02-17 23:25:33 -08:00
shaladdle
f3fcbbb8d2 Merge pull request #12 from tikue/second-test
Add a second rpc to test service.
2016-02-17 18:08:23 -08:00
Tim Kuehn
05acb97f04 Add a second rpc to test service.
To better exercise the serialization parts -- namely, the enum tags.
2016-02-16 22:17:06 -08:00
Adam Wright
07c052a1c1 Individually format crates, use -q 2016-02-15 18:54:15 -08:00
Adam Wright
34cf0c8172 Updated hook to print messages like prepush 2016-02-15 18:30:26 -08:00
Adam Wright
7b196400b8 Revamp push hook
1. Create environment variables for options
3. Use multirust to test different toolchains. If multirust isn't
   installed, use current toolchain.
2016-02-15 17:28:16 -08:00
Adam Wright
1f30bb9ba6 Remove the install hooks script 2016-02-15 13:58:42 -08:00
Adam Wright
e2756edd72 Test all crates and format on pre-commit 2016-02-15 13:58:04 -08:00
shaladdle
8957d2dac3 Merge pull request #11 from tikue/rustfmt
rustfmt the things
2016-02-15 13:36:16 -08:00
shaladdle
21e5734ef7 Merge pull request #10 from tikue/inlines
Add some missing #[inline]s.
2016-02-15 13:33:11 -08:00
Tim Kuehn
dddeca19a1 .travis.yml: don't set unstable feature when building on nightly 2016-02-15 12:29:13 -08:00
Tim Kuehn
a9b86280b5 Add coveralls deps 2016-02-15 12:17:21 -08:00
Tim Kuehn
7dae99d7b5 rustfmt the things 2016-02-15 02:34:20 -08:00
Tim Kuehn
9dd3d55744 Add some missing #[inline]s.
And add the optional fn len method to Packet's MapVisitor. Also add a link to documentation in the readme.
2016-02-15 02:20:27 -08:00
19 changed files with 904 additions and 319 deletions

View File

@@ -6,6 +6,13 @@ rust:
- beta
- nightly
addons:
apt:
packages:
- libcurl4-openssl-dev
- libelf-dev
- libdw-dev
before_script:
- |
pip install 'travis-cargo<0.2' --user &&
@@ -13,8 +20,13 @@ before_script:
script:
- |
(cd tarpc && cargo build) &&
(cd tarpc && cargo test)
(cd tarpc && travis-cargo build) &&
(cd tarpc && travis-cargo test)
after_success:
- (cd tarpc && travis-cargo coveralls --no-sudo)
env:
global:
# override the default `--features unstable` used for the nightly branch
- TRAVIS_CARGO_NIGHTLY_FEATURE=""

View File

@@ -6,8 +6,27 @@
*Disclaimer*: This is not an official Google product.
tarpc is an RPC framework for rust with a focus on ease of use. Defining and implementing an echo-like server can be done in just a few lines of code:
tarpc is an RPC framework for rust with a focus on ease of use. Defining a service can be done in
just a few lines of code, and most of the boilerplate of writing a server is taken care of for you.
[Documentation](https://google.github.io/tarpc)
## What is an RPC framework?
"RPC" stands for "Remote Procedure Call," a function call where the work of producing the return
value is being done somewhere else. When an rpc function is invoked, behind the scenes the function
contacts some other process somewhere and asks them to compute the function instead. The original
function then returns the value produced by that other server.
[More information](https://www.cs.cf.ac.uk/Dave/C/node33.html)
## Usage
Add to your `Cargo.toml` dependencies:
```toml
tarpc = "0.6"
```
## Example
```rust
#[macro_use]
extern crate tarpc;
@@ -17,18 +36,20 @@ mod hello_service {
rpc hello(name: String) -> String;
}
}
use hello_service::Service as HelloService;
struct HelloService;
impl hello_service::Service for HelloService {
struct HelloServer;
impl HelloService for HelloServer {
fn hello(&self, name: String) -> String {
format!("Hello, {}!", s)
format!("Hello, {}!", name)
}
}
fn main() {
let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap();
let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap();
assert_eq!("Hello, Mom!".into(), client.hello("Mom".into()).unwrap());
let addr = "localhost:10000";
let server_handle = HelloServer.spawn(addr).unwrap();
let client = hello_service::Client::new(addr).unwrap();
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
drop(client);
server_handle.shutdown();
}
@@ -36,14 +57,20 @@ fn main() {
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
above example, the macro is called within the `hello_service` module. This module will contain a
`Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server
listening on a tcp port. A `Client` can connect to such a service. Any type implementing the
`Service` trait can be passed to `serve`. These generated types are specific to the echo service,
and make it easy and ergonomic to write servers without dealing with sockets or serialization
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for
starting the service: `spawn` and `spawn_with_config`, which start the service listening over an
arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated
types make it easy and ergonomic to write servers without dealing with sockets or serialization
directly. See the tarpc_examples package for more sophisticated examples.
## Documentation
Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation.
## Additional Features
- Connect over any transport that `impl`s the `Transport` trait.
- Concurrent requests from a single client.
- Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures.
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait
methods as well as on the `Client`'s stub methods.
- Just like regular fns, the return type can be left off when it's `-> ()`.

37
RELEASES.md Normal file
View File

@@ -0,0 +1,37 @@
## 0.6 (20216-08-07)
### Breaking Changes
* Updated serde to 0.8. Requires dependents to update as well.
## 0.5 (2016-04-24)
### Breaking Changes
0.5 adds support for arbitrary transports via the
[`Transport`](tarpc/src/transport/mod.rs#L7) trait.
Out of the box tarpc provides implementations for:
* Tcp, for types `impl`ing `ToSocketAddrs`.
* Unix sockets via the `UnixTransport` type.
This was a breaking change: `handler.local_addr()` was renamed
`handler.dialer()`.
## 0.4 (2016-04-02)
### Breaking Changes
* Updated to the latest version of serde, 0.7.0. Because tarpc exposes serde in
its API, this forces downstream code to update to the latest version of
serde, as well.
## 0.3 (2016-02-20)
### Breaking Changes
* The timeout arg to `serve` was replaced with a `Config` struct, which
currently only contains one field, but will be expanded in the future
to allow configuring serialization protocol, and other things.
* `serve` was changed to be a default method on the generated `Service` traits,
and it was renamed `spawn_with_config`. A second `default fn` was added:
`spawn`, which takes no `Config` arg.
### Other Changes
* Expanded items will no longer generate unused warnings.

View File

@@ -1,18 +1,41 @@
#!/bin/bash
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
# This file may not be copied, modified, or distributed except according to those terms.
#!/bin/sh
#
# An example hook script to verify what is about to be committed.
# Called by "git commit" with no arguments. The hook should
# exit with non-zero status after issuing an appropriate message if
# it wants to stop the commit.
# Pre-commit hook for the tarpc repository. To use this hook, copy it to .git/hooks in your
# repository root.
#
# To enable this hook, rename this file to "pre-commit".
# This precommit checks the following:
# 1. All filenames are ascii
# 2. There is no bad whitespace
# 3. rustfmt is installed
# 4. rustfmt is a noop on files that are in the index
#
# Options:
#
# - TARPC_SKIP_RUSTFMT, default = 0
#
# Set this to 1 to skip running rustfmt
#
# Note that these options are most useful for testing the hooks themselves. Use git commit
# --no-verify to skip the pre-commit hook altogether.
if git rev-parse --verify HEAD >/dev/null 2>&1
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
PREFIX="${GREEN}[PRECOMMIT]${NC}"
FAILURE="${RED}FAILED${NC}"
WARNING="${RED}[WARNING]${NC}"
SKIPPED="${YELLOW}SKIPPED${NC}"
SUCCESS="${GREEN}ok${NC}"
if git rev-parse --verify HEAD &>/dev/null
then
against=HEAD
else
@@ -20,35 +43,70 @@ else
against=4b825dc642cb6eb9a060e54bf8d69288fbee4904
fi
# If you want to allow non-ASCII filenames set this variable to true.
allownonascii=$(git config --bool hooks.allownonascii)
FAILED=0
# Redirect output to stderr.
exec 1>&2
# Cross platform projects tend to avoid non-ASCII filenames; prevent
# them from being added to the repository. We exploit the fact that the
# printable range starts at the space character and ends with tilde.
if [ "$allownonascii" != "true" ] &&
# Note that the use of brackets around a tr range is ok here, (it's
# even required, for portability to Solaris 10's /usr/bin/tr), since
# the square bracket bytes happen to fall in the designated range.
test $(git diff --cached --name-only --diff-filter=A -z $against |
LC_ALL=C tr -d '[ -~]\0' | wc -c) != 0
printf "${PREFIX} Checking that all filenames are ascii ... "
# Note that the use of brackets around a tr range is ok here, (it's
# even required, for portability to Solaris 10's /usr/bin/tr), since
# the square bracket bytes happen to fall in the designated range.
if test $(git diff --cached --name-only --diff-filter=A -z $against | LC_ALL=C tr -d '[ -~]\0' | wc -c) != 0
then
cat <<\EOF
Error: Attempt to add a non-ASCII file name.
FAILED=1
printf "${FAILURE}\n"
else
printf "${SUCCESS}\n"
fi
This can cause problems if you want to work with people on other platforms.
printf "${PREFIX} Checking for bad whitespace ... "
git diff-index --check --cached $against -- &>/dev/null
if [ "$?" != 0 ]; then
FAILED=1
printf "${FAILURE}\n"
else
printf "${SUCCESS}\n"
fi
To be portable it is advisable to rename the file.
If you know what you are doing you can disable this check using:
git config hooks.allownonascii true
EOF
printf "${PREFIX} Checking for rustfmt ... "
command -v rustfmt &>/dev/null
if [ $? == 0 ]; then
printf "${SUCCESS}\n"
else
printf "${FAILURE}\n"
exit 1
fi
# If there are whitespace errors, print the offending file names and fail.
exec git diff-index --check --cached $against --
printf "${PREFIX} Checking for shasum ... "
command -v shasum &>/dev/null
if [ $? == 0 ]; then
printf "${SUCCESS}\n"
else
printf "${FAILURE}\n"
exit 1
fi
# Just check that running rustfmt doesn't do anything to the file. I do this instead of
# modifying the file because I don't want to mess with the developer's index, which may
# not only contain discrete files.
printf "${PREFIX} Checking formatting ... "
FMTRESULT=0
for file in $(git diff --name-only --cached);
do
if [ ${file: -3} == ".rs" ]; then
diff=$(rustfmt --skip-children --write-mode=diff $file)
if grep --quiet "^Diff at line" <<< "$diff"; then
FMTRESULT=1
fi
fi
done
if [ "${TARPC_SKIP_RUSTFMT}" == 1 ]; then
printf "${SKIPPED}\n"$?
elif [ ${FMTRESULT} != 0 ]; then
FAILED=1
printf "${FAILURE}\n"
echo "$diff" | sed '/Using rustfmt.*$/d'
else
printf "${SUCCESS}\n"
fi
exit ${FAILED}

View File

@@ -1,39 +1,127 @@
#!/bin/bash
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
# This file may not be copied, modified, or distributed except according to those terms.
#!/bin/sh
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
# root.
#
# This hook runs tests to make sure only working code is being pushed. If present, rustup is used
# to build and test the code on the appropriate toolchains. The working copy must not contain
# uncommitted changes, since the script currently just runs cargo build/test in the working copy.
#
# Options:
#
# - TARPC_ALLOW_DIRTY, default = 0
#
# Setting this variable to 1 will run tests even though there are code changes in the working
# copy. Set to 0 by default, since the intent is to test the code that's being pushed, not changes
# still in the working copy.
#
# - TARPC_USE_CURRENT_TOOLCHAIN, default = 0
#
# Setting this variable to 1 will just run cargo build and cargo test, rather than running
# stable/beta/nightly.
#
# Note that these options are most useful for testing the hooks themselves. Use git push --no-verify
# to skip the pre-push hook altogether.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
PREFIX="${GREEN}[PREPUSH]${NC}"
FAILURE="${RED}FAILED${NC}"
WARNING="${YELLOW}[WARNING]${NC}"
SKIPPED="${YELLOW}SKIPPED${NC}"
SUCCESS="${GREEN}ok${NC}"
printf "${PREFIX} Clean working copy ... "
git diff --exit-code &>/dev/null
if [ "$?" != 0 ];
then
echo ${RED}ERROR${NC} You have uncommitted changes please commit or stash them before pushing so that I can run tests!
exit 1
fi
printf "${YELLOW}[PRESUBMIT]${NC} Running tests ... "
TEST_RESULT=0
cargo test --manifest-path tarpc/Cargo.toml &>/dev/null
if [ "$?" != "0" ];
then
printf "${RED}FAILED${NC}"
TEST_RESULT=1
if [ "$?" == 0 ]; then
printf "${SUCCESS}\n"
else
printf "${GREEN}ok${NC}"
fi
printf "\n"
RESULT=0
if [ "$TEST_RESULT" == "1" ];
then
RESULT=1
if [ "${TARPC_ALLOW_DIRTY}" == "1" ]
then
printf "${SKIPPED}\n"
else
printf "${FAILURE}\n"
exit 1
fi
fi
exit $RESULT
PREPUSH_RESULT=0
# args:
# 1 - cargo command to run (build/test)
# 2 - directory name of crate to build
# 3 - rust toolchain (nightly/stable/beta)
run_cargo() {
if [ "$1" == "build" ]; then
VERB=Building
else
VERB=Testing
fi
if [ "$3" != "" ]; then
printf "${PREFIX} $VERB $2 on $3 ... "
rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
else
printf "${PREFIX} $VERB $2 ... "
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
fi
if [ "$?" != "0" ]; then
printf "${FAILURE}\n"
PREPUSH_RESULT=1
else
printf "${SUCCESS}\n"
fi
}
TOOLCHAIN_RESULT=0
check_toolchain() {
printf "${PREFIX} Checking for $1 toolchain ... "
if [[ $(rustup toolchain list) =~ $1 ]]; then
printf "${SUCCESS}\n"
else
TOOLCHAIN_RESULT=1
PREPUSH_RESULT=1
printf "${FAILURE}\n"
fi
}
printf "${PREFIX} Checking for rustup ... "
command -v rustup &>/dev/null
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
printf "${SUCCESS}\n"
check_toolchain stable
check_toolchain beta
check_toolchain nightly
if [ ${TOOLCHAIN_RESULT} == 1 ]; then
exit 1
fi
run_cargo build tarpc stable
run_cargo build tarpc_examples stable
run_cargo build tarpc beta
run_cargo build tarpc_examples beta
run_cargo build tarpc nightly
run_cargo build tarpc_examples nightly
# We still rely on some nightly stuff for tests
run_cargo test tarpc nightly
run_cargo test tarpc_examples nightly
else
printf "${YELLOW}NOT FOUND${NC}\n"
printf "${WARNING} Falling back to current toolchain: $(rustc -V)\n"
run_cargo test tarpc
run_cargo test tarpc_examples
fi
exit $PREPUSH_RESULT

View File

@@ -1,6 +1,6 @@
[package]
name = "tarpc"
version = "0.2.0"
version = "0.6.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT"
documentation = "https://google.github.io/tarpc"
@@ -11,11 +11,13 @@ readme = "../README.md"
description = "An RPC framework for Rust with a focus on ease of use."
[dependencies]
bincode = "^0.4.0"
log = "^0.3.5"
scoped-pool = "^0.1.4"
serde = "^0.6.13"
bincode = "0.6"
log = "0.3"
scoped-pool = "1.0"
serde = "0.8"
unix_socket = "0.5"
[dev-dependencies]
lazy_static = "^0.1.15"
env_logger = "^0.3.2"
lazy_static = "0.2"
env_logger = "0.3"
tempdir = "0.3"

View File

@@ -1,8 +0,0 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
# This file may not be copied, modified, or distributed except according to those terms.
#!/bin/sh
ln -s ../../hooks/pre-commit .git/hooks/pre-commit
ln -s ../../hooks/pre-push .git/hooks/pre-push

View File

@@ -1 +1,2 @@
ideal_width = 100
reorder_imports = true

View File

@@ -8,7 +8,7 @@
//! Example usage:
//!
//! ```
//! # #[macro_use] extern crate tarpc;
//! #[macro_use] extern crate tarpc;
//! mod my_server {
//! service! {
//! rpc hello(name: String) -> String;
@@ -30,17 +30,13 @@
//! }
//!
//! fn main() {
//! let addr = "127.0.0.1:9000";
//! let shutdown = my_server::serve(addr,
//! Server,
//! Some(Duration::from_secs(30)))
//! .unwrap();
//! let client = Client::new(addr, None).unwrap();
//! let serve_handle = Server.spawn("localhost:0").unwrap();
//! let client = Client::new(serve_handle.dialer()).unwrap();
//! assert_eq!(3, client.add(1, 2).unwrap());
//! assert_eq!("Hello, Mom!".to_string(),
//! client.hello("Mom".to_string()).unwrap());
//! drop(client);
//! shutdown.shutdown();
//! serve_handle.shutdown();
//! }
//! ```
@@ -51,6 +47,7 @@ extern crate bincode;
#[macro_use]
extern crate log;
extern crate scoped_pool;
extern crate unix_socket;
macro_rules! pos {
() => (concat!(file!(), ":", line!()))
@@ -63,4 +60,7 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs.
pub mod macros;
pub use protocol::{Error, Result, ServeHandle};
/// Provides transport traits and implementations.
pub mod transport;
pub use protocol::{Config, Error, Result, ServeHandle};

View File

@@ -8,7 +8,7 @@ pub mod serde {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Deserialization re-exports required by macros. Not for general use.
pub mod de {
pub use serde::de::{EnumVisitor, Error, Visitor, VariantVisitor};
pub use serde::de::{EnumVisitor, Error, VariantVisitor, Visitor};
}
}
@@ -21,6 +21,7 @@ macro_rules! client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($($arg:ident,)*) : ($($in_:ty,)*) ) -> $out:ty
) => (
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*))));
@@ -32,13 +33,17 @@ macro_rules! client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($( $arg:ident,)*) : ($($in_:ty, )*) ) -> $out:ty
)*) => ( $(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
let reply = try!((self.0).rpc(__Request::$fn_name(($($arg,)*))));
if let __Reply::$fn_name(reply) = reply {
::std::result::Result::Ok(reply)
} else {
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
panic!("Incorrect reply variant returned from rpc; expected `{}`, \
but got {:?}",
stringify!($fn_name),
reply);
}
}
)*);
@@ -53,6 +58,7 @@ macro_rules! async_client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty
) => (
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> {
fn mapper(reply: __Reply) -> $out {
@@ -70,13 +76,17 @@ macro_rules! async_client_methods {
{ $(#[$attr:meta])* }
$fn_name:ident( ($( $arg:ident, )*) : ($( $in_:ty, )*) ) -> $out:ty
)*) => ( $(
#[allow(unused)]
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> Future<$out> {
fn mapper(reply: __Reply) -> $out {
if let __Reply::$fn_name(reply) = reply {
reply
} else {
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
panic!("Incorrect reply variant returned from rpc; expected `{}`, but got \
{:?}",
stringify!($fn_name),
reply);
}
}
let reply = (self.0).rpc_async(__Request::$fn_name(($($arg,)*)));
@@ -100,7 +110,7 @@ macro_rules! impl_serialize {
match *self {
$(
$impler::$name(ref field) =>
$crate::macros::serde::Serializer::visit_newtype_variant(
$crate::macros::serde::Serializer::serialize_newtype_variant(
serializer,
stringify!($impler),
$n,
@@ -130,7 +140,7 @@ macro_rules! impl_deserialize {
-> ::std::result::Result<$impler, D::Error>
where D: $crate::macros::serde::Deserializer
{
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, unused)]
enum __Field {
$($name),*
}
@@ -144,6 +154,7 @@ macro_rules! impl_deserialize {
impl $crate::macros::serde::de::Visitor for __FieldVisitor {
type Value = __Field;
#[inline]
fn visit_usize<E>(&mut self, value: usize)
-> ::std::result::Result<__Field, E>
where E: $crate::macros::serde::de::Error,
@@ -154,11 +165,12 @@ macro_rules! impl_deserialize {
}
)*
return ::std::result::Result::Err(
$crate::macros::serde::de::Error::syntax("expected a field")
$crate::macros::serde::de::Error::custom(
format!("No variants have a value of {}!", value))
);
}
}
deserializer.visit_struct_field(__FieldVisitor)
deserializer.deserialize_struct_field(__FieldVisitor)
}
}
@@ -166,6 +178,7 @@ macro_rules! impl_deserialize {
impl $crate::macros::serde::de::EnumVisitor for __Visitor {
type Value = $impler;
#[inline]
fn visit<__V>(&mut self, mut visitor: __V)
-> ::std::result::Result<$impler, __V::Error>
where __V: $crate::macros::serde::de::VariantVisitor
@@ -185,7 +198,7 @@ macro_rules! impl_deserialize {
stringify!($name)
),*
];
deserializer.visit_enum(stringify!($impler), VARIANTS, __Visitor)
deserializer.deserialize_enum(stringify!($impler), VARIANTS, __Visitor)
}
}
);
@@ -210,17 +223,22 @@ macro_rules! impl_deserialize {
/// # }
/// ```
///
/// There are two rpc names reserved for the default fns `spawn` and `spawn_with_config`.
///
/// Attributes can be attached to each rpc. These attributes
/// will then be attached to the generated `Service` trait's
/// corresponding method, as well as to the `Client` stub's rpcs methods.
///
/// The following items are expanded in the enclosing module:
///
/// * `Service` -- the trait defining the RPC service
/// * `Service` -- the trait defining the RPC service. It comes with two default methods for
/// starting the server:
/// 1. `spawn` starts the service in another thread using default configuration.
/// 2. `spawn_with_config` starts the service in another thread using the specified
/// `Config`.
/// * `Client` -- a client that makes synchronous requests to the RPC server
/// * `AsyncClient` -- a client that makes asynchronous requests to the RPC server
/// * `Future` -- a handle for asynchronously retrieving the result of an RPC
/// * `serve` -- the function that starts the RPC server
///
/// **Warning**: In addition to the above items, there are a few expanded items that
/// are considered implementation details. As with the above items, shadowing
@@ -232,19 +250,21 @@ macro_rules! impl_deserialize {
/// * `__Reply` -- an implementation detail
#[macro_export]
macro_rules! service {
// Entry point
(
$( $tokens:tt )*
$(
$(#[$attr:meta])*
rpc $fn_name:ident( $( $arg:ident : $in_:ty ),* ) $(-> $out:ty)*;
)*
) => {
service_inner! {{
$( $tokens )*
service! {{
$(
$(#[$attr])*
rpc $fn_name( $( $arg : $in_ ),* ) $(-> $out)*;
)*
}}
}
}
#[doc(hidden)]
#[macro_export]
macro_rules! service_inner {
// Pattern for when the next rpc has an implicit unit return type
};
// Pattern for when the next rpc has an implicit unit return type
(
{
$(#[$attr:meta])*
@@ -254,7 +274,7 @@ macro_rules! service_inner {
}
$( $expanded:tt )*
) => {
service_inner! {
service! {
{ $( $unexpanded )* }
$( $expanded )*
@@ -263,7 +283,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> ();
}
};
// Pattern for when the next rpc has an explicit return type
// Pattern for when the next rpc has an explicit return type
(
{
$(#[$attr:meta])*
@@ -273,7 +293,7 @@ macro_rules! service_inner {
}
$( $expanded:tt )*
) => {
service_inner! {
service! {
{ $( $unexpanded )* }
$( $expanded )*
@@ -282,7 +302,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
}
};
// Pattern when all return types have been expanded
// Pattern for when all return types have been expanded
(
{ } // none left to expand
$(
@@ -291,15 +311,43 @@ macro_rules! service_inner {
)*
) => {
#[doc="Defines the RPC service"]
pub trait Service: Send + Sync {
pub trait Service: Send + Sync + Sized {
$(
$(#[$attr])*
fn $fn_name(&self, $($arg:$in_),*) -> $out;
)*
#[doc="Spawn a running service."]
fn spawn<T>(self,
transport: T)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static,
{
self.spawn_with_config(transport, $crate::Config::default())
}
#[doc="Spawn a running service."]
fn spawn_with_config<T>(self,
transport: T,
config: $crate::Config)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static,
{
let server = __Server(self);
let result = $crate::protocol::Serve::spawn_with_config(server, transport, config);
let handle = try!(result);
::std::result::Result::Ok(handle)
}
}
impl<P, S> Service for P
where P: Send + Sync + ::std::ops::Deref<Target=S>,
where P: Send + Sync + Sized + 'static + ::std::ops::Deref<Target=S>,
S: Service
{
$(
@@ -310,7 +358,7 @@ macro_rules! service_inner {
)*
}
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __Request {
$(
@@ -321,7 +369,7 @@ macro_rules! service_inner {
impl_serialize!(__Request, $($fn_name(($($in_),*)))*);
impl_deserialize!(__Request, $($fn_name(($($in_),*)))*);
#[allow(non_camel_case_types)]
#[allow(non_camel_case_types, unused)]
#[derive(Debug)]
enum __Reply {
$(
@@ -332,29 +380,46 @@ macro_rules! service_inner {
impl_serialize!(__Reply, $($fn_name($out))*);
impl_deserialize!(__Reply, $($fn_name($out))*);
/// An asynchronous RPC call
#[allow(unused)]
#[doc="An asynchronous RPC call"]
pub struct Future<T> {
future: $crate::protocol::Future<__Reply>,
mapper: fn(__Reply) -> T,
}
impl<T> Future<T> {
/// Block until the result of the RPC call is available
#[allow(unused)]
#[doc="Block until the result of the RPC call is available"]
pub fn get(self) -> $crate::Result<T> {
self.future.get().map(self.mapper)
}
}
#[allow(unused)]
#[doc="The client stub that makes RPC calls to the server."]
pub struct Client($crate::protocol::Client<__Request, __Reply>);
pub struct Client<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl Client {
#[doc="Create a new client that connects to the given address."]
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
impl<S> Client<S>
where S: $crate::transport::Stream
{
#[allow(unused)]
#[doc="Create a new client with default configuration that connects to the given \
address."]
pub fn new<D>(dialer: D) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
let inner = try!($crate::protocol::Client::new(addr, timeout));
Self::with_config(dialer, $crate::Config::default())
}
#[allow(unused)]
#[doc="Create a new client with the specified configuration that connects to the \
given address."]
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
let inner = try!($crate::protocol::Client::with_config(dialer, config));
::std::result::Result::Ok(Client(inner))
}
@@ -365,6 +430,7 @@ macro_rules! service_inner {
)*
);
#[allow(unused)]
#[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \
clone fails."]
pub fn try_clone(&self) -> ::std::io::Result<Self> {
@@ -372,16 +438,29 @@ macro_rules! service_inner {
}
}
#[allow(unused)]
#[doc="The client stub that makes asynchronous RPC calls to the server."]
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>);
pub struct AsyncClient<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl AsyncClient {
#[doc="Create a new asynchronous client that connects to the given address."]
pub fn new<A>(addr: A, timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs,
impl<S> AsyncClient<S>
where S: $crate::transport::Stream {
#[allow(unused)]
#[doc="Create a new asynchronous client with default configuration that connects to \
the given address."]
pub fn new<D>(dialer: D) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
let inner = try!($crate::protocol::Client::new(addr, timeout));
Self::with_config(dialer, $crate::Config::default())
}
#[allow(unused)]
#[doc="Create a new asynchronous client that connects to the given address."]
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
where D: $crate::transport::Dialer<Stream=S>,
{
let inner = try!($crate::protocol::Client::with_config(dialer, config));
::std::result::Result::Ok(AsyncClient(inner))
}
@@ -392,6 +471,7 @@ macro_rules! service_inner {
)*
);
#[allow(unused)]
#[doc="Attempt to clone the client object. This might fail if the underlying TcpStream \
clone fails."]
pub fn try_clone(&self) -> ::std::io::Result<Self> {
@@ -399,7 +479,9 @@ macro_rules! service_inner {
}
}
struct __Server<S: 'static + Service>(S);
#[allow(unused)]
struct __Server<S>(S)
where S: 'static + Service;
impl<S> $crate::protocol::Serve for __Server<S>
where S: 'static + Service
@@ -415,22 +497,11 @@ macro_rules! service_inner {
}
}
}
#[doc="Start a running service."]
pub fn serve<A, S>(addr: A,
service: S,
read_timeout: ::std::option::Option<::std::time::Duration>)
-> $crate::Result<$crate::protocol::ServeHandle>
where A: ::std::net::ToSocketAddrs,
S: 'static + Service
{
let server = ::std::sync::Arc::new(__Server(service));
::std::result::Result::Ok(try!($crate::protocol::serve_async(addr, server, read_timeout)))
}
}
}
#[allow(dead_code)] // because we're just testing that the macro expansion compiles
#[allow(dead_code)]
// because we're just testing that the macro expansion compiles
#[cfg(test)]
mod syntax_test {
// Tests a service definition with a fn that takes no args
@@ -461,14 +532,12 @@ mod syntax_test {
#[cfg(test)]
mod functional_test {
extern crate env_logger;
use std::time::Duration;
fn test_timeout() -> Option<Duration> {
Some(Duration::from_secs(5))
}
extern crate tempdir;
use transport::unix::UnixTransport;
service! {
rpc add(x: i32, y: i32) -> i32;
rpc hey(name: String) -> String;
}
struct Server;
@@ -477,14 +546,18 @@ mod functional_test {
fn add(&self, x: i32, y: i32) -> i32 {
x + y
}
fn hey(&self, name: String) -> String {
format!("Hey, {}.", name)
}
}
#[test]
fn simple() {
let _ = env_logger::init();
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
let client = Client::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client = Client::new(handle.dialer()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
drop(client);
handle.shutdown();
}
@@ -492,17 +565,18 @@ mod functional_test {
#[test]
fn simple_async() {
let _ = env_logger::init();
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client = AsyncClient::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client = AsyncClient::new(handle.dialer()).unwrap();
assert_eq!(3, client.add(1, 2).get().unwrap());
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
drop(client);
handle.shutdown();
}
#[test]
fn try_clone() {
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
let client1 = Client::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client1 = Client::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap());
@@ -510,8 +584,20 @@ mod functional_test {
#[test]
fn async_try_clone() {
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client1 = AsyncClient::new(handle.local_addr(), None).unwrap();
let handle = Server.spawn("localhost:0").unwrap();
let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
}
#[test]
fn async_try_clone_unix() {
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
let temp_file = temp_dir.path()
.join("async_try_clone_unix.tmp");
let handle = Server.spawn(UnixTransport(temp_file)).unwrap();
let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
@@ -520,7 +606,13 @@ mod functional_test {
// Tests that a server can be wrapped in an Arc; no need to run, just compile
#[allow(dead_code)]
fn serve_arc_server() {
let _ = serve("localhost:0", ::std::sync::Arc::new(Server), None);
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
}
// Tests that a tcp client can be created from &str
#[allow(dead_code)]
fn test_client_str() {
let _ = Client::new("localhost:0");
}
#[test]

View File

@@ -5,38 +5,49 @@
use serde;
use std::fmt;
use std::io::{self, BufReader, BufWriter, Read};
use std::io::{self, BufReader, BufWriter};
use std::collections::HashMap;
use std::mem;
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
use std::time::Duration;
use super::{Serialize, Deserialize, Error, Packet, Result};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
use transport::{Dialer, Stream};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply>
where Request: serde::ser::Serialize
pub struct Client<Request, Reply, S>
where Request: serde::ser::Serialize,
S: Stream
{
// The guard is in an option so it can be joined in the drop fn
reader_guard: Arc<Option<thread::JoinHandle<()>>>,
outbound: Sender<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
shutdown: TcpStream,
shutdown: S,
}
impl<Request, Reply> Client<Request, Reply>
impl<Request, Reply, S> Client<Request, Reply, S>
where Request: serde::ser::Serialize + Send + 'static,
Reply: serde::de::Deserialize + Send + 'static
Reply: serde::de::Deserialize + Send + 'static,
S: Stream
{
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
let stream = try!(TcpStream::connect(addr));
try!(stream.set_read_timeout(timeout));
try!(stream.set_write_timeout(timeout));
pub fn new<D>(dialer: D) -> io::Result<Self>
where D: Dialer<Stream = S>
{
Self::with_config(dialer, Config::default())
}
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
where D: Dialer<Stream = S>
{
let stream = try!(dialer.dial());
try!(stream.set_read_timeout(config.timeout));
try!(stream.set_write_timeout(config.timeout));
let reader_stream = try!(stream.try_clone());
let writer_stream = try!(stream.try_clone());
let requests = Arc::new(Mutex::new(RpcFutures::new()));
@@ -54,7 +65,7 @@ impl<Request, Reply> Client<Request, Reply>
}
/// Clones the Client so that it can be shared across threads.
pub fn try_clone(&self) -> io::Result<Client<Request, Reply>> {
pub fn try_clone(&self) -> io::Result<Self> {
Ok(Client {
reader_guard: self.reader_guard.clone(),
outbound: self.outbound.clone(),
@@ -92,15 +103,17 @@ impl<Request, Reply> Client<Request, Reply>
}
}
impl<Request, Reply> Drop for Client<Request, Reply>
where Request: serde::ser::Serialize
impl<Request, Reply, S> Drop for Client<Request, Reply, S>
where Request: serde::ser::Serialize,
S: Stream
{
fn drop(&mut self) {
debug!("Dropping Client.");
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
debug!("Attempting to shut down writer and reader threads.");
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) {
warn!("Client: couldn't shutdown writer and reader threads: {:?}", e);
if let Err(e) = self.shutdown.shutdown() {
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
e);
} else {
// We only join if we know the TcpStream was shut down. Otherwise we might never
// finish.
@@ -118,14 +131,15 @@ impl<Request, Reply> Drop for Client<Request, Reply>
/// An asynchronous RPC call
pub struct Future<T> {
rx: Receiver<Result<T>>,
requests: Arc<Mutex<RpcFutures<T>>>
requests: Arc<Mutex<RpcFutures<T>>>,
}
impl<T> Future<T> {
/// Block until the result of the RPC call is available
pub fn get(self) -> Result<T> {
let requests = self.requests;
self.rx.recv()
self.rx
.recv()
.map_err(|_| requests.lock().expect(pos!()).get_error())
.and_then(|reply| reply)
}
@@ -164,7 +178,8 @@ impl<Reply> RpcFutures<Reply> {
info!("Reader: could not complete reply: {:?}", e);
}
} else {
warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id);
warn!("RpcFutures: expected sender for id {} but got None!",
packet.rpc_id);
}
}
@@ -177,11 +192,12 @@ impl<Reply> RpcFutures<Reply> {
}
}
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream)
fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: S)
where Request: serde::Serialize,
Reply: serde::Deserialize,
S: Stream
{
let mut next_id = 0;
let mut stream = BufWriter::new(stream);
@@ -221,8 +237,8 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
{
// Clone the err so we can log it if sending fails
if let Err(e2) = tx.send(Err(e.clone())) {
debug!("Error encountered while trying to send an error. \
Initial error: {:?}; Send error: {:?}",
debug!("Error encountered while trying to send an error. Initial error: {:?}; Send \
error: {:?}",
e,
e2);
}
@@ -230,8 +246,9 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
}
fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream)
where Reply: serde::Deserialize
fn read<Reply, S>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: S)
where Reply: serde::Deserialize,
S: Stream
{
let mut stream = BufReader::new(stream);
loop {

View File

@@ -6,9 +6,11 @@
use bincode::{self, SizeLimit};
use bincode::serde::{deserialize_from, serialize_into};
use serde;
use serde::de::value::Error::EndOfStream;
use std::io::{self, Read, Write};
use std::convert;
use std::sync::Arc;
use std::time::Duration;
mod client;
mod server;
@@ -16,7 +18,7 @@ mod packet;
pub use self::packet::Packet;
pub use self::client::{Client, Future};
pub use self::server::{Serve, ServeHandle, serve_async};
pub use self::server::{Serve, ServeHandle};
/// Client errors that can occur during rpc calls
#[derive(Debug, Clone)]
@@ -39,10 +41,14 @@ impl convert::From<bincode::serde::SerializeError> for Error {
impl convert::From<bincode::serde::DeserializeError> for Error {
fn from(err: bincode::serde::DeserializeError) -> Error {
match err {
bincode::serde::DeserializeError::IoError(ref err)
if err.kind() == io::ErrorKind::ConnectionReset => Error::ConnectionBroken,
bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken,
bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)),
bincode::serde::DeserializeError::Serde(EndOfStream) => Error::ConnectionBroken,
bincode::serde::DeserializeError::IoError(err) => {
match err.kind() {
io::ErrorKind::ConnectionReset |
io::ErrorKind::UnexpectedEof => Error::ConnectionBroken,
_ => Error::Io(Arc::new(err)),
}
}
err => panic!("Unexpected error during deserialization: {:?}", err),
}
}
@@ -54,13 +60,19 @@ impl convert::From<io::Error> for Error {
}
}
/// Configuration for client and server.
#[derive(Debug, Default)]
pub struct Config {
/// Request/Response timeout between packet delivery.
pub timeout: Option<Duration>,
}
/// Return type of rpc calls: either the successful return value, or a client error.
pub type Result<T> = ::std::result::Result<T, Error>;
trait Deserialize: Read + Sized {
fn deserialize<T: serde::Deserialize>(&mut self) -> Result<T> {
deserialize_from(self, SizeLimit::Infinite)
.map_err(Error::from)
deserialize_from(self, SizeLimit::Infinite).map_err(Error::from)
}
}
@@ -79,8 +91,9 @@ impl<W: Write> Serialize for W {}
#[cfg(test)]
mod test {
extern crate env_logger;
use super::{Client, Serve, serve_async};
use super::{Client, Config, Serve};
use scoped_pool::Pool;
use std::net::TcpStream;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;
@@ -119,8 +132,8 @@ mod test {
fn handle() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr(), None).unwrap();
let serve_handle = server.spawn("localhost:0").unwrap();
let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap();
drop(client);
serve_handle.shutdown();
}
@@ -129,10 +142,9 @@ mod test {
fn simple() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let addr = serve_handle.local_addr().clone();
let serve_handle = server.clone().spawn("localhost:0").unwrap();
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
assert_eq!(0, client.rpc(()).unwrap());
assert_eq!(1, server.count());
assert_eq!(1, client.rpc(()).unwrap());
@@ -172,9 +184,10 @@ mod test {
fn force_shutdown() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server, Some(Duration::new(0, 10))).unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let serve_handle = server.spawn_with_config("localhost:0",
Config { timeout: Some(Duration::new(0, 10)) })
.unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
let thread = thread::spawn(move || serve_handle.shutdown());
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
thread.join().unwrap();
@@ -184,9 +197,10 @@ mod test {
fn client_failed_rpc() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server, test_timeout()).unwrap();
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr, None).unwrap());
let serve_handle =
server.spawn_with_config("localhost:0", Config { timeout: test_timeout() })
.unwrap();
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
client.rpc(()).unwrap();
serve_handle.shutdown();
match client.rpc(()) {
@@ -202,13 +216,14 @@ mod test {
let concurrency = 10;
let pool = Pool::new(concurrency);
let server = Arc::new(BarrierServer::new(concurrency));
let serve_handle = serve_async("localhost:0", server.clone(), test_timeout()).unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let serve_handle = server.clone().spawn("localhost:0").unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
pool.scoped(|scope| {
for _ in 0..concurrency {
let client = client.try_clone().unwrap();
scope.execute(move || { client.rpc(()).unwrap(); });
scope.execute(move || {
client.rpc(()).unwrap();
});
}
});
assert_eq!(concurrency as u64, server.count());
@@ -220,9 +235,8 @@ mod test {
fn async() {
let _ = env_logger::init();
let server = Arc::new(Server::new());
let serve_handle = serve_async("localhost:0", server.clone(), None).unwrap();
let addr = serve_handle.local_addr().clone();
let client: Client<(), u64> = Client::new(addr, None).unwrap();
let serve_handle = server.spawn("localhost:0").unwrap();
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
// Drop future immediately; does the reader channel panic when sending?
client.rpc_async(());

View File

@@ -1,4 +1,4 @@
use serde::{Deserialize, Deserializer, Serialize, Serializer, de, ser};
use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
use std::marker::PhantomData;
/// Packet shared between client and server.
@@ -19,44 +19,20 @@ impl<T: Serialize> Serialize for Packet<T> {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer
{
serializer.visit_struct(PACKET, MapVisitor {
value: self,
state: 0,
})
}
}
struct MapVisitor<'a, T: 'a> {
value: &'a Packet<T>,
state: u8,
}
impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
fn visit<S>(&mut self, serializer: &mut S) -> Result<Option<()>, S::Error>
where S: Serializer
{
match self.state {
0 => {
self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(RPC_ID, &self.value.rpc_id))))
}
1 => {
self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message))))
}
_ => {
Ok(None)
}
}
let mut state = try!(serializer.serialize_struct(PACKET, 2));
try!(serializer.serialize_struct_elt(&mut state, RPC_ID, &self.rpc_id));
try!(serializer.serialize_struct_elt(&mut state, MESSAGE, &self.message));
serializer.serialize_struct_end(state)
}
}
impl<T: Deserialize> Deserialize for Packet<T> {
#[inline]
fn deserialize<D>(deserializer: &mut D) -> Result<Self, D::Error>
where D: Deserializer
{
const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE];
deserializer.visit_struct(PACKET, FIELDS, Visitor(PhantomData))
deserializer.deserialize_struct(PACKET, FIELDS, Visitor(PhantomData))
}
}
@@ -65,6 +41,7 @@ struct Visitor<T>(PhantomData<T>);
impl<T: Deserialize> de::Visitor for Visitor<T> {
type Value = Packet<T>;
#[inline]
fn visit_seq<V>(&mut self, mut visitor: V) -> Result<Packet<T>, V::Error>
where V: de::SeqVisitor
{
@@ -91,7 +68,10 @@ fn serde() {
use bincode;
let _ = env_logger::init();
let packet = Packet { rpc_id: 1, message: () };
let packet = Packet {
rpc_id: 1,
message: (),
};
let ser = bincode::serde::serialize(&packet, bincode::SizeLimit::Infinite).unwrap();
let de = bincode::serde::deserialize(&ser);
assert_eq!(packet, de.unwrap());

View File

@@ -7,24 +7,27 @@ use serde;
use scoped_pool::{Pool, Scope};
use std::fmt;
use std::io::{self, BufReader, BufWriter};
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
use super::{Deserialize, Error, Packet, Result, Serialize};
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
use transport::{Dialer, Listener, Stream, Transport};
use transport::tcp::TcpDialer;
struct ConnectionHandler<'a, S>
where S: Serve
struct ConnectionHandler<'a, S, St>
where S: Serve,
St: Stream
{
read_stream: BufReader<TcpStream>,
write_stream: BufWriter<TcpStream>,
read_stream: BufReader<St>,
write_stream: BufWriter<St>,
server: S,
shutdown: &'a AtomicBool,
}
impl<'a, S> ConnectionHandler<'a, S>
where S: Serve
impl<'a, S, St> ConnectionHandler<'a, S, St>
where S: Serve,
St: Stream
{
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
let ConnectionHandler {
@@ -38,13 +41,13 @@ impl<'a, S> ConnectionHandler<'a, S>
scope.execute(move || Self::write(rx, write_stream));
loop {
match read_stream.deserialize() {
Ok(Packet { rpc_id, message, }) => {
Ok(Packet { rpc_id, message }) => {
let tx = tx.clone();
scope.execute(move || {
let reply = server.serve(message);
let reply_packet = Packet {
rpc_id: rpc_id,
message: reply
message: reply,
};
tx.send(reply_packet).expect(pos!());
});
@@ -55,8 +58,8 @@ impl<'a, S> ConnectionHandler<'a, S>
}
Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => {
if !shutdown.load(Ordering::SeqCst) {
info!("ConnectionHandler: read timed out ({:?}). Server not \
shutdown, so retrying read.",
info!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \
retrying read.",
err);
continue;
} else {
@@ -78,12 +81,13 @@ impl<'a, S> ConnectionHandler<'a, S>
fn timed_out(error_kind: io::ErrorKind) -> bool {
match error_kind {
io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => true,
io::ErrorKind::TimedOut |
io::ErrorKind::WouldBlock => true,
_ => false,
}
}
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<TcpStream>) {
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<St>) {
loop {
match rx.recv() {
Err(e) => {
@@ -101,21 +105,25 @@ impl<'a, S> ConnectionHandler<'a, S>
}
/// Provides methods for blocking until the server completes,
pub struct ServeHandle {
pub struct ServeHandle<D = TcpDialer>
where D: Dialer
{
tx: Sender<()>,
join_handle: JoinHandle<()>,
addr: SocketAddr,
dialer: D,
}
impl ServeHandle {
impl<D> ServeHandle<D>
where D: Dialer
{
/// Block until the server completes
pub fn wait(self) {
self.join_handle.join().expect(pos!());
}
/// Returns the address the server is bound to
pub fn local_addr(&self) -> &SocketAddr {
&self.addr
/// Returns the dialer to the server.
pub fn dialer(&self) -> &D {
&self.dialer
}
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not
@@ -123,7 +131,7 @@ impl ServeHandle {
pub fn shutdown(self) {
info!("ServeHandle: attempting to shut down the server.");
self.tx.send(()).expect(pos!());
if let Ok(_) = TcpStream::connect(self.addr) {
if let Ok(_) = self.dialer.dial() {
self.join_handle.join().expect(pos!());
} else {
warn!("ServeHandle: best effort shutdown of serve thread failed");
@@ -131,18 +139,23 @@ impl ServeHandle {
}
}
struct Server<'a, S: 'a> {
struct Server<'a, S: 'a, L>
where L: Listener
{
server: &'a S,
listener: TcpListener,
listener: L,
read_timeout: Option<Duration>,
die_rx: Receiver<()>,
shutdown: &'a AtomicBool,
}
impl<'a, S: 'a> Server<'a, S>
where S: Serve + 'static
impl<'a, S, L> Server<'a, S, L>
where S: Serve + 'static,
L: Listener
{
fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b {
fn serve<'b>(self, scope: &Scope<'b>)
where 'a: 'b
{
for conn in self.listener.incoming() {
match self.die_rx.try_recv() {
Ok(_) => {
@@ -169,7 +182,7 @@ impl<'a, S: 'a> Server<'a, S>
let read_conn = match conn.try_clone() {
Err(err) => {
error!("serve: could not clone tcp stream; possibly out of file descriptors? \
Err: {:?}",
Err: {:?}",
err);
continue;
}
@@ -192,48 +205,17 @@ impl<'a, S: 'a> Server<'a, S>
}
}
impl<'a, S> Drop for Server<'a, S> {
impl<'a, S, L> Drop for Server<'a, S, L>
where L: Listener
{
fn drop(&mut self) {
debug!("Shutting down connection handlers.");
self.shutdown.store(true, Ordering::SeqCst);
}
}
/// Start
pub fn serve_async<A, S>(addr: A,
server: S,
read_timeout: Option<Duration>)
-> io::Result<ServeHandle>
where A: ToSocketAddrs,
S: 'static + Serve
{
let listener = try!(TcpListener::bind(&addr));
let addr = try!(listener.local_addr());
info!("serve_async: spinning up server on {:?}", addr);
let (die_tx, die_rx) = channel();
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &server,
listener: listener,
read_timeout: read_timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
pool.scoped(|scope| {
server.serve(scope);
});
});
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
addr: addr.clone(),
})
}
/// A service provided by a server
pub trait Serve: Send + Sync {
pub trait Serve: Send + Sync + Sized {
/// The type of request received by the server
type Request: 'static + fmt::Debug + serde::ser::Serialize + serde::de::Deserialize + Send;
/// The type of reply sent by the server
@@ -241,10 +223,52 @@ pub trait Serve: Send + Sync {
/// Return a reply for a given request
fn serve(&self, request: Self::Request) -> Self::Reply;
/// spawn
fn spawn<T>(self, transport: T) -> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static
{
self.spawn_with_config(transport, Config::default())
}
/// spawn
fn spawn_with_config<T>(self,
transport: T,
config: Config)
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static
{
let listener = try!(transport.bind());
let dialer = try!(listener.dialer());
info!("spawn_with_config: spinning up server.");
let (die_tx, die_rx) = channel();
let timeout = config.timeout;
let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false);
let server = Server {
server: &self,
listener: listener,
read_timeout: timeout,
die_rx: die_rx,
shutdown: &shutdown,
};
pool.scoped(|scope| {
server.serve(scope);
});
});
Ok(ServeHandle {
tx: die_tx,
join_handle: join_handle,
dialer: dialer,
})
}
}
impl<P, S> Serve for P
where P: Send + Sync + ::std::ops::Deref<Target=S>,
where P: Send + Sync + ::std::ops::Deref<Target = S>,
S: Serve
{
type Request = S::Request;

View File

@@ -0,0 +1,91 @@
use std::io::{self, Read, Write};
use std::time::Duration;
/// A factory for creating a listener on a given address.
/// For TCP, an address might be an IPv4 address; for Unix sockets, it
/// is just a file name.
pub trait Transport {
/// The type of listener that binds to the given address.
type Listener: Listener;
/// Return a listener on the given address, and a dialer to that address.
fn bind(&self) -> io::Result<Self::Listener>;
}
/// Accepts incoming connections from dialers.
pub trait Listener: Send + 'static {
/// The type of address being listened on.
type Dialer: Dialer;
/// The type of stream this listener accepts.
type Stream: Stream;
/// Accept an incoming stream.
fn accept(&self) -> io::Result<Self::Stream>;
/// Returns the local address being listened on.
fn dialer(&self) -> io::Result<Self::Dialer>;
/// Iterate over incoming connections.
fn incoming(&self) -> Incoming<Self> {
Incoming { listener: self }
}
}
/// A cloneable Reader/Writer.
pub trait Stream: Read + Write + Send + Sized + 'static {
/// Creates a new independently owned handle to the Stream.
///
/// The returned Stream should reference the same stream that this
/// object references. Both handles should read and write the same
/// stream of data, and options set on one stream should be propagated
/// to the other stream.
fn try_clone(&self) -> io::Result<Self>;
/// Sets a read timeout.
///
/// If the value specified is `None`, then read calls will block indefinitely.
/// It is an error to pass the zero `Duration` to this method.
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
/// Sets a write timeout.
///
/// If the value specified is `None`, then write calls will block indefinitely.
/// It is an error to pass the zero `Duration` to this method.
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
/// Shuts down both ends of the stream.
///
/// Implementations should cause all pending and future I/O on the specified
/// portions to return immediately with an appropriate value.
fn shutdown(&self) -> io::Result<()>;
}
/// A `Stream` factory.
pub trait Dialer {
/// The type of `Stream` this can create.
type Stream: Stream;
/// Open a stream.
fn dial(&self) -> io::Result<Self::Stream>;
}
impl<P, D: ?Sized> Dialer for P
where P: ::std::ops::Deref<Target = D>,
D: Dialer + 'static
{
type Stream = D::Stream;
fn dial(&self) -> io::Result<Self::Stream> {
(**self).dial()
}
}
/// Iterates over incoming connections.
pub struct Incoming<'a, L: Listener + ?Sized + 'a> {
listener: &'a L,
}
impl<'a, L: Listener> Iterator for Incoming<'a, L> {
type Item = io::Result<L::Stream>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.listener.accept())
}
}
/// Provides a TCP transport.
pub mod tcp;
/// Provides a unix socket transport.
pub mod unix;

View File

@@ -0,0 +1,77 @@
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::time::Duration;
/// A transport for TCP.
#[derive(Debug)]
pub struct TcpTransport<A: ToSocketAddrs>(pub A);
impl<A: ToSocketAddrs> super::Transport for TcpTransport<A> {
type Listener = TcpListener;
fn bind(&self) -> io::Result<TcpListener> {
TcpListener::bind(&self.0)
}
}
impl<A: ToSocketAddrs> super::Transport for A {
type Listener = TcpListener;
fn bind(&self) -> io::Result<TcpListener> {
TcpListener::bind(self)
}
}
impl super::Listener for TcpListener {
type Dialer = TcpDialer<SocketAddr>;
type Stream = TcpStream;
fn accept(&self) -> io::Result<TcpStream> {
self.accept().map(|(stream, _)| stream)
}
fn dialer(&self) -> io::Result<TcpDialer<SocketAddr>> {
self.local_addr().map(|addr| TcpDialer(addr))
}
}
impl super::Stream for TcpStream {
fn try_clone(&self) -> io::Result<Self> {
self.try_clone()
}
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(dur)
}
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.set_write_timeout(dur)
}
fn shutdown(&self) -> io::Result<()> {
self.shutdown(::std::net::Shutdown::Both)
}
}
/// Connects to a socket address.
#[derive(Debug)]
pub struct TcpDialer<A = SocketAddr>(pub A) where A: ToSocketAddrs;
impl<A> super::Dialer for TcpDialer<A>
where A: ToSocketAddrs
{
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(&self.0)
}
}
impl super::Dialer for str {
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(self)
}
}

View File

@@ -0,0 +1,72 @@
use std::io;
use std::path::{Path, PathBuf};
use std::time::Duration;
use unix_socket::{UnixListener, UnixStream};
/// A transport for unix sockets.
#[derive(Debug)]
pub struct UnixTransport<P>(pub P) where P: AsRef<Path>;
impl<P> super::Transport for UnixTransport<P>
where P: AsRef<Path>
{
type Listener = UnixListener;
fn bind(&self) -> io::Result<UnixListener> {
UnixListener::bind(&self.0)
}
}
/// Connects to a unix socket address.
#[derive(Debug)]
pub struct UnixDialer<P>(pub P) where P: AsRef<Path>;
impl<P> super::Dialer for UnixDialer<P>
where P: AsRef<Path>
{
type Stream = UnixStream;
fn dial(&self) -> io::Result<UnixStream> {
UnixStream::connect(&self.0)
}
}
impl super::Listener for UnixListener {
type Stream = UnixStream;
type Dialer = UnixDialer<PathBuf>;
fn accept(&self) -> io::Result<UnixStream> {
self.accept().map(|(stream, _)| stream)
}
fn dialer(&self) -> io::Result<UnixDialer<PathBuf>> {
self.local_addr().and_then(|addr| {
match addr.as_pathname() {
Some(path) => Ok(UnixDialer(path.to_owned())),
None => {
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
"Couldn't get a path to bound unix socket"))
}
}
})
}
}
impl super::Stream for UnixStream {
fn try_clone(&self) -> io::Result<Self> {
self.try_clone()
}
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(timeout)
}
fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.set_write_timeout(timeout)
}
fn shutdown(&self) -> io::Result<()> {
self.shutdown(::std::net::Shutdown::Both)
}
}

View File

@@ -5,5 +5,5 @@ authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.k
[dev-dependencies]
tarpc = { path = "../tarpc" }
lazy_static = "^0.1.15"
env_logger = "^0.3.2"
lazy_static = "0.2"
env_logger = "0.3"

View File

@@ -37,12 +37,13 @@ mod benchmark {
// Prevents resource exhaustion when benching
lazy_static! {
static ref HANDLE: Arc<Mutex<ServeHandle>> = {
let handle = serve("localhost:0", HelloServer, None).unwrap();
let handle = HelloServer.spawn("localhost:0").unwrap();
Arc::new(Mutex::new(handle))
};
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
let addr = HANDLE.lock().unwrap().local_addr().clone();
let client = AsyncClient::new(addr, None).unwrap();
let lock = HANDLE.lock().unwrap();
let dialer = lock.dialer();
let client = AsyncClient::new(dialer).unwrap();
Arc::new(Mutex::new(client))
};
}