49 Commits

Author SHA1 Message Date
Tim Kuehn
801f09e9e6 Bump version to v0.5.0 2016-04-24 19:18:42 -07:00
shaladdle
6ce3a3d943 Merge pull request #27 from tikue/listener
[Breaking] Add support for arbitrary transports.
2016-04-24 18:39:27 -07:00
Tim Kuehn
4d636d2882 Merge master into listener. 2016-04-24 18:28:30 -07:00
shaladdle
3693c95a67 Merge pull request #32 from tikue/update-releases
Updates to docs and pre-push
2016-04-24 17:57:28 -07:00
Tim
43a2df4a13 Make version of serde explicit in release notes 2016-04-24 17:56:41 -07:00
Tim Kuehn
166f1523d6 Update version in README 2016-04-24 17:52:08 -07:00
Tim Kuehn
1cc8cbcdc3 Update pre-push to use rustup in lieu of multirust, because rustup is #thefuture. 2016-04-24 17:48:29 -07:00
Tim Kuehn
9dafc704e9 Update RELEASES.md 2016-04-24 17:19:53 -07:00
shaladdle
32e0b0d7f8 Bump version to 0.4.0 2016-04-02 16:56:20 -07:00
shaladdle
b87c52758e Merge pull request #30 from tikue/update-serde
Update serde to 0.7
2016-04-02 15:57:18 -07:00
Tim Kuehn
9235e12904 Handle Serde(EndOfStream) error as ConnectionBroken 2016-04-02 15:33:56 -07:00
Tim Kuehn
265fe56fa6 Merge update-serde into master 2016-04-02 15:23:37 -07:00
Tim Kuehn
7b5b29a9c3 Update to serde 0.7 2016-04-02 15:18:24 -07:00
Tim Kuehn
709f4ab1ac Add spaces between items in impls. 2016-03-16 21:46:14 -07:00
Tim Kuehn
bbfb4325d2 Simplify readme example 2016-03-16 20:49:38 -07:00
Tim Kuehn
f33cb3bd53 Add a line between impl and struct 2016-03-16 20:46:23 -07:00
Tim Kuehn
6a6832cfbc Generify doc comment 2016-03-16 20:45:55 -07:00
Tim Kuehn
b0495ebc00 Cargo fmt 2016-03-16 20:43:36 -07:00
Tim Kuehn
aec1574824 Add a line in between struct and impl 2016-03-16 20:43:22 -07:00
Tim Kuehn
5d27d34bd3 Add a documentation note on addresses 2016-03-16 20:36:54 -07:00
shaladdle
fe978f2c56 Merge pull request #29 from tikue/rm-lazy-static
Remove unused dep
2016-03-02 21:36:24 -08:00
Tim Kuehn
44f472c65c Remove unused dep 2016-02-27 22:32:53 -08:00
Tim Kuehn
e995acd4c9 Merge branch 'master' into listener 2016-02-27 14:11:16 -08:00
Tim Kuehn
e8fcf0e4de Fix issue with grep exit status 2016-02-27 02:30:39 -08:00
Tim Kuehn
9dcd38c012 Merge branch 'master' into listener 2016-02-25 23:30:22 -08:00
Tim Kuehn
5ac4b710a5 Simplify lib.rs example 2016-02-25 23:30:00 -08:00
shaladdle
2eb0b2cc83 Merge pull request #28 from tikue/fix-pre-commit
Fix formatting pre-commit check
2016-02-25 23:24:18 -08:00
Tim Kuehn
72a9f8f70d Update deps versions. 2016-02-25 22:49:22 -08:00
Tim Kuehn
8e5a44b423 Update README to list arbitrary transports as a feature. 2016-02-25 22:28:39 -08:00
Tim Kuehn
714541a7a4 Don't unwrap in Listener::dialer 2016-02-25 01:05:38 -08:00
Tim Kuehn
a1f529f794 Reformat some generic bounds 2016-02-25 00:58:48 -08:00
Tim Kuehn
a8766a9200 Use rustfmt --write-mode=diff in lieu of hashes 2016-02-25 00:50:46 -08:00
Tim Kuehn
ef96c87226 Skip children when rustfmting in pre-commit 2016-02-25 00:50:37 -08:00
Tim Kuehn
3543b34f2b Fix formatting check.
* shasum suffixes the checksum with '- filename' so pipe in the text instead.
* rustfmt prefixes the formatting with 'Using rustfmt config file filename' so pipe in the text instead.
2016-02-25 00:50:29 -08:00
Tim Kuehn
6273ebefa7 rustfmt 2016-02-25 00:04:35 -08:00
Tim Kuehn
9827f75459 Fix examples 2016-02-24 23:33:03 -08:00
Tim Kuehn
c398e2389b Why were we wrapping the service in an arc? 2016-02-24 23:25:50 -08:00
Tim Kuehn
03dc512e25 Remove Addr associated type of Dialer.
Also, make spawn() take a Dialer, but impl Dialer for str, defaulting to TCP transport.
2016-02-24 21:59:21 -08:00
Tim Kuehn
8307c708a3 Better documentation for Stream.
Basically copied from TcpStream verbatim.
2016-02-24 20:32:15 -08:00
Tim Kuehn
774411c636 Create temp file using tempdir in test 2016-02-24 20:26:49 -08:00
Tim Kuehn
d5b2f23f74 Move generic bounds to where clause 2016-02-23 08:26:56 -08:00
Tim Kuehn
396aec3c2f Add a test 2016-02-23 01:53:20 -08:00
Tim Kuehn
28c6c333e5 Reorgnize modules 2016-02-23 01:13:11 -08:00
Tim Kuehn
2d1a77ec10 Merge branch 'master' of github.com:google/tarpc into listener 2016-02-23 00:09:53 -08:00
Tim Kuehn
a0e6147482 Make Tcp* default types 2016-02-23 00:07:03 -08:00
Tim Kuehn
fcdb0d9375 Add support for arbitrary transports.
Quite a bit of machinery added:
 * Listener trait
 * Dialer trait
 * Stream trait
 * Transport trait
2016-02-22 23:50:34 -08:00
Tim
4c1d15f8ea Merge pull request #26 from Bowbaq/master
Fix your typo
2016-02-20 18:50:25 -08:00
Maxime Bury
ece1cc60b9 Fix your typo 2016-02-20 18:43:31 -08:00
shaladdle
7d8a508379 Merge pull request #25 from tikue/fix-readme
Fix the README example. It broke again. :(
2016-02-20 01:33:29 -08:00
15 changed files with 467 additions and 149 deletions

View File

@@ -23,7 +23,7 @@ function then returns the value produced by that other server.
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.3.0" tarpc = "0.5.0"
``` ```
## Example ## Example
@@ -46,8 +46,9 @@ impl HelloService for HelloServer {
} }
fn main() { fn main() {
let server_handle = HelloServer.spawn("0.0.0.0:0").unwrap(); let addr = "localhost:10000";
let client = hello_service::Client::new(server_handle.local_addr()).unwrap(); let server_handle = HelloServer.spawn(addr).unwrap();
let client = hello_service::Client::new(addr).unwrap();
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap()); assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
drop(client); drop(client);
server_handle.shutdown(); server_handle.shutdown();
@@ -56,19 +57,20 @@ fn main() {
The `service!` macro expands to a collection of items that collectively form an rpc service. In the The `service!` macro expands to a collection of items that collectively form an rpc service. In the
above example, the macro is called within the `hello_service` module. This module will contain a above example, the macro is called within the `hello_service` module. This module will contain a
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides `default fn`s for `Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for
starting the service: `spawn` and `spawn_with_config`, which start the service listening on a tcp starting the service: `spawn` and `spawn_with_config`, which start the service listening over an
port. A `Client` (or `AsyncClient`) can connect to such a service. These generated types make it arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated
easy and ergonomic to write servers without dealing with sockets or serialization directly. See the types make it easy and ergonomic to write servers without dealing with sockets or serialization
tarpc_examples package for more sophisticated examples. directly. See the tarpc_examples package for more sophisticated examples.
## Documentation ## Documentation
Use `cargo doc` as you normally would to see the documentation created for all Use `cargo doc` as you normally would to see the documentation created for all
items expanded by a `service!` invocation. items expanded by a `service!` invocation.
## Additional Features ## Additional Features
- Connect over any transport that `impl`s the `Transport` trait.
- Concurrent requests from a single client. - Concurrent requests from a single client.
- Any type that `impl`s `serde`'s Serialize` and `Deserialize` can be used in the rpc signatures. - Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures.
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait - Attributes can be specified on rpc methods. These will be included on both the `Service` trait
methods as well as on the `Client`'s stub methods. methods as well as on the `Client`'s stub methods.
- Just like regular fns, the return type can be left off when it's `-> ()`. - Just like regular fns, the return type can be left off when it's `-> ()`.

View File

@@ -1,4 +1,19 @@
## 1.3 (2016-02-20) ## 0.5 (2016-04-24)
0.5 adds support for arbitrary transports via the
[`Transport`](tarpc/src/transport/mod.rs#L7) trait.
Out of the box tarpc provides implementations for:
* Tcp, for types `impl`ing `ToSocketAddrs`.
* Unix sockets via the `UnixTransport` type.
## 0.4 (2016-04-02)
### Breaking Changes
* Updated to the latest version of serde, 0.7.0. Because tarpc exposes serde in
its API, this forces downstream code to update to the latest version of
serde, as well.
## 0.3 (2016-02-20)
### Breaking Changes ### Breaking Changes
* The timeout arg to `serve` was replaced with a `Config` struct, which * The timeout arg to `serve` was replaced with a `Config` struct, which

View File

@@ -92,9 +92,8 @@ FMTRESULT=0
for file in $(git diff --name-only --cached); for file in $(git diff --name-only --cached);
do do
if [ ${file: -3} == ".rs" ]; then if [ ${file: -3} == ".rs" ]; then
HASH=$(shasum $file) diff=$(rustfmt --skip-children --write-mode=diff $file)
NEW_HASH=$(rustfmt --write-mode=display $file | shasum) if grep --quiet "^Diff at line" <<< "$diff"; then
if [ "${HASH}" != "${NEW_HASH}" ]; then
FMTRESULT=1 FMTRESULT=1
fi fi
fi fi
@@ -105,6 +104,7 @@ if [ "${TARPC_SKIP_RUSTFMT}" == 1 ]; then
elif [ ${FMTRESULT} != 0 ]; then elif [ ${FMTRESULT} != 0 ]; then
FAILED=1 FAILED=1
printf "${FAILURE}\n" printf "${FAILURE}\n"
echo "$diff" | sed '/Using rustfmt.*$/d'
else else
printf "${SUCCESS}\n" printf "${SUCCESS}\n"
fi fi

View File

@@ -8,7 +8,7 @@
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository # Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
# root. # root.
# #
# This hook runs tests to make sure only working code is being pushed. If present, multirust is used # This hook runs tests to make sure only working code is being pushed. If present, rustup is used
# to build and test the code on the appropriate toolchains. The working copy must not contain # to build and test the code on the appropriate toolchains. The working copy must not contain
# uncommitted changes, since the script currently just runs cargo build/test in the working copy. # uncommitted changes, since the script currently just runs cargo build/test in the working copy.
# #
@@ -67,7 +67,7 @@ run_cargo() {
fi fi
if [ "$3" != "" ]; then if [ "$3" != "" ]; then
printf "${PREFIX} $VERB $2 on $3 ... " printf "${PREFIX} $VERB $2 on $3 ... "
multirust run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
else else
printf "${PREFIX} $VERB $2 ... " printf "${PREFIX} $VERB $2 ... "
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
@@ -83,7 +83,7 @@ run_cargo() {
TOOLCHAIN_RESULT=0 TOOLCHAIN_RESULT=0
check_toolchain() { check_toolchain() {
printf "${PREFIX} Checking for $1 toolchain ... " printf "${PREFIX} Checking for $1 toolchain ... "
if [[ $(multirust list-toolchain) =~ $1 ]]; then if [[ $(rustup toolchain list) =~ $1 ]]; then
printf "${SUCCESS}\n" printf "${SUCCESS}\n"
else else
TOOLCHAIN_RESULT=1 TOOLCHAIN_RESULT=1
@@ -92,8 +92,8 @@ check_toolchain() {
fi fi
} }
printf "${PREFIX} Checking for multirust ... " printf "${PREFIX} Checking for rustup ... "
command -v multirust &>/dev/null command -v rustup &>/dev/null
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
printf "${SUCCESS}\n" printf "${SUCCESS}\n"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.3.0" version = "0.5.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
license = "MIT" license = "MIT"
documentation = "https://google.github.io/tarpc" documentation = "https://google.github.io/tarpc"
@@ -11,11 +11,13 @@ readme = "../README.md"
description = "An RPC framework for Rust with a focus on ease of use." description = "An RPC framework for Rust with a focus on ease of use."
[dependencies] [dependencies]
bincode = "^0.4.0" bincode = "0.5"
log = "^0.3.5" log = "0.3"
scoped-pool = "^0.1.5" scoped-pool = "0.1"
serde = "^0.6.14" serde = "0.7"
unix_socket = "0.5"
[dev-dependencies] [dev-dependencies]
lazy_static = "^0.1.15" lazy_static = "0.1"
env_logger = "^0.3.2" env_logger = "0.3"
tempdir = "0.3"

View File

@@ -30,14 +30,13 @@
//! } //! }
//! //!
//! fn main() { //! fn main() {
//! let addr = "127.0.0.1:9000"; //! let serve_handle = Server.spawn("localhost:0").unwrap();
//! let shutdown = Server.spawn(addr).unwrap(); //! let client = Client::new(serve_handle.dialer()).unwrap();
//! let client = Client::new(addr).unwrap();
//! assert_eq!(3, client.add(1, 2).unwrap()); //! assert_eq!(3, client.add(1, 2).unwrap());
//! assert_eq!("Hello, Mom!".to_string(), //! assert_eq!("Hello, Mom!".to_string(),
//! client.hello("Mom".to_string()).unwrap()); //! client.hello("Mom".to_string()).unwrap());
//! drop(client); //! drop(client);
//! shutdown.shutdown(); //! serve_handle.shutdown();
//! } //! }
//! ``` //! ```
@@ -48,6 +47,7 @@ extern crate bincode;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate scoped_pool; extern crate scoped_pool;
extern crate unix_socket;
macro_rules! pos { macro_rules! pos {
() => (concat!(file!(), ":", line!())) () => (concat!(file!(), ":", line!()))
@@ -60,4 +60,7 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs. /// Provides the macro used for constructing rpc services and client stubs.
pub mod macros; pub mod macros;
/// Provides transport traits and implementations.
pub mod transport;
pub use protocol::{Config, Error, Result, ServeHandle}; pub use protocol::{Config, Error, Result, ServeHandle};

View File

@@ -110,7 +110,7 @@ macro_rules! impl_serialize {
match *self { match *self {
$( $(
$impler::$name(ref field) => $impler::$name(ref field) =>
$crate::macros::serde::Serializer::visit_newtype_variant( $crate::macros::serde::Serializer::serialize_newtype_variant(
serializer, serializer,
stringify!($impler), stringify!($impler),
$n, $n,
@@ -165,11 +165,12 @@ macro_rules! impl_deserialize {
} }
)* )*
return ::std::result::Result::Err( return ::std::result::Result::Err(
$crate::macros::serde::de::Error::syntax("expected a field") $crate::macros::serde::de::Error::custom(
format!("No variants have a value of {}!", value))
); );
} }
} }
deserializer.visit_struct_field(__FieldVisitor) deserializer.deserialize_struct_field(__FieldVisitor)
} }
} }
@@ -197,7 +198,7 @@ macro_rules! impl_deserialize {
stringify!($name) stringify!($name)
),* ),*
]; ];
deserializer.visit_enum(stringify!($impler), VARIANTS, __Visitor) deserializer.deserialize_enum(stringify!($impler), VARIANTS, __Visitor)
} }
} }
); );
@@ -261,7 +262,7 @@ macro_rules! service {
#[doc(hidden)] #[doc(hidden)]
#[macro_export] #[macro_export]
macro_rules! service_inner { macro_rules! service_inner {
// Pattern for when the next rpc has an implicit unit return type // Pattern for when the next rpc has an implicit unit return type
( (
{ {
$(#[$attr:meta])* $(#[$attr:meta])*
@@ -280,7 +281,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> (); rpc $fn_name( $( $arg : $in_ ),* ) -> ();
} }
}; };
// Pattern for when the next rpc has an explicit return type // Pattern for when the next rpc has an explicit return type
( (
{ {
$(#[$attr:meta])* $(#[$attr:meta])*
@@ -299,7 +300,7 @@ macro_rules! service_inner {
rpc $fn_name( $( $arg : $in_ ),* ) -> $out; rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
} }
}; };
// Pattern when all return types have been expanded // Pattern when all return types have been expanded
( (
{ } // none left to expand { } // none left to expand
$( $(
@@ -315,21 +316,30 @@ macro_rules! service_inner {
)* )*
#[doc="Spawn a running service."] #[doc="Spawn a running service."]
fn spawn<A>(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle> fn spawn<T>(self,
where A: ::std::net::ToSocketAddrs, transport: T)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static, Self: 'static,
{ {
self.spawn_with_config(addr, $crate::Config::default()) self.spawn_with_config(transport, $crate::Config::default())
} }
#[doc="Spawn a running service."] #[doc="Spawn a running service."]
fn spawn_with_config<A>(self, addr: A, config: $crate::Config) fn spawn_with_config<T>(self,
-> $crate::Result<$crate::protocol::ServeHandle> transport: T,
where A: ::std::net::ToSocketAddrs, config: $crate::Config)
-> $crate::Result<
$crate::protocol::ServeHandle<
<T::Listener as $crate::transport::Listener>::Dialer>>
where T: $crate::transport::Transport,
Self: 'static, Self: 'static,
{ {
let server = ::std::sync::Arc::new(__Server(self)); let server = __Server(self);
let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config)); let result = $crate::protocol::Serve::spawn_with_config(server, transport, config);
let handle = try!(result);
::std::result::Result::Ok(handle) ::std::result::Result::Ok(handle)
} }
} }
@@ -385,25 +395,29 @@ macro_rules! service_inner {
#[allow(unused)] #[allow(unused)]
#[doc="The client stub that makes RPC calls to the server."] #[doc="The client stub that makes RPC calls to the server."]
pub struct Client($crate::protocol::Client<__Request, __Reply>); pub struct Client<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl Client { impl<S> Client<S>
where S: $crate::transport::Stream
{
#[allow(unused)] #[allow(unused)]
#[doc="Create a new client with default configuration that connects to the given \ #[doc="Create a new client with default configuration that connects to the given \
address."] address."]
pub fn new<A>(addr: A) -> $crate::Result<Self> pub fn new<D>(dialer: D) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs, where D: $crate::transport::Dialer<Stream=S>,
{ {
Self::with_config(addr, $crate::Config::default()) Self::with_config(dialer, $crate::Config::default())
} }
#[allow(unused)] #[allow(unused)]
#[doc="Create a new client with the specified configuration that connects to the \ #[doc="Create a new client with the specified configuration that connects to the \
given address."] given address."]
pub fn with_config<A>(addr: A, config: $crate::Config) -> $crate::Result<Self> pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs, where D: $crate::transport::Dialer<Stream=S>,
{ {
let inner = try!($crate::protocol::Client::with_config(addr, config)); let inner = try!($crate::protocol::Client::with_config(dialer, config));
::std::result::Result::Ok(Client(inner)) ::std::result::Result::Ok(Client(inner))
} }
@@ -424,25 +438,27 @@ macro_rules! service_inner {
#[allow(unused)] #[allow(unused)]
#[doc="The client stub that makes asynchronous RPC calls to the server."] #[doc="The client stub that makes asynchronous RPC calls to the server."]
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>); pub struct AsyncClient<S = ::std::net::TcpStream>(
$crate::protocol::Client<__Request, __Reply, S>
) where S: $crate::transport::Stream;
impl AsyncClient { impl<S> AsyncClient<S>
where S: $crate::transport::Stream {
#[allow(unused)] #[allow(unused)]
#[doc="Create a new asynchronous client with default configuration that connects to \ #[doc="Create a new asynchronous client with default configuration that connects to \
the given address."] the given address."]
pub fn new<A>(addr: A) -> $crate::Result<Self> pub fn new<D>(dialer: D) -> $crate::Result<Self>
where A: ::std::net::ToSocketAddrs, where D: $crate::transport::Dialer<Stream=S>,
{ {
Self::with_config(addr, $crate::Config::default()) Self::with_config(dialer, $crate::Config::default())
} }
#[allow(unused)] #[allow(unused)]
#[doc="Create a new asynchronous client that connects to the given address."] #[doc="Create a new asynchronous client that connects to the given address."]
pub fn with_config<A>(addr: A, config: $crate::Config) pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
-> $crate::Result<Self> where D: $crate::transport::Dialer<Stream=S>,
where A: ::std::net::ToSocketAddrs,
{ {
let inner = try!($crate::protocol::Client::with_config(addr, config)); let inner = try!($crate::protocol::Client::with_config(dialer, config));
::std::result::Result::Ok(AsyncClient(inner)) ::std::result::Result::Ok(AsyncClient(inner))
} }
@@ -462,7 +478,8 @@ macro_rules! service_inner {
} }
#[allow(unused)] #[allow(unused)]
struct __Server<S: 'static + Service>(S); struct __Server<S>(S)
where S: 'static + Service;
impl<S> $crate::protocol::Serve for __Server<S> impl<S> $crate::protocol::Serve for __Server<S>
where S: 'static + Service where S: 'static + Service
@@ -512,6 +529,8 @@ mod syntax_test {
#[cfg(test)] #[cfg(test)]
mod functional_test { mod functional_test {
extern crate env_logger; extern crate env_logger;
extern crate tempdir;
use transport::unix::UnixTransport;
service! { service! {
rpc add(x: i32, y: i32) -> i32; rpc add(x: i32, y: i32) -> i32;
@@ -533,7 +552,7 @@ mod functional_test {
fn simple() { fn simple() {
let _ = env_logger::init(); let _ = env_logger::init();
let handle = Server.spawn("localhost:0").unwrap(); let handle = Server.spawn("localhost:0").unwrap();
let client = Client::new(handle.local_addr()).unwrap(); let client = Client::new(handle.dialer()).unwrap();
assert_eq!(3, client.add(1, 2).unwrap()); assert_eq!(3, client.add(1, 2).unwrap());
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap()); assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
drop(client); drop(client);
@@ -544,7 +563,7 @@ mod functional_test {
fn simple_async() { fn simple_async() {
let _ = env_logger::init(); let _ = env_logger::init();
let handle = Server.spawn("localhost:0").unwrap(); let handle = Server.spawn("localhost:0").unwrap();
let client = AsyncClient::new(handle.local_addr()).unwrap(); let client = AsyncClient::new(handle.dialer()).unwrap();
assert_eq!(3, client.add(1, 2).get().unwrap()); assert_eq!(3, client.add(1, 2).get().unwrap());
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap()); assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
drop(client); drop(client);
@@ -554,7 +573,7 @@ mod functional_test {
#[test] #[test]
fn try_clone() { fn try_clone() {
let handle = Server.spawn("localhost:0").unwrap(); let handle = Server.spawn("localhost:0").unwrap();
let client1 = Client::new(handle.local_addr()).unwrap(); let client1 = Client::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap(); let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).unwrap()); assert_eq!(3, client1.add(1, 2).unwrap());
assert_eq!(3, client2.add(1, 2).unwrap()); assert_eq!(3, client2.add(1, 2).unwrap());
@@ -563,7 +582,19 @@ mod functional_test {
#[test] #[test]
fn async_try_clone() { fn async_try_clone() {
let handle = Server.spawn("localhost:0").unwrap(); let handle = Server.spawn("localhost:0").unwrap();
let client1 = AsyncClient::new(handle.local_addr()).unwrap(); let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap());
}
#[test]
fn async_try_clone_unix() {
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
let temp_file = temp_dir.path()
.join("async_try_clone_unix.tmp");
let handle = Server.spawn(UnixTransport(temp_file)).unwrap();
let client1 = AsyncClient::new(handle.dialer()).unwrap();
let client2 = client1.try_clone().unwrap(); let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).get().unwrap()); assert_eq!(3, client1.add(1, 2).get().unwrap());
assert_eq!(3, client2.add(1, 2).get().unwrap()); assert_eq!(3, client2.add(1, 2).get().unwrap());
@@ -575,6 +606,12 @@ mod functional_test {
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0"); let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
} }
// Tests that a tcp client can be created from &str
#[allow(dead_code)]
fn test_client_str() {
let _ = Client::new("localhost:0");
}
#[test] #[test]
fn serde() { fn serde() {
use bincode; use bincode;

View File

@@ -8,38 +8,44 @@ use std::fmt;
use std::io::{self, BufReader, BufWriter, Read}; use std::io::{self, BufReader, BufWriter, Read};
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel}; use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread; use std::thread;
use super::{Config, Deserialize, Error, Packet, Result, Serialize}; use super::{Config, Deserialize, Error, Packet, Result, Serialize};
use transport::{Dialer, Stream};
/// A client stub that connects to a server to run rpcs. /// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply> pub struct Client<Request, Reply, S>
where Request: serde::ser::Serialize where Request: serde::ser::Serialize,
S: Stream
{ {
// The guard is in an option so it can be joined in the drop fn // The guard is in an option so it can be joined in the drop fn
reader_guard: Arc<Option<thread::JoinHandle<()>>>, reader_guard: Arc<Option<thread::JoinHandle<()>>>,
outbound: Sender<(Request, Sender<Result<Reply>>)>, outbound: Sender<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>, requests: Arc<Mutex<RpcFutures<Reply>>>,
shutdown: TcpStream, shutdown: S,
} }
impl<Request, Reply> Client<Request, Reply> impl<Request, Reply, S> Client<Request, Reply, S>
where Request: serde::ser::Serialize + Send + 'static, where Request: serde::ser::Serialize + Send + 'static,
Reply: serde::de::Deserialize + Send + 'static Reply: serde::de::Deserialize + Send + 'static,
S: Stream
{ {
/// Create a new client that connects to `addr`. The client uses the given timeout /// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes. /// for both reads and writes.
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> { pub fn new<D>(dialer: D) -> io::Result<Self>
Self::with_config(addr, Config::default()) where D: Dialer<Stream = S>
{
Self::with_config(dialer, Config::default())
} }
/// Create a new client that connects to `addr`. The client uses the given timeout /// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes. /// for both reads and writes.
pub fn with_config<A: ToSocketAddrs>(addr: A, config: Config) -> io::Result<Self> { pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
let stream = try!(TcpStream::connect(addr)); where D: Dialer<Stream = S>
{
let stream = try!(dialer.dial());
try!(stream.set_read_timeout(config.timeout)); try!(stream.set_read_timeout(config.timeout));
try!(stream.set_write_timeout(config.timeout)); try!(stream.set_write_timeout(config.timeout));
let reader_stream = try!(stream.try_clone()); let reader_stream = try!(stream.try_clone());
@@ -59,7 +65,7 @@ impl<Request, Reply> Client<Request, Reply>
} }
/// Clones the Client so that it can be shared across threads. /// Clones the Client so that it can be shared across threads.
pub fn try_clone(&self) -> io::Result<Client<Request, Reply>> { pub fn try_clone(&self) -> io::Result<Self> {
Ok(Client { Ok(Client {
reader_guard: self.reader_guard.clone(), reader_guard: self.reader_guard.clone(),
outbound: self.outbound.clone(), outbound: self.outbound.clone(),
@@ -97,14 +103,15 @@ impl<Request, Reply> Client<Request, Reply>
} }
} }
impl<Request, Reply> Drop for Client<Request, Reply> impl<Request, Reply, S> Drop for Client<Request, Reply, S>
where Request: serde::ser::Serialize where Request: serde::ser::Serialize,
S: Stream
{ {
fn drop(&mut self) { fn drop(&mut self) {
debug!("Dropping Client."); debug!("Dropping Client.");
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) { if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
debug!("Attempting to shut down writer and reader threads."); debug!("Attempting to shut down writer and reader threads.");
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) { if let Err(e) = self.shutdown.shutdown() {
warn!("Client: couldn't shutdown writer and reader threads: {:?}", warn!("Client: couldn't shutdown writer and reader threads: {:?}",
e); e);
} else { } else {
@@ -185,11 +192,12 @@ impl<Reply> RpcFutures<Reply> {
} }
} }
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>, fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>, requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream) stream: S)
where Request: serde::Serialize, where Request: serde::Serialize,
Reply: serde::Deserialize Reply: serde::Deserialize,
S: Stream
{ {
let mut next_id = 0; let mut next_id = 0;
let mut stream = BufWriter::new(stream); let mut stream = BufWriter::new(stream);
@@ -238,8 +246,9 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
} }
fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream) fn read<Reply, S>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: S)
where Reply: serde::Deserialize where Reply: serde::Deserialize,
S: Stream
{ {
let mut stream = BufReader::new(stream); let mut stream = BufReader::new(stream);
loop { loop {

View File

@@ -6,6 +6,7 @@
use bincode::{self, SizeLimit}; use bincode::{self, SizeLimit};
use bincode::serde::{deserialize_from, serialize_into}; use bincode::serde::{deserialize_from, serialize_into};
use serde; use serde;
use serde::de::value::Error::EndOfStream;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::convert; use std::convert;
use std::sync::Arc; use std::sync::Arc;
@@ -40,10 +41,14 @@ impl convert::From<bincode::serde::SerializeError> for Error {
impl convert::From<bincode::serde::DeserializeError> for Error { impl convert::From<bincode::serde::DeserializeError> for Error {
fn from(err: bincode::serde::DeserializeError) -> Error { fn from(err: bincode::serde::DeserializeError) -> Error {
match err { match err {
bincode::serde::DeserializeError::IoError(ref err) bincode::serde::DeserializeError::Serde(EndOfStream) => Error::ConnectionBroken,
if err.kind() == io::ErrorKind::ConnectionReset => Error::ConnectionBroken, bincode::serde::DeserializeError::IoError(err) => {
bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken, match err.kind() {
bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)), io::ErrorKind::ConnectionReset |
io::ErrorKind::UnexpectedEof => Error::ConnectionBroken,
_ => Error::Io(Arc::new(err)),
}
}
err => panic!("Unexpected error during deserialization: {:?}", err), err => panic!("Unexpected error during deserialization: {:?}", err),
} }
} }
@@ -88,6 +93,7 @@ mod test {
extern crate env_logger; extern crate env_logger;
use super::{Client, Config, Serve}; use super::{Client, Config, Serve};
use scoped_pool::Pool; use scoped_pool::Pool;
use std::net::TcpStream;
use std::sync::{Arc, Barrier, Mutex}; use std::sync::{Arc, Barrier, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@@ -127,7 +133,7 @@ mod test {
let _ = env_logger::init(); let _ = env_logger::init();
let server = Arc::new(Server::new()); let server = Arc::new(Server::new());
let serve_handle = server.spawn("localhost:0").unwrap(); let serve_handle = server.spawn("localhost:0").unwrap();
let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap(); let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap();
drop(client); drop(client);
serve_handle.shutdown(); serve_handle.shutdown();
} }
@@ -137,9 +143,8 @@ mod test {
let _ = env_logger::init(); let _ = env_logger::init();
let server = Arc::new(Server::new()); let server = Arc::new(Server::new());
let serve_handle = server.clone().spawn("localhost:0").unwrap(); let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone();
// The explicit type is required so that it doesn't deserialize a u32 instead of u64 // The explicit type is required so that it doesn't deserialize a u32 instead of u64
let client: Client<(), u64> = Client::new(addr).unwrap(); let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
assert_eq!(0, client.rpc(()).unwrap()); assert_eq!(0, client.rpc(()).unwrap());
assert_eq!(1, server.count()); assert_eq!(1, server.count());
assert_eq!(1, client.rpc(()).unwrap()); assert_eq!(1, client.rpc(()).unwrap());
@@ -180,12 +185,9 @@ mod test {
let _ = env_logger::init(); let _ = env_logger::init();
let server = Arc::new(Server::new()); let server = Arc::new(Server::new());
let serve_handle = server.spawn_with_config("localhost:0", let serve_handle = server.spawn_with_config("localhost:0",
Config { Config { timeout: Some(Duration::new(0, 10)) })
timeout: Some(Duration::new(0, 10))
})
.unwrap(); .unwrap();
let addr = serve_handle.local_addr().clone(); let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
let thread = thread::spawn(move || serve_handle.shutdown()); let thread = thread::spawn(move || serve_handle.shutdown());
info!("force_shutdown:: rpc1: {:?}", client.rpc(())); info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
thread.join().unwrap(); thread.join().unwrap();
@@ -196,12 +198,9 @@ mod test {
let _ = env_logger::init(); let _ = env_logger::init();
let server = Arc::new(Server::new()); let server = Arc::new(Server::new());
let serve_handle = server.spawn_with_config("localhost:0", let serve_handle = server.spawn_with_config("localhost:0",
Config { Config { timeout: test_timeout() })
timeout: test_timeout(),
})
.unwrap(); .unwrap();
let addr = serve_handle.local_addr().clone(); let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr).unwrap());
client.rpc(()).unwrap(); client.rpc(()).unwrap();
serve_handle.shutdown(); serve_handle.shutdown();
match client.rpc(()) { match client.rpc(()) {
@@ -218,8 +217,7 @@ mod test {
let pool = Pool::new(concurrency); let pool = Pool::new(concurrency);
let server = Arc::new(BarrierServer::new(concurrency)); let server = Arc::new(BarrierServer::new(concurrency));
let serve_handle = server.clone().spawn("localhost:0").unwrap(); let serve_handle = server.clone().spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone(); let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
pool.scoped(|scope| { pool.scoped(|scope| {
for _ in 0..concurrency { for _ in 0..concurrency {
let client = client.try_clone().unwrap(); let client = client.try_clone().unwrap();
@@ -238,8 +236,7 @@ mod test {
let _ = env_logger::init(); let _ = env_logger::init();
let server = Arc::new(Server::new()); let server = Arc::new(Server::new());
let serve_handle = server.spawn("localhost:0").unwrap(); let serve_handle = server.spawn("localhost:0").unwrap();
let addr = serve_handle.local_addr().clone(); let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
let client: Client<(), u64> = Client::new(addr).unwrap();
// Drop future immediately; does the reader channel panic when sending? // Drop future immediately; does the reader channel panic when sending?
client.rpc_async(()); client.rpc_async(());

View File

@@ -19,11 +19,11 @@ impl<T: Serialize> Serialize for Packet<T> {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error> fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer where S: Serializer
{ {
serializer.visit_struct(PACKET, serializer.serialize_struct(PACKET,
MapVisitor { MapVisitor {
value: self, value: self,
state: 0, state: 0,
}) })
} }
} }
@@ -40,11 +40,11 @@ impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
match self.state { match self.state {
0 => { 0 => {
self.state += 1; self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(RPC_ID, &self.value.rpc_id)))) Ok(Some(try!(serializer.serialize_struct_elt(RPC_ID, &self.value.rpc_id))))
} }
1 => { 1 => {
self.state += 1; self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message)))) Ok(Some(try!(serializer.serialize_struct_elt(MESSAGE, &self.value.message))))
} }
_ => Ok(None), _ => Ok(None),
} }
@@ -62,7 +62,7 @@ impl<T: Deserialize> Deserialize for Packet<T> {
where D: Deserializer where D: Deserializer
{ {
const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE]; const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE];
deserializer.visit_struct(PACKET, FIELDS, Visitor(PhantomData)) deserializer.deserialize_struct(PACKET, FIELDS, Visitor(PhantomData))
} }
} }

View File

@@ -7,24 +7,27 @@ use serde;
use scoped_pool::{Pool, Scope}; use scoped_pool::{Pool, Scope};
use std::fmt; use std::fmt;
use std::io::{self, BufReader, BufWriter}; use std::io::{self, BufReader, BufWriter};
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration; use std::time::Duration;
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use super::{Config, Deserialize, Error, Packet, Result, Serialize}; use super::{Config, Deserialize, Error, Packet, Result, Serialize};
use transport::{Dialer, Listener, Stream, Transport};
use transport::tcp::TcpDialer;
struct ConnectionHandler<'a, S> struct ConnectionHandler<'a, S, St>
where S: Serve where S: Serve,
St: Stream
{ {
read_stream: BufReader<TcpStream>, read_stream: BufReader<St>,
write_stream: BufWriter<TcpStream>, write_stream: BufWriter<St>,
server: S, server: S,
shutdown: &'a AtomicBool, shutdown: &'a AtomicBool,
} }
impl<'a, S> ConnectionHandler<'a, S> impl<'a, S, St> ConnectionHandler<'a, S, St>
where S: Serve where S: Serve,
St: Stream
{ {
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> { fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
let ConnectionHandler { let ConnectionHandler {
@@ -83,7 +86,7 @@ impl<'a, S> ConnectionHandler<'a, S>
} }
} }
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<TcpStream>) { fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<St>) {
loop { loop {
match rx.recv() { match rx.recv() {
Err(e) => { Err(e) => {
@@ -101,21 +104,25 @@ impl<'a, S> ConnectionHandler<'a, S>
} }
/// Provides methods for blocking until the server completes, /// Provides methods for blocking until the server completes,
pub struct ServeHandle { pub struct ServeHandle<D = TcpDialer>
where D: Dialer
{
tx: Sender<()>, tx: Sender<()>,
join_handle: JoinHandle<()>, join_handle: JoinHandle<()>,
addr: SocketAddr, dialer: D,
} }
impl ServeHandle { impl<D> ServeHandle<D>
where D: Dialer
{
/// Block until the server completes /// Block until the server completes
pub fn wait(self) { pub fn wait(self) {
self.join_handle.join().expect(pos!()); self.join_handle.join().expect(pos!());
} }
/// Returns the address the server is bound to /// Returns the dialer to the server.
pub fn local_addr(&self) -> &SocketAddr { pub fn dialer(&self) -> &D {
&self.addr &self.dialer
} }
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not /// Shutdown the server. Gracefully shuts down the serve thread but currently does not
@@ -123,7 +130,7 @@ impl ServeHandle {
pub fn shutdown(self) { pub fn shutdown(self) {
info!("ServeHandle: attempting to shut down the server."); info!("ServeHandle: attempting to shut down the server.");
self.tx.send(()).expect(pos!()); self.tx.send(()).expect(pos!());
if let Ok(_) = TcpStream::connect(self.addr) { if let Ok(_) = self.dialer.dial() {
self.join_handle.join().expect(pos!()); self.join_handle.join().expect(pos!());
} else { } else {
warn!("ServeHandle: best effort shutdown of serve thread failed"); warn!("ServeHandle: best effort shutdown of serve thread failed");
@@ -131,16 +138,19 @@ impl ServeHandle {
} }
} }
struct Server<'a, S: 'a> { struct Server<'a, S: 'a, L>
where L: Listener
{
server: &'a S, server: &'a S,
listener: TcpListener, listener: L,
read_timeout: Option<Duration>, read_timeout: Option<Duration>,
die_rx: Receiver<()>, die_rx: Receiver<()>,
shutdown: &'a AtomicBool, shutdown: &'a AtomicBool,
} }
impl<'a, S: 'a> Server<'a, S> impl<'a, S, L> Server<'a, S, L>
where S: Serve + 'static where S: Serve + 'static,
L: Listener
{ {
fn serve<'b>(self, scope: &Scope<'b>) fn serve<'b>(self, scope: &Scope<'b>)
where 'a: 'b where 'a: 'b
@@ -194,7 +204,9 @@ impl<'a, S: 'a> Server<'a, S>
} }
} }
impl<'a, S> Drop for Server<'a, S> { impl<'a, S, L> Drop for Server<'a, S, L>
where L: Listener
{
fn drop(&mut self) { fn drop(&mut self) {
debug!("Shutting down connection handlers."); debug!("Shutting down connection handlers.");
self.shutdown.store(true, Ordering::SeqCst); self.shutdown.store(true, Ordering::SeqCst);
@@ -212,29 +224,33 @@ pub trait Serve: Send + Sync + Sized {
fn serve(&self, request: Self::Request) -> Self::Reply; fn serve(&self, request: Self::Request) -> Self::Reply;
/// spawn /// spawn
fn spawn<A>(self, addr: A) -> io::Result<ServeHandle> fn spawn<T>(self, transport: T) -> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where A: ToSocketAddrs, where T: Transport,
Self: 'static, Self: 'static
{ {
self.spawn_with_config(addr, Config::default()) self.spawn_with_config(transport, Config::default())
} }
/// spawn /// spawn
fn spawn_with_config<A>(self, addr: A, config: Config) -> io::Result<ServeHandle> fn spawn_with_config<T>(self,
where A: ToSocketAddrs, transport: T,
Self: 'static, config: Config)
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
where T: Transport,
Self: 'static
{ {
let listener = try!(TcpListener::bind(&addr)); let listener = try!(transport.bind());
let addr = try!(listener.local_addr()); let dialer = try!(listener.dialer());
info!("spawn_with_config: spinning up server on {:?}", addr); info!("spawn_with_config: spinning up server.");
let (die_tx, die_rx) = channel(); let (die_tx, die_rx) = channel();
let timeout = config.timeout;
let join_handle = thread::spawn(move || { let join_handle = thread::spawn(move || {
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
let shutdown = AtomicBool::new(false); let shutdown = AtomicBool::new(false);
let server = Server { let server = Server {
server: &self, server: &self,
listener: listener, listener: listener,
read_timeout: config.timeout, read_timeout: timeout,
die_rx: die_rx, die_rx: die_rx,
shutdown: &shutdown, shutdown: &shutdown,
}; };
@@ -245,7 +261,7 @@ pub trait Serve: Send + Sync + Sized {
Ok(ServeHandle { Ok(ServeHandle {
tx: die_tx, tx: die_tx,
join_handle: join_handle, join_handle: join_handle,
addr: addr.clone(), dialer: dialer,
}) })
} }

View File

@@ -0,0 +1,91 @@
use std::io::{self, Read, Write};
use std::time::Duration;
/// A factory for creating a listener on a given address.
/// For TCP, an address might be an IPv4 address; for Unix sockets, it
/// is just a file name.
pub trait Transport {
/// The type of listener that binds to the given address.
type Listener: Listener;
/// Return a listener on the given address, and a dialer to that address.
fn bind(&self) -> io::Result<Self::Listener>;
}
/// Accepts incoming connections from dialers.
pub trait Listener: Send + 'static {
/// The type of address being listened on.
type Dialer: Dialer;
/// The type of stream this listener accepts.
type Stream: Stream;
/// Accept an incoming stream.
fn accept(&self) -> io::Result<Self::Stream>;
/// Returns the local address being listened on.
fn dialer(&self) -> io::Result<Self::Dialer>;
/// Iterate over incoming connections.
fn incoming(&self) -> Incoming<Self> {
Incoming { listener: self }
}
}
/// A cloneable Reader/Writer.
pub trait Stream: Read + Write + Send + Sized + 'static {
/// Creates a new independently owned handle to the Stream.
///
/// The returned Stream should reference the same stream that this
/// object references. Both handles should read and write the same
/// stream of data, and options set on one stream should be propagated
/// to the other stream.
fn try_clone(&self) -> io::Result<Self>;
/// Sets a read timeout.
///
/// If the value specified is `None`, then read calls will block indefinitely.
/// It is an error to pass the zero `Duration` to this method.
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
/// Sets a write timeout.
///
/// If the value specified is `None`, then write calls will block indefinitely.
/// It is an error to pass the zero `Duration` to this method.
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
/// Shuts down both ends of the stream.
///
/// Implementations should cause all pending and future I/O on the specified
/// portions to return immediately with an appropriate value.
fn shutdown(&self) -> io::Result<()>;
}
/// A `Stream` factory.
pub trait Dialer {
/// The type of `Stream` this can create.
type Stream: Stream;
/// Open a stream.
fn dial(&self) -> io::Result<Self::Stream>;
}
impl<P, D: ?Sized> Dialer for P
where P: ::std::ops::Deref<Target = D>,
D: Dialer + 'static
{
type Stream = D::Stream;
fn dial(&self) -> io::Result<Self::Stream> {
(**self).dial()
}
}
/// Iterates over incoming connections.
pub struct Incoming<'a, L: Listener + ?Sized + 'a> {
listener: &'a L,
}
impl<'a, L: Listener> Iterator for Incoming<'a, L> {
type Item = io::Result<L::Stream>;
fn next(&mut self) -> Option<Self::Item> {
Some(self.listener.accept())
}
}
/// Provides a TCP transport.
pub mod tcp;
/// Provides a unix socket transport.
pub mod unix;

View File

@@ -0,0 +1,75 @@
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::time::Duration;
/// A transport for TCP.
pub struct TcpTransport<A: ToSocketAddrs>(pub A);
impl<A: ToSocketAddrs> super::Transport for TcpTransport<A> {
type Listener = TcpListener;
fn bind(&self) -> io::Result<TcpListener> {
TcpListener::bind(&self.0)
}
}
impl<A: ToSocketAddrs> super::Transport for A {
type Listener = TcpListener;
fn bind(&self) -> io::Result<TcpListener> {
TcpListener::bind(self)
}
}
impl super::Listener for TcpListener {
type Dialer = TcpDialer<SocketAddr>;
type Stream = TcpStream;
fn accept(&self) -> io::Result<TcpStream> {
self.accept().map(|(stream, _)| stream)
}
fn dialer(&self) -> io::Result<TcpDialer<SocketAddr>> {
self.local_addr().map(|addr| TcpDialer(addr))
}
}
impl super::Stream for TcpStream {
fn try_clone(&self) -> io::Result<Self> {
self.try_clone()
}
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(dur)
}
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.set_write_timeout(dur)
}
fn shutdown(&self) -> io::Result<()> {
self.shutdown(::std::net::Shutdown::Both)
}
}
/// Connects to a socket address.
pub struct TcpDialer<A = SocketAddr>(pub A) where A: ToSocketAddrs;
impl<A> super::Dialer for TcpDialer<A>
where A: ToSocketAddrs
{
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(&self.0)
}
}
impl super::Dialer for str {
type Stream = TcpStream;
fn dial(&self) -> io::Result<TcpStream> {
TcpStream::connect(self)
}
}

View File

@@ -0,0 +1,70 @@
use std::io;
use std::path::{Path, PathBuf};
use std::time::Duration;
use unix_socket::{UnixListener, UnixStream};
/// A transport for unix sockets.
pub struct UnixTransport<P>(pub P) where P: AsRef<Path>;
impl<P> super::Transport for UnixTransport<P>
where P: AsRef<Path>
{
type Listener = UnixListener;
fn bind(&self) -> io::Result<UnixListener> {
UnixListener::bind(&self.0)
}
}
/// Connects to a unix socket address.
pub struct UnixDialer<P>(pub P) where P: AsRef<Path>;
impl<P> super::Dialer for UnixDialer<P>
where P: AsRef<Path>
{
type Stream = UnixStream;
fn dial(&self) -> io::Result<UnixStream> {
UnixStream::connect(&self.0)
}
}
impl super::Listener for UnixListener {
type Stream = UnixStream;
type Dialer = UnixDialer<PathBuf>;
fn accept(&self) -> io::Result<UnixStream> {
self.accept().map(|(stream, _)| stream)
}
fn dialer(&self) -> io::Result<UnixDialer<PathBuf>> {
self.local_addr().and_then(|addr| {
match addr.as_pathname() {
Some(path) => Ok(UnixDialer(path.to_owned())),
None => {
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
"Couldn't get a path to bound unix socket"))
}
}
})
}
}
impl super::Stream for UnixStream {
fn try_clone(&self) -> io::Result<Self> {
self.try_clone()
}
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(timeout)
}
fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.set_write_timeout(timeout)
}
fn shutdown(&self) -> io::Result<()> {
self.shutdown(::std::net::Shutdown::Both)
}
}

View File

@@ -41,8 +41,9 @@ mod benchmark {
Arc::new(Mutex::new(handle)) Arc::new(Mutex::new(handle))
}; };
static ref CLIENT: Arc<Mutex<AsyncClient>> = { static ref CLIENT: Arc<Mutex<AsyncClient>> = {
let addr = HANDLE.lock().unwrap().local_addr().clone(); let lock = HANDLE.lock().unwrap();
let client = AsyncClient::new(addr).unwrap(); let dialer = lock.dialer();
let client = AsyncClient::new(dialer).unwrap();
Arc::new(Mutex::new(client)) Arc::new(Mutex::new(client))
}; };
} }