mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-02-23 15:49:54 +01:00
Compare commits
49 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
801f09e9e6 | ||
|
|
6ce3a3d943 | ||
|
|
4d636d2882 | ||
|
|
3693c95a67 | ||
|
|
43a2df4a13 | ||
|
|
166f1523d6 | ||
|
|
1cc8cbcdc3 | ||
|
|
9dafc704e9 | ||
|
|
32e0b0d7f8 | ||
|
|
b87c52758e | ||
|
|
9235e12904 | ||
|
|
265fe56fa6 | ||
|
|
7b5b29a9c3 | ||
|
|
709f4ab1ac | ||
|
|
bbfb4325d2 | ||
|
|
f33cb3bd53 | ||
|
|
6a6832cfbc | ||
|
|
b0495ebc00 | ||
|
|
aec1574824 | ||
|
|
5d27d34bd3 | ||
|
|
fe978f2c56 | ||
|
|
44f472c65c | ||
|
|
e995acd4c9 | ||
|
|
e8fcf0e4de | ||
|
|
9dcd38c012 | ||
|
|
5ac4b710a5 | ||
|
|
2eb0b2cc83 | ||
|
|
72a9f8f70d | ||
|
|
8e5a44b423 | ||
|
|
714541a7a4 | ||
|
|
a1f529f794 | ||
|
|
a8766a9200 | ||
|
|
ef96c87226 | ||
|
|
3543b34f2b | ||
|
|
6273ebefa7 | ||
|
|
9827f75459 | ||
|
|
c398e2389b | ||
|
|
03dc512e25 | ||
|
|
8307c708a3 | ||
|
|
774411c636 | ||
|
|
d5b2f23f74 | ||
|
|
396aec3c2f | ||
|
|
28c6c333e5 | ||
|
|
2d1a77ec10 | ||
|
|
a0e6147482 | ||
|
|
fcdb0d9375 | ||
|
|
4c1d15f8ea | ||
|
|
ece1cc60b9 | ||
|
|
7d8a508379 |
20
README.md
20
README.md
@@ -23,7 +23,7 @@ function then returns the value produced by that other server.
|
|||||||
Add to your `Cargo.toml` dependencies:
|
Add to your `Cargo.toml` dependencies:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
tarpc = "0.3.0"
|
tarpc = "0.5.0"
|
||||||
```
|
```
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
@@ -46,8 +46,9 @@ impl HelloService for HelloServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let server_handle = HelloServer.spawn("0.0.0.0:0").unwrap();
|
let addr = "localhost:10000";
|
||||||
let client = hello_service::Client::new(server_handle.local_addr()).unwrap();
|
let server_handle = HelloServer.spawn(addr).unwrap();
|
||||||
|
let client = hello_service::Client::new(addr).unwrap();
|
||||||
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
|
assert_eq!("Hello, Mom!", client.hello("Mom".into()).unwrap());
|
||||||
drop(client);
|
drop(client);
|
||||||
server_handle.shutdown();
|
server_handle.shutdown();
|
||||||
@@ -56,19 +57,20 @@ fn main() {
|
|||||||
|
|
||||||
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
|
The `service!` macro expands to a collection of items that collectively form an rpc service. In the
|
||||||
above example, the macro is called within the `hello_service` module. This module will contain a
|
above example, the macro is called within the `hello_service` module. This module will contain a
|
||||||
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides `default fn`s for
|
`Client` (and `AsyncClient`) type, and a `Service` trait. The trait provides default `fn`s for
|
||||||
starting the service: `spawn` and `spawn_with_config`, which start the service listening on a tcp
|
starting the service: `spawn` and `spawn_with_config`, which start the service listening over an
|
||||||
port. A `Client` (or `AsyncClient`) can connect to such a service. These generated types make it
|
arbitrary transport. A `Client` (or `AsyncClient`) can connect to such a service. These generated
|
||||||
easy and ergonomic to write servers without dealing with sockets or serialization directly. See the
|
types make it easy and ergonomic to write servers without dealing with sockets or serialization
|
||||||
tarpc_examples package for more sophisticated examples.
|
directly. See the tarpc_examples package for more sophisticated examples.
|
||||||
|
|
||||||
## Documentation
|
## Documentation
|
||||||
Use `cargo doc` as you normally would to see the documentation created for all
|
Use `cargo doc` as you normally would to see the documentation created for all
|
||||||
items expanded by a `service!` invocation.
|
items expanded by a `service!` invocation.
|
||||||
|
|
||||||
## Additional Features
|
## Additional Features
|
||||||
|
- Connect over any transport that `impl`s the `Transport` trait.
|
||||||
- Concurrent requests from a single client.
|
- Concurrent requests from a single client.
|
||||||
- Any type that `impl`s `serde`'s Serialize` and `Deserialize` can be used in the rpc signatures.
|
- Any type that `impl`s `serde`'s `Serialize` and `Deserialize` can be used in the rpc signatures.
|
||||||
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait
|
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait
|
||||||
methods as well as on the `Client`'s stub methods.
|
methods as well as on the `Client`'s stub methods.
|
||||||
- Just like regular fns, the return type can be left off when it's `-> ()`.
|
- Just like regular fns, the return type can be left off when it's `-> ()`.
|
||||||
|
|||||||
17
RELEASES.md
17
RELEASES.md
@@ -1,4 +1,19 @@
|
|||||||
## 1.3 (2016-02-20)
|
## 0.5 (2016-04-24)
|
||||||
|
0.5 adds support for arbitrary transports via the
|
||||||
|
[`Transport`](tarpc/src/transport/mod.rs#L7) trait.
|
||||||
|
Out of the box tarpc provides implementations for:
|
||||||
|
|
||||||
|
* Tcp, for types `impl`ing `ToSocketAddrs`.
|
||||||
|
* Unix sockets via the `UnixTransport` type.
|
||||||
|
|
||||||
|
## 0.4 (2016-04-02)
|
||||||
|
|
||||||
|
### Breaking Changes
|
||||||
|
* Updated to the latest version of serde, 0.7.0. Because tarpc exposes serde in
|
||||||
|
its API, this forces downstream code to update to the latest version of
|
||||||
|
serde, as well.
|
||||||
|
|
||||||
|
## 0.3 (2016-02-20)
|
||||||
|
|
||||||
### Breaking Changes
|
### Breaking Changes
|
||||||
* The timeout arg to `serve` was replaced with a `Config` struct, which
|
* The timeout arg to `serve` was replaced with a `Config` struct, which
|
||||||
|
|||||||
@@ -92,9 +92,8 @@ FMTRESULT=0
|
|||||||
for file in $(git diff --name-only --cached);
|
for file in $(git diff --name-only --cached);
|
||||||
do
|
do
|
||||||
if [ ${file: -3} == ".rs" ]; then
|
if [ ${file: -3} == ".rs" ]; then
|
||||||
HASH=$(shasum $file)
|
diff=$(rustfmt --skip-children --write-mode=diff $file)
|
||||||
NEW_HASH=$(rustfmt --write-mode=display $file | shasum)
|
if grep --quiet "^Diff at line" <<< "$diff"; then
|
||||||
if [ "${HASH}" != "${NEW_HASH}" ]; then
|
|
||||||
FMTRESULT=1
|
FMTRESULT=1
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
@@ -105,6 +104,7 @@ if [ "${TARPC_SKIP_RUSTFMT}" == 1 ]; then
|
|||||||
elif [ ${FMTRESULT} != 0 ]; then
|
elif [ ${FMTRESULT} != 0 ]; then
|
||||||
FAILED=1
|
FAILED=1
|
||||||
printf "${FAILURE}\n"
|
printf "${FAILURE}\n"
|
||||||
|
echo "$diff" | sed '/Using rustfmt.*$/d'
|
||||||
else
|
else
|
||||||
printf "${SUCCESS}\n"
|
printf "${SUCCESS}\n"
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
|
# Pre-push hook for the tarpc repository. To use this hook, copy it to .git/hooks in your repository
|
||||||
# root.
|
# root.
|
||||||
#
|
#
|
||||||
# This hook runs tests to make sure only working code is being pushed. If present, multirust is used
|
# This hook runs tests to make sure only working code is being pushed. If present, rustup is used
|
||||||
# to build and test the code on the appropriate toolchains. The working copy must not contain
|
# to build and test the code on the appropriate toolchains. The working copy must not contain
|
||||||
# uncommitted changes, since the script currently just runs cargo build/test in the working copy.
|
# uncommitted changes, since the script currently just runs cargo build/test in the working copy.
|
||||||
#
|
#
|
||||||
@@ -67,7 +67,7 @@ run_cargo() {
|
|||||||
fi
|
fi
|
||||||
if [ "$3" != "" ]; then
|
if [ "$3" != "" ]; then
|
||||||
printf "${PREFIX} $VERB $2 on $3 ... "
|
printf "${PREFIX} $VERB $2 on $3 ... "
|
||||||
multirust run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
|
rustup run $3 cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
|
||||||
else
|
else
|
||||||
printf "${PREFIX} $VERB $2 ... "
|
printf "${PREFIX} $VERB $2 ... "
|
||||||
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
|
cargo $1 --manifest-path $2/Cargo.toml &>/dev/null
|
||||||
@@ -83,7 +83,7 @@ run_cargo() {
|
|||||||
TOOLCHAIN_RESULT=0
|
TOOLCHAIN_RESULT=0
|
||||||
check_toolchain() {
|
check_toolchain() {
|
||||||
printf "${PREFIX} Checking for $1 toolchain ... "
|
printf "${PREFIX} Checking for $1 toolchain ... "
|
||||||
if [[ $(multirust list-toolchain) =~ $1 ]]; then
|
if [[ $(rustup toolchain list) =~ $1 ]]; then
|
||||||
printf "${SUCCESS}\n"
|
printf "${SUCCESS}\n"
|
||||||
else
|
else
|
||||||
TOOLCHAIN_RESULT=1
|
TOOLCHAIN_RESULT=1
|
||||||
@@ -92,8 +92,8 @@ check_toolchain() {
|
|||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
printf "${PREFIX} Checking for multirust ... "
|
printf "${PREFIX} Checking for rustup ... "
|
||||||
command -v multirust &>/dev/null
|
command -v rustup &>/dev/null
|
||||||
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
|
if [ "$?" == 0 ] && [ "${TARPC_USE_CURRENT_TOOLCHAIN}" == "" ]; then
|
||||||
printf "${SUCCESS}\n"
|
printf "${SUCCESS}\n"
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "tarpc"
|
name = "tarpc"
|
||||||
version = "0.3.0"
|
version = "0.5.0"
|
||||||
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
documentation = "https://google.github.io/tarpc"
|
documentation = "https://google.github.io/tarpc"
|
||||||
@@ -11,11 +11,13 @@ readme = "../README.md"
|
|||||||
description = "An RPC framework for Rust with a focus on ease of use."
|
description = "An RPC framework for Rust with a focus on ease of use."
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bincode = "^0.4.0"
|
bincode = "0.5"
|
||||||
log = "^0.3.5"
|
log = "0.3"
|
||||||
scoped-pool = "^0.1.5"
|
scoped-pool = "0.1"
|
||||||
serde = "^0.6.14"
|
serde = "0.7"
|
||||||
|
unix_socket = "0.5"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
lazy_static = "^0.1.15"
|
lazy_static = "0.1"
|
||||||
env_logger = "^0.3.2"
|
env_logger = "0.3"
|
||||||
|
tempdir = "0.3"
|
||||||
|
|||||||
@@ -30,14 +30,13 @@
|
|||||||
//! }
|
//! }
|
||||||
//!
|
//!
|
||||||
//! fn main() {
|
//! fn main() {
|
||||||
//! let addr = "127.0.0.1:9000";
|
//! let serve_handle = Server.spawn("localhost:0").unwrap();
|
||||||
//! let shutdown = Server.spawn(addr).unwrap();
|
//! let client = Client::new(serve_handle.dialer()).unwrap();
|
||||||
//! let client = Client::new(addr).unwrap();
|
|
||||||
//! assert_eq!(3, client.add(1, 2).unwrap());
|
//! assert_eq!(3, client.add(1, 2).unwrap());
|
||||||
//! assert_eq!("Hello, Mom!".to_string(),
|
//! assert_eq!("Hello, Mom!".to_string(),
|
||||||
//! client.hello("Mom".to_string()).unwrap());
|
//! client.hello("Mom".to_string()).unwrap());
|
||||||
//! drop(client);
|
//! drop(client);
|
||||||
//! shutdown.shutdown();
|
//! serve_handle.shutdown();
|
||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
@@ -48,6 +47,7 @@ extern crate bincode;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
extern crate scoped_pool;
|
extern crate scoped_pool;
|
||||||
|
extern crate unix_socket;
|
||||||
|
|
||||||
macro_rules! pos {
|
macro_rules! pos {
|
||||||
() => (concat!(file!(), ":", line!()))
|
() => (concat!(file!(), ":", line!()))
|
||||||
@@ -60,4 +60,7 @@ pub mod protocol;
|
|||||||
/// Provides the macro used for constructing rpc services and client stubs.
|
/// Provides the macro used for constructing rpc services and client stubs.
|
||||||
pub mod macros;
|
pub mod macros;
|
||||||
|
|
||||||
|
/// Provides transport traits and implementations.
|
||||||
|
pub mod transport;
|
||||||
|
|
||||||
pub use protocol::{Config, Error, Result, ServeHandle};
|
pub use protocol::{Config, Error, Result, ServeHandle};
|
||||||
|
|||||||
@@ -110,7 +110,7 @@ macro_rules! impl_serialize {
|
|||||||
match *self {
|
match *self {
|
||||||
$(
|
$(
|
||||||
$impler::$name(ref field) =>
|
$impler::$name(ref field) =>
|
||||||
$crate::macros::serde::Serializer::visit_newtype_variant(
|
$crate::macros::serde::Serializer::serialize_newtype_variant(
|
||||||
serializer,
|
serializer,
|
||||||
stringify!($impler),
|
stringify!($impler),
|
||||||
$n,
|
$n,
|
||||||
@@ -165,11 +165,12 @@ macro_rules! impl_deserialize {
|
|||||||
}
|
}
|
||||||
)*
|
)*
|
||||||
return ::std::result::Result::Err(
|
return ::std::result::Result::Err(
|
||||||
$crate::macros::serde::de::Error::syntax("expected a field")
|
$crate::macros::serde::de::Error::custom(
|
||||||
|
format!("No variants have a value of {}!", value))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
deserializer.visit_struct_field(__FieldVisitor)
|
deserializer.deserialize_struct_field(__FieldVisitor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -197,7 +198,7 @@ macro_rules! impl_deserialize {
|
|||||||
stringify!($name)
|
stringify!($name)
|
||||||
),*
|
),*
|
||||||
];
|
];
|
||||||
deserializer.visit_enum(stringify!($impler), VARIANTS, __Visitor)
|
deserializer.deserialize_enum(stringify!($impler), VARIANTS, __Visitor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
@@ -261,7 +262,7 @@ macro_rules! service {
|
|||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! service_inner {
|
macro_rules! service_inner {
|
||||||
// Pattern for when the next rpc has an implicit unit return type
|
// Pattern for when the next rpc has an implicit unit return type
|
||||||
(
|
(
|
||||||
{
|
{
|
||||||
$(#[$attr:meta])*
|
$(#[$attr:meta])*
|
||||||
@@ -280,7 +281,7 @@ macro_rules! service_inner {
|
|||||||
rpc $fn_name( $( $arg : $in_ ),* ) -> ();
|
rpc $fn_name( $( $arg : $in_ ),* ) -> ();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Pattern for when the next rpc has an explicit return type
|
// Pattern for when the next rpc has an explicit return type
|
||||||
(
|
(
|
||||||
{
|
{
|
||||||
$(#[$attr:meta])*
|
$(#[$attr:meta])*
|
||||||
@@ -299,7 +300,7 @@ macro_rules! service_inner {
|
|||||||
rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
|
rpc $fn_name( $( $arg : $in_ ),* ) -> $out;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Pattern when all return types have been expanded
|
// Pattern when all return types have been expanded
|
||||||
(
|
(
|
||||||
{ } // none left to expand
|
{ } // none left to expand
|
||||||
$(
|
$(
|
||||||
@@ -315,21 +316,30 @@ macro_rules! service_inner {
|
|||||||
)*
|
)*
|
||||||
|
|
||||||
#[doc="Spawn a running service."]
|
#[doc="Spawn a running service."]
|
||||||
fn spawn<A>(self, addr: A) -> $crate::Result<$crate::protocol::ServeHandle>
|
fn spawn<T>(self,
|
||||||
where A: ::std::net::ToSocketAddrs,
|
transport: T)
|
||||||
|
-> $crate::Result<
|
||||||
|
$crate::protocol::ServeHandle<
|
||||||
|
<T::Listener as $crate::transport::Listener>::Dialer>>
|
||||||
|
where T: $crate::transport::Transport,
|
||||||
Self: 'static,
|
Self: 'static,
|
||||||
{
|
{
|
||||||
self.spawn_with_config(addr, $crate::Config::default())
|
self.spawn_with_config(transport, $crate::Config::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc="Spawn a running service."]
|
#[doc="Spawn a running service."]
|
||||||
fn spawn_with_config<A>(self, addr: A, config: $crate::Config)
|
fn spawn_with_config<T>(self,
|
||||||
-> $crate::Result<$crate::protocol::ServeHandle>
|
transport: T,
|
||||||
where A: ::std::net::ToSocketAddrs,
|
config: $crate::Config)
|
||||||
|
-> $crate::Result<
|
||||||
|
$crate::protocol::ServeHandle<
|
||||||
|
<T::Listener as $crate::transport::Listener>::Dialer>>
|
||||||
|
where T: $crate::transport::Transport,
|
||||||
Self: 'static,
|
Self: 'static,
|
||||||
{
|
{
|
||||||
let server = ::std::sync::Arc::new(__Server(self));
|
let server = __Server(self);
|
||||||
let handle = try!($crate::protocol::Serve::spawn_with_config(server, addr, config));
|
let result = $crate::protocol::Serve::spawn_with_config(server, transport, config);
|
||||||
|
let handle = try!(result);
|
||||||
::std::result::Result::Ok(handle)
|
::std::result::Result::Ok(handle)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -385,25 +395,29 @@ macro_rules! service_inner {
|
|||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[doc="The client stub that makes RPC calls to the server."]
|
#[doc="The client stub that makes RPC calls to the server."]
|
||||||
pub struct Client($crate::protocol::Client<__Request, __Reply>);
|
pub struct Client<S = ::std::net::TcpStream>(
|
||||||
|
$crate::protocol::Client<__Request, __Reply, S>
|
||||||
|
) where S: $crate::transport::Stream;
|
||||||
|
|
||||||
impl Client {
|
impl<S> Client<S>
|
||||||
|
where S: $crate::transport::Stream
|
||||||
|
{
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[doc="Create a new client with default configuration that connects to the given \
|
#[doc="Create a new client with default configuration that connects to the given \
|
||||||
address."]
|
address."]
|
||||||
pub fn new<A>(addr: A) -> $crate::Result<Self>
|
pub fn new<D>(dialer: D) -> $crate::Result<Self>
|
||||||
where A: ::std::net::ToSocketAddrs,
|
where D: $crate::transport::Dialer<Stream=S>,
|
||||||
{
|
{
|
||||||
Self::with_config(addr, $crate::Config::default())
|
Self::with_config(dialer, $crate::Config::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[doc="Create a new client with the specified configuration that connects to the \
|
#[doc="Create a new client with the specified configuration that connects to the \
|
||||||
given address."]
|
given address."]
|
||||||
pub fn with_config<A>(addr: A, config: $crate::Config) -> $crate::Result<Self>
|
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
|
||||||
where A: ::std::net::ToSocketAddrs,
|
where D: $crate::transport::Dialer<Stream=S>,
|
||||||
{
|
{
|
||||||
let inner = try!($crate::protocol::Client::with_config(addr, config));
|
let inner = try!($crate::protocol::Client::with_config(dialer, config));
|
||||||
::std::result::Result::Ok(Client(inner))
|
::std::result::Result::Ok(Client(inner))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -424,25 +438,27 @@ macro_rules! service_inner {
|
|||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[doc="The client stub that makes asynchronous RPC calls to the server."]
|
#[doc="The client stub that makes asynchronous RPC calls to the server."]
|
||||||
pub struct AsyncClient($crate::protocol::Client<__Request, __Reply>);
|
pub struct AsyncClient<S = ::std::net::TcpStream>(
|
||||||
|
$crate::protocol::Client<__Request, __Reply, S>
|
||||||
|
) where S: $crate::transport::Stream;
|
||||||
|
|
||||||
impl AsyncClient {
|
impl<S> AsyncClient<S>
|
||||||
|
where S: $crate::transport::Stream {
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[doc="Create a new asynchronous client with default configuration that connects to \
|
#[doc="Create a new asynchronous client with default configuration that connects to \
|
||||||
the given address."]
|
the given address."]
|
||||||
pub fn new<A>(addr: A) -> $crate::Result<Self>
|
pub fn new<D>(dialer: D) -> $crate::Result<Self>
|
||||||
where A: ::std::net::ToSocketAddrs,
|
where D: $crate::transport::Dialer<Stream=S>,
|
||||||
{
|
{
|
||||||
Self::with_config(addr, $crate::Config::default())
|
Self::with_config(dialer, $crate::Config::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
#[doc="Create a new asynchronous client that connects to the given address."]
|
#[doc="Create a new asynchronous client that connects to the given address."]
|
||||||
pub fn with_config<A>(addr: A, config: $crate::Config)
|
pub fn with_config<D>(dialer: D, config: $crate::Config) -> $crate::Result<Self>
|
||||||
-> $crate::Result<Self>
|
where D: $crate::transport::Dialer<Stream=S>,
|
||||||
where A: ::std::net::ToSocketAddrs,
|
|
||||||
{
|
{
|
||||||
let inner = try!($crate::protocol::Client::with_config(addr, config));
|
let inner = try!($crate::protocol::Client::with_config(dialer, config));
|
||||||
::std::result::Result::Ok(AsyncClient(inner))
|
::std::result::Result::Ok(AsyncClient(inner))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -462,7 +478,8 @@ macro_rules! service_inner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
struct __Server<S: 'static + Service>(S);
|
struct __Server<S>(S)
|
||||||
|
where S: 'static + Service;
|
||||||
|
|
||||||
impl<S> $crate::protocol::Serve for __Server<S>
|
impl<S> $crate::protocol::Serve for __Server<S>
|
||||||
where S: 'static + Service
|
where S: 'static + Service
|
||||||
@@ -512,6 +529,8 @@ mod syntax_test {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod functional_test {
|
mod functional_test {
|
||||||
extern crate env_logger;
|
extern crate env_logger;
|
||||||
|
extern crate tempdir;
|
||||||
|
use transport::unix::UnixTransport;
|
||||||
|
|
||||||
service! {
|
service! {
|
||||||
rpc add(x: i32, y: i32) -> i32;
|
rpc add(x: i32, y: i32) -> i32;
|
||||||
@@ -533,7 +552,7 @@ mod functional_test {
|
|||||||
fn simple() {
|
fn simple() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let handle = Server.spawn("localhost:0").unwrap();
|
let handle = Server.spawn("localhost:0").unwrap();
|
||||||
let client = Client::new(handle.local_addr()).unwrap();
|
let client = Client::new(handle.dialer()).unwrap();
|
||||||
assert_eq!(3, client.add(1, 2).unwrap());
|
assert_eq!(3, client.add(1, 2).unwrap());
|
||||||
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
|
assert_eq!("Hey, Tim.", client.hey("Tim".into()).unwrap());
|
||||||
drop(client);
|
drop(client);
|
||||||
@@ -544,7 +563,7 @@ mod functional_test {
|
|||||||
fn simple_async() {
|
fn simple_async() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let handle = Server.spawn("localhost:0").unwrap();
|
let handle = Server.spawn("localhost:0").unwrap();
|
||||||
let client = AsyncClient::new(handle.local_addr()).unwrap();
|
let client = AsyncClient::new(handle.dialer()).unwrap();
|
||||||
assert_eq!(3, client.add(1, 2).get().unwrap());
|
assert_eq!(3, client.add(1, 2).get().unwrap());
|
||||||
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
|
assert_eq!("Hey, Adam.", client.hey("Adam".into()).get().unwrap());
|
||||||
drop(client);
|
drop(client);
|
||||||
@@ -554,7 +573,7 @@ mod functional_test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn try_clone() {
|
fn try_clone() {
|
||||||
let handle = Server.spawn("localhost:0").unwrap();
|
let handle = Server.spawn("localhost:0").unwrap();
|
||||||
let client1 = Client::new(handle.local_addr()).unwrap();
|
let client1 = Client::new(handle.dialer()).unwrap();
|
||||||
let client2 = client1.try_clone().unwrap();
|
let client2 = client1.try_clone().unwrap();
|
||||||
assert_eq!(3, client1.add(1, 2).unwrap());
|
assert_eq!(3, client1.add(1, 2).unwrap());
|
||||||
assert_eq!(3, client2.add(1, 2).unwrap());
|
assert_eq!(3, client2.add(1, 2).unwrap());
|
||||||
@@ -563,7 +582,19 @@ mod functional_test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn async_try_clone() {
|
fn async_try_clone() {
|
||||||
let handle = Server.spawn("localhost:0").unwrap();
|
let handle = Server.spawn("localhost:0").unwrap();
|
||||||
let client1 = AsyncClient::new(handle.local_addr()).unwrap();
|
let client1 = AsyncClient::new(handle.dialer()).unwrap();
|
||||||
|
let client2 = client1.try_clone().unwrap();
|
||||||
|
assert_eq!(3, client1.add(1, 2).get().unwrap());
|
||||||
|
assert_eq!(3, client2.add(1, 2).get().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn async_try_clone_unix() {
|
||||||
|
let temp_dir = tempdir::TempDir::new("tarpc").unwrap();
|
||||||
|
let temp_file = temp_dir.path()
|
||||||
|
.join("async_try_clone_unix.tmp");
|
||||||
|
let handle = Server.spawn(UnixTransport(temp_file)).unwrap();
|
||||||
|
let client1 = AsyncClient::new(handle.dialer()).unwrap();
|
||||||
let client2 = client1.try_clone().unwrap();
|
let client2 = client1.try_clone().unwrap();
|
||||||
assert_eq!(3, client1.add(1, 2).get().unwrap());
|
assert_eq!(3, client1.add(1, 2).get().unwrap());
|
||||||
assert_eq!(3, client2.add(1, 2).get().unwrap());
|
assert_eq!(3, client2.add(1, 2).get().unwrap());
|
||||||
@@ -575,6 +606,12 @@ mod functional_test {
|
|||||||
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
|
let _ = ::std::sync::Arc::new(Server).spawn("localhost:0");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that a tcp client can be created from &str
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn test_client_str() {
|
||||||
|
let _ = Client::new("localhost:0");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn serde() {
|
fn serde() {
|
||||||
use bincode;
|
use bincode;
|
||||||
|
|||||||
@@ -8,38 +8,44 @@ use std::fmt;
|
|||||||
use std::io::{self, BufReader, BufWriter, Read};
|
use std::io::{self, BufReader, BufWriter, Read};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::net::{TcpStream, ToSocketAddrs};
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::sync::mpsc::{Receiver, Sender, channel};
|
use std::sync::mpsc::{Receiver, Sender, channel};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
||||||
|
use transport::{Dialer, Stream};
|
||||||
|
|
||||||
/// A client stub that connects to a server to run rpcs.
|
/// A client stub that connects to a server to run rpcs.
|
||||||
pub struct Client<Request, Reply>
|
pub struct Client<Request, Reply, S>
|
||||||
where Request: serde::ser::Serialize
|
where Request: serde::ser::Serialize,
|
||||||
|
S: Stream
|
||||||
{
|
{
|
||||||
// The guard is in an option so it can be joined in the drop fn
|
// The guard is in an option so it can be joined in the drop fn
|
||||||
reader_guard: Arc<Option<thread::JoinHandle<()>>>,
|
reader_guard: Arc<Option<thread::JoinHandle<()>>>,
|
||||||
outbound: Sender<(Request, Sender<Result<Reply>>)>,
|
outbound: Sender<(Request, Sender<Result<Reply>>)>,
|
||||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||||
shutdown: TcpStream,
|
shutdown: S,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Request, Reply> Client<Request, Reply>
|
impl<Request, Reply, S> Client<Request, Reply, S>
|
||||||
where Request: serde::ser::Serialize + Send + 'static,
|
where Request: serde::ser::Serialize + Send + 'static,
|
||||||
Reply: serde::de::Deserialize + Send + 'static
|
Reply: serde::de::Deserialize + Send + 'static,
|
||||||
|
S: Stream
|
||||||
{
|
{
|
||||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||||
/// for both reads and writes.
|
/// for both reads and writes.
|
||||||
pub fn new<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
|
pub fn new<D>(dialer: D) -> io::Result<Self>
|
||||||
Self::with_config(addr, Config::default())
|
where D: Dialer<Stream = S>
|
||||||
|
{
|
||||||
|
Self::with_config(dialer, Config::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new client that connects to `addr`. The client uses the given timeout
|
/// Create a new client that connects to `addr`. The client uses the given timeout
|
||||||
/// for both reads and writes.
|
/// for both reads and writes.
|
||||||
pub fn with_config<A: ToSocketAddrs>(addr: A, config: Config) -> io::Result<Self> {
|
pub fn with_config<D>(dialer: D, config: Config) -> io::Result<Self>
|
||||||
let stream = try!(TcpStream::connect(addr));
|
where D: Dialer<Stream = S>
|
||||||
|
{
|
||||||
|
let stream = try!(dialer.dial());
|
||||||
try!(stream.set_read_timeout(config.timeout));
|
try!(stream.set_read_timeout(config.timeout));
|
||||||
try!(stream.set_write_timeout(config.timeout));
|
try!(stream.set_write_timeout(config.timeout));
|
||||||
let reader_stream = try!(stream.try_clone());
|
let reader_stream = try!(stream.try_clone());
|
||||||
@@ -59,7 +65,7 @@ impl<Request, Reply> Client<Request, Reply>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Clones the Client so that it can be shared across threads.
|
/// Clones the Client so that it can be shared across threads.
|
||||||
pub fn try_clone(&self) -> io::Result<Client<Request, Reply>> {
|
pub fn try_clone(&self) -> io::Result<Self> {
|
||||||
Ok(Client {
|
Ok(Client {
|
||||||
reader_guard: self.reader_guard.clone(),
|
reader_guard: self.reader_guard.clone(),
|
||||||
outbound: self.outbound.clone(),
|
outbound: self.outbound.clone(),
|
||||||
@@ -97,14 +103,15 @@ impl<Request, Reply> Client<Request, Reply>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Request, Reply> Drop for Client<Request, Reply>
|
impl<Request, Reply, S> Drop for Client<Request, Reply, S>
|
||||||
where Request: serde::ser::Serialize
|
where Request: serde::ser::Serialize,
|
||||||
|
S: Stream
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
debug!("Dropping Client.");
|
debug!("Dropping Client.");
|
||||||
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
|
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
|
||||||
debug!("Attempting to shut down writer and reader threads.");
|
debug!("Attempting to shut down writer and reader threads.");
|
||||||
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) {
|
if let Err(e) = self.shutdown.shutdown() {
|
||||||
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
|
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
|
||||||
e);
|
e);
|
||||||
} else {
|
} else {
|
||||||
@@ -185,11 +192,12 @@ impl<Reply> RpcFutures<Reply> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
fn write<Request, Reply, S>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||||
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
requests: Arc<Mutex<RpcFutures<Reply>>>,
|
||||||
stream: TcpStream)
|
stream: S)
|
||||||
where Request: serde::Serialize,
|
where Request: serde::Serialize,
|
||||||
Reply: serde::Deserialize
|
Reply: serde::Deserialize,
|
||||||
|
S: Stream
|
||||||
{
|
{
|
||||||
let mut next_id = 0;
|
let mut next_id = 0;
|
||||||
let mut stream = BufWriter::new(stream);
|
let mut stream = BufWriter::new(stream);
|
||||||
@@ -238,8 +246,9 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream)
|
fn read<Reply, S>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: S)
|
||||||
where Reply: serde::Deserialize
|
where Reply: serde::Deserialize,
|
||||||
|
S: Stream
|
||||||
{
|
{
|
||||||
let mut stream = BufReader::new(stream);
|
let mut stream = BufReader::new(stream);
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
use bincode::{self, SizeLimit};
|
use bincode::{self, SizeLimit};
|
||||||
use bincode::serde::{deserialize_from, serialize_into};
|
use bincode::serde::{deserialize_from, serialize_into};
|
||||||
use serde;
|
use serde;
|
||||||
|
use serde::de::value::Error::EndOfStream;
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::convert;
|
use std::convert;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -40,10 +41,14 @@ impl convert::From<bincode::serde::SerializeError> for Error {
|
|||||||
impl convert::From<bincode::serde::DeserializeError> for Error {
|
impl convert::From<bincode::serde::DeserializeError> for Error {
|
||||||
fn from(err: bincode::serde::DeserializeError) -> Error {
|
fn from(err: bincode::serde::DeserializeError) -> Error {
|
||||||
match err {
|
match err {
|
||||||
bincode::serde::DeserializeError::IoError(ref err)
|
bincode::serde::DeserializeError::Serde(EndOfStream) => Error::ConnectionBroken,
|
||||||
if err.kind() == io::ErrorKind::ConnectionReset => Error::ConnectionBroken,
|
bincode::serde::DeserializeError::IoError(err) => {
|
||||||
bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken,
|
match err.kind() {
|
||||||
bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)),
|
io::ErrorKind::ConnectionReset |
|
||||||
|
io::ErrorKind::UnexpectedEof => Error::ConnectionBroken,
|
||||||
|
_ => Error::Io(Arc::new(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
err => panic!("Unexpected error during deserialization: {:?}", err),
|
err => panic!("Unexpected error during deserialization: {:?}", err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -88,6 +93,7 @@ mod test {
|
|||||||
extern crate env_logger;
|
extern crate env_logger;
|
||||||
use super::{Client, Config, Serve};
|
use super::{Client, Config, Serve};
|
||||||
use scoped_pool::Pool;
|
use scoped_pool::Pool;
|
||||||
|
use std::net::TcpStream;
|
||||||
use std::sync::{Arc, Barrier, Mutex};
|
use std::sync::{Arc, Barrier, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -127,7 +133,7 @@ mod test {
|
|||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let server = Arc::new(Server::new());
|
let server = Arc::new(Server::new());
|
||||||
let serve_handle = server.spawn("localhost:0").unwrap();
|
let serve_handle = server.spawn("localhost:0").unwrap();
|
||||||
let client: Client<(), u64> = Client::new(serve_handle.local_addr()).unwrap();
|
let client: Client<(), u64, TcpStream> = Client::new(serve_handle.dialer()).unwrap();
|
||||||
drop(client);
|
drop(client);
|
||||||
serve_handle.shutdown();
|
serve_handle.shutdown();
|
||||||
}
|
}
|
||||||
@@ -137,9 +143,8 @@ mod test {
|
|||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let server = Arc::new(Server::new());
|
let server = Arc::new(Server::new());
|
||||||
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
||||||
let addr = serve_handle.local_addr().clone();
|
|
||||||
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
|
// The explicit type is required so that it doesn't deserialize a u32 instead of u64
|
||||||
let client: Client<(), u64> = Client::new(addr).unwrap();
|
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||||
assert_eq!(0, client.rpc(()).unwrap());
|
assert_eq!(0, client.rpc(()).unwrap());
|
||||||
assert_eq!(1, server.count());
|
assert_eq!(1, server.count());
|
||||||
assert_eq!(1, client.rpc(()).unwrap());
|
assert_eq!(1, client.rpc(()).unwrap());
|
||||||
@@ -180,12 +185,9 @@ mod test {
|
|||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let server = Arc::new(Server::new());
|
let server = Arc::new(Server::new());
|
||||||
let serve_handle = server.spawn_with_config("localhost:0",
|
let serve_handle = server.spawn_with_config("localhost:0",
|
||||||
Config {
|
Config { timeout: Some(Duration::new(0, 10)) })
|
||||||
timeout: Some(Duration::new(0, 10))
|
|
||||||
})
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let addr = serve_handle.local_addr().clone();
|
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||||
let client: Client<(), u64> = Client::new(addr).unwrap();
|
|
||||||
let thread = thread::spawn(move || serve_handle.shutdown());
|
let thread = thread::spawn(move || serve_handle.shutdown());
|
||||||
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
|
info!("force_shutdown:: rpc1: {:?}", client.rpc(()));
|
||||||
thread.join().unwrap();
|
thread.join().unwrap();
|
||||||
@@ -196,12 +198,9 @@ mod test {
|
|||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let server = Arc::new(Server::new());
|
let server = Arc::new(Server::new());
|
||||||
let serve_handle = server.spawn_with_config("localhost:0",
|
let serve_handle = server.spawn_with_config("localhost:0",
|
||||||
Config {
|
Config { timeout: test_timeout() })
|
||||||
timeout: test_timeout(),
|
|
||||||
})
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let addr = serve_handle.local_addr().clone();
|
let client: Arc<Client<(), u64, _>> = Arc::new(Client::new(serve_handle.dialer()).unwrap());
|
||||||
let client: Arc<Client<(), u64>> = Arc::new(Client::new(addr).unwrap());
|
|
||||||
client.rpc(()).unwrap();
|
client.rpc(()).unwrap();
|
||||||
serve_handle.shutdown();
|
serve_handle.shutdown();
|
||||||
match client.rpc(()) {
|
match client.rpc(()) {
|
||||||
@@ -218,8 +217,7 @@ mod test {
|
|||||||
let pool = Pool::new(concurrency);
|
let pool = Pool::new(concurrency);
|
||||||
let server = Arc::new(BarrierServer::new(concurrency));
|
let server = Arc::new(BarrierServer::new(concurrency));
|
||||||
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
let serve_handle = server.clone().spawn("localhost:0").unwrap();
|
||||||
let addr = serve_handle.local_addr().clone();
|
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||||
let client: Client<(), u64> = Client::new(addr).unwrap();
|
|
||||||
pool.scoped(|scope| {
|
pool.scoped(|scope| {
|
||||||
for _ in 0..concurrency {
|
for _ in 0..concurrency {
|
||||||
let client = client.try_clone().unwrap();
|
let client = client.try_clone().unwrap();
|
||||||
@@ -238,8 +236,7 @@ mod test {
|
|||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
let server = Arc::new(Server::new());
|
let server = Arc::new(Server::new());
|
||||||
let serve_handle = server.spawn("localhost:0").unwrap();
|
let serve_handle = server.spawn("localhost:0").unwrap();
|
||||||
let addr = serve_handle.local_addr().clone();
|
let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap();
|
||||||
let client: Client<(), u64> = Client::new(addr).unwrap();
|
|
||||||
|
|
||||||
// Drop future immediately; does the reader channel panic when sending?
|
// Drop future immediately; does the reader channel panic when sending?
|
||||||
client.rpc_async(());
|
client.rpc_async(());
|
||||||
|
|||||||
@@ -19,11 +19,11 @@ impl<T: Serialize> Serialize for Packet<T> {
|
|||||||
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
|
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
|
||||||
where S: Serializer
|
where S: Serializer
|
||||||
{
|
{
|
||||||
serializer.visit_struct(PACKET,
|
serializer.serialize_struct(PACKET,
|
||||||
MapVisitor {
|
MapVisitor {
|
||||||
value: self,
|
value: self,
|
||||||
state: 0,
|
state: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,11 +40,11 @@ impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
|
|||||||
match self.state {
|
match self.state {
|
||||||
0 => {
|
0 => {
|
||||||
self.state += 1;
|
self.state += 1;
|
||||||
Ok(Some(try!(serializer.visit_struct_elt(RPC_ID, &self.value.rpc_id))))
|
Ok(Some(try!(serializer.serialize_struct_elt(RPC_ID, &self.value.rpc_id))))
|
||||||
}
|
}
|
||||||
1 => {
|
1 => {
|
||||||
self.state += 1;
|
self.state += 1;
|
||||||
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message))))
|
Ok(Some(try!(serializer.serialize_struct_elt(MESSAGE, &self.value.message))))
|
||||||
}
|
}
|
||||||
_ => Ok(None),
|
_ => Ok(None),
|
||||||
}
|
}
|
||||||
@@ -62,7 +62,7 @@ impl<T: Deserialize> Deserialize for Packet<T> {
|
|||||||
where D: Deserializer
|
where D: Deserializer
|
||||||
{
|
{
|
||||||
const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE];
|
const FIELDS: &'static [&'static str] = &[RPC_ID, MESSAGE];
|
||||||
deserializer.visit_struct(PACKET, FIELDS, Visitor(PhantomData))
|
deserializer.deserialize_struct(PACKET, FIELDS, Visitor(PhantomData))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,24 +7,27 @@ use serde;
|
|||||||
use scoped_pool::{Pool, Scope};
|
use scoped_pool::{Pool, Scope};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{self, BufReader, BufWriter};
|
use std::io::{self, BufReader, BufWriter};
|
||||||
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
|
|
||||||
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
|
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::thread::{self, JoinHandle};
|
use std::thread::{self, JoinHandle};
|
||||||
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
use super::{Config, Deserialize, Error, Packet, Result, Serialize};
|
||||||
|
use transport::{Dialer, Listener, Stream, Transport};
|
||||||
|
use transport::tcp::TcpDialer;
|
||||||
|
|
||||||
struct ConnectionHandler<'a, S>
|
struct ConnectionHandler<'a, S, St>
|
||||||
where S: Serve
|
where S: Serve,
|
||||||
|
St: Stream
|
||||||
{
|
{
|
||||||
read_stream: BufReader<TcpStream>,
|
read_stream: BufReader<St>,
|
||||||
write_stream: BufWriter<TcpStream>,
|
write_stream: BufWriter<St>,
|
||||||
server: S,
|
server: S,
|
||||||
shutdown: &'a AtomicBool,
|
shutdown: &'a AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, S> ConnectionHandler<'a, S>
|
impl<'a, S, St> ConnectionHandler<'a, S, St>
|
||||||
where S: Serve
|
where S: Serve,
|
||||||
|
St: Stream
|
||||||
{
|
{
|
||||||
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
|
fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> {
|
||||||
let ConnectionHandler {
|
let ConnectionHandler {
|
||||||
@@ -83,7 +86,7 @@ impl<'a, S> ConnectionHandler<'a, S>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<TcpStream>) {
|
fn write(rx: Receiver<Packet<<S as Serve>::Reply>>, stream: &mut BufWriter<St>) {
|
||||||
loop {
|
loop {
|
||||||
match rx.recv() {
|
match rx.recv() {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -101,21 +104,25 @@ impl<'a, S> ConnectionHandler<'a, S>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Provides methods for blocking until the server completes,
|
/// Provides methods for blocking until the server completes,
|
||||||
pub struct ServeHandle {
|
pub struct ServeHandle<D = TcpDialer>
|
||||||
|
where D: Dialer
|
||||||
|
{
|
||||||
tx: Sender<()>,
|
tx: Sender<()>,
|
||||||
join_handle: JoinHandle<()>,
|
join_handle: JoinHandle<()>,
|
||||||
addr: SocketAddr,
|
dialer: D,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServeHandle {
|
impl<D> ServeHandle<D>
|
||||||
|
where D: Dialer
|
||||||
|
{
|
||||||
/// Block until the server completes
|
/// Block until the server completes
|
||||||
pub fn wait(self) {
|
pub fn wait(self) {
|
||||||
self.join_handle.join().expect(pos!());
|
self.join_handle.join().expect(pos!());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the address the server is bound to
|
/// Returns the dialer to the server.
|
||||||
pub fn local_addr(&self) -> &SocketAddr {
|
pub fn dialer(&self) -> &D {
|
||||||
&self.addr
|
&self.dialer
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not
|
/// Shutdown the server. Gracefully shuts down the serve thread but currently does not
|
||||||
@@ -123,7 +130,7 @@ impl ServeHandle {
|
|||||||
pub fn shutdown(self) {
|
pub fn shutdown(self) {
|
||||||
info!("ServeHandle: attempting to shut down the server.");
|
info!("ServeHandle: attempting to shut down the server.");
|
||||||
self.tx.send(()).expect(pos!());
|
self.tx.send(()).expect(pos!());
|
||||||
if let Ok(_) = TcpStream::connect(self.addr) {
|
if let Ok(_) = self.dialer.dial() {
|
||||||
self.join_handle.join().expect(pos!());
|
self.join_handle.join().expect(pos!());
|
||||||
} else {
|
} else {
|
||||||
warn!("ServeHandle: best effort shutdown of serve thread failed");
|
warn!("ServeHandle: best effort shutdown of serve thread failed");
|
||||||
@@ -131,16 +138,19 @@ impl ServeHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Server<'a, S: 'a> {
|
struct Server<'a, S: 'a, L>
|
||||||
|
where L: Listener
|
||||||
|
{
|
||||||
server: &'a S,
|
server: &'a S,
|
||||||
listener: TcpListener,
|
listener: L,
|
||||||
read_timeout: Option<Duration>,
|
read_timeout: Option<Duration>,
|
||||||
die_rx: Receiver<()>,
|
die_rx: Receiver<()>,
|
||||||
shutdown: &'a AtomicBool,
|
shutdown: &'a AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, S: 'a> Server<'a, S>
|
impl<'a, S, L> Server<'a, S, L>
|
||||||
where S: Serve + 'static
|
where S: Serve + 'static,
|
||||||
|
L: Listener
|
||||||
{
|
{
|
||||||
fn serve<'b>(self, scope: &Scope<'b>)
|
fn serve<'b>(self, scope: &Scope<'b>)
|
||||||
where 'a: 'b
|
where 'a: 'b
|
||||||
@@ -194,7 +204,9 @@ impl<'a, S: 'a> Server<'a, S>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, S> Drop for Server<'a, S> {
|
impl<'a, S, L> Drop for Server<'a, S, L>
|
||||||
|
where L: Listener
|
||||||
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
debug!("Shutting down connection handlers.");
|
debug!("Shutting down connection handlers.");
|
||||||
self.shutdown.store(true, Ordering::SeqCst);
|
self.shutdown.store(true, Ordering::SeqCst);
|
||||||
@@ -212,29 +224,33 @@ pub trait Serve: Send + Sync + Sized {
|
|||||||
fn serve(&self, request: Self::Request) -> Self::Reply;
|
fn serve(&self, request: Self::Request) -> Self::Reply;
|
||||||
|
|
||||||
/// spawn
|
/// spawn
|
||||||
fn spawn<A>(self, addr: A) -> io::Result<ServeHandle>
|
fn spawn<T>(self, transport: T) -> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
|
||||||
where A: ToSocketAddrs,
|
where T: Transport,
|
||||||
Self: 'static,
|
Self: 'static
|
||||||
{
|
{
|
||||||
self.spawn_with_config(addr, Config::default())
|
self.spawn_with_config(transport, Config::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// spawn
|
/// spawn
|
||||||
fn spawn_with_config<A>(self, addr: A, config: Config) -> io::Result<ServeHandle>
|
fn spawn_with_config<T>(self,
|
||||||
where A: ToSocketAddrs,
|
transport: T,
|
||||||
Self: 'static,
|
config: Config)
|
||||||
|
-> io::Result<ServeHandle<<T::Listener as Listener>::Dialer>>
|
||||||
|
where T: Transport,
|
||||||
|
Self: 'static
|
||||||
{
|
{
|
||||||
let listener = try!(TcpListener::bind(&addr));
|
let listener = try!(transport.bind());
|
||||||
let addr = try!(listener.local_addr());
|
let dialer = try!(listener.dialer());
|
||||||
info!("spawn_with_config: spinning up server on {:?}", addr);
|
info!("spawn_with_config: spinning up server.");
|
||||||
let (die_tx, die_rx) = channel();
|
let (die_tx, die_rx) = channel();
|
||||||
|
let timeout = config.timeout;
|
||||||
let join_handle = thread::spawn(move || {
|
let join_handle = thread::spawn(move || {
|
||||||
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
|
let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads
|
||||||
let shutdown = AtomicBool::new(false);
|
let shutdown = AtomicBool::new(false);
|
||||||
let server = Server {
|
let server = Server {
|
||||||
server: &self,
|
server: &self,
|
||||||
listener: listener,
|
listener: listener,
|
||||||
read_timeout: config.timeout,
|
read_timeout: timeout,
|
||||||
die_rx: die_rx,
|
die_rx: die_rx,
|
||||||
shutdown: &shutdown,
|
shutdown: &shutdown,
|
||||||
};
|
};
|
||||||
@@ -245,7 +261,7 @@ pub trait Serve: Send + Sync + Sized {
|
|||||||
Ok(ServeHandle {
|
Ok(ServeHandle {
|
||||||
tx: die_tx,
|
tx: die_tx,
|
||||||
join_handle: join_handle,
|
join_handle: join_handle,
|
||||||
addr: addr.clone(),
|
dialer: dialer,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
91
tarpc/src/transport/mod.rs
Normal file
91
tarpc/src/transport/mod.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
use std::io::{self, Read, Write};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// A factory for creating a listener on a given address.
|
||||||
|
/// For TCP, an address might be an IPv4 address; for Unix sockets, it
|
||||||
|
/// is just a file name.
|
||||||
|
pub trait Transport {
|
||||||
|
/// The type of listener that binds to the given address.
|
||||||
|
type Listener: Listener;
|
||||||
|
/// Return a listener on the given address, and a dialer to that address.
|
||||||
|
fn bind(&self) -> io::Result<Self::Listener>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accepts incoming connections from dialers.
|
||||||
|
pub trait Listener: Send + 'static {
|
||||||
|
/// The type of address being listened on.
|
||||||
|
type Dialer: Dialer;
|
||||||
|
/// The type of stream this listener accepts.
|
||||||
|
type Stream: Stream;
|
||||||
|
/// Accept an incoming stream.
|
||||||
|
fn accept(&self) -> io::Result<Self::Stream>;
|
||||||
|
/// Returns the local address being listened on.
|
||||||
|
fn dialer(&self) -> io::Result<Self::Dialer>;
|
||||||
|
/// Iterate over incoming connections.
|
||||||
|
fn incoming(&self) -> Incoming<Self> {
|
||||||
|
Incoming { listener: self }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A cloneable Reader/Writer.
|
||||||
|
pub trait Stream: Read + Write + Send + Sized + 'static {
|
||||||
|
/// Creates a new independently owned handle to the Stream.
|
||||||
|
///
|
||||||
|
/// The returned Stream should reference the same stream that this
|
||||||
|
/// object references. Both handles should read and write the same
|
||||||
|
/// stream of data, and options set on one stream should be propagated
|
||||||
|
/// to the other stream.
|
||||||
|
fn try_clone(&self) -> io::Result<Self>;
|
||||||
|
/// Sets a read timeout.
|
||||||
|
///
|
||||||
|
/// If the value specified is `None`, then read calls will block indefinitely.
|
||||||
|
/// It is an error to pass the zero `Duration` to this method.
|
||||||
|
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
|
||||||
|
/// Sets a write timeout.
|
||||||
|
///
|
||||||
|
/// If the value specified is `None`, then write calls will block indefinitely.
|
||||||
|
/// It is an error to pass the zero `Duration` to this method.
|
||||||
|
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()>;
|
||||||
|
/// Shuts down both ends of the stream.
|
||||||
|
///
|
||||||
|
/// Implementations should cause all pending and future I/O on the specified
|
||||||
|
/// portions to return immediately with an appropriate value.
|
||||||
|
fn shutdown(&self) -> io::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `Stream` factory.
|
||||||
|
pub trait Dialer {
|
||||||
|
/// The type of `Stream` this can create.
|
||||||
|
type Stream: Stream;
|
||||||
|
/// Open a stream.
|
||||||
|
fn dial(&self) -> io::Result<Self::Stream>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, D: ?Sized> Dialer for P
|
||||||
|
where P: ::std::ops::Deref<Target = D>,
|
||||||
|
D: Dialer + 'static
|
||||||
|
{
|
||||||
|
type Stream = D::Stream;
|
||||||
|
|
||||||
|
fn dial(&self) -> io::Result<Self::Stream> {
|
||||||
|
(**self).dial()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Iterates over incoming connections.
|
||||||
|
pub struct Incoming<'a, L: Listener + ?Sized + 'a> {
|
||||||
|
listener: &'a L,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, L: Listener> Iterator for Incoming<'a, L> {
|
||||||
|
type Item = io::Result<L::Stream>;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
Some(self.listener.accept())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Provides a TCP transport.
|
||||||
|
pub mod tcp;
|
||||||
|
/// Provides a unix socket transport.
|
||||||
|
pub mod unix;
|
||||||
75
tarpc/src/transport/tcp.rs
Normal file
75
tarpc/src/transport/tcp.rs
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
use std::io;
|
||||||
|
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// A transport for TCP.
|
||||||
|
pub struct TcpTransport<A: ToSocketAddrs>(pub A);
|
||||||
|
|
||||||
|
impl<A: ToSocketAddrs> super::Transport for TcpTransport<A> {
|
||||||
|
type Listener = TcpListener;
|
||||||
|
|
||||||
|
fn bind(&self) -> io::Result<TcpListener> {
|
||||||
|
TcpListener::bind(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A: ToSocketAddrs> super::Transport for A {
|
||||||
|
type Listener = TcpListener;
|
||||||
|
|
||||||
|
fn bind(&self) -> io::Result<TcpListener> {
|
||||||
|
TcpListener::bind(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Listener for TcpListener {
|
||||||
|
type Dialer = TcpDialer<SocketAddr>;
|
||||||
|
|
||||||
|
type Stream = TcpStream;
|
||||||
|
|
||||||
|
fn accept(&self) -> io::Result<TcpStream> {
|
||||||
|
self.accept().map(|(stream, _)| stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dialer(&self) -> io::Result<TcpDialer<SocketAddr>> {
|
||||||
|
self.local_addr().map(|addr| TcpDialer(addr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Stream for TcpStream {
|
||||||
|
fn try_clone(&self) -> io::Result<Self> {
|
||||||
|
self.try_clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
|
||||||
|
self.set_read_timeout(dur)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
|
||||||
|
self.set_write_timeout(dur)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shutdown(&self) -> io::Result<()> {
|
||||||
|
self.shutdown(::std::net::Shutdown::Both)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connects to a socket address.
|
||||||
|
pub struct TcpDialer<A = SocketAddr>(pub A) where A: ToSocketAddrs;
|
||||||
|
|
||||||
|
impl<A> super::Dialer for TcpDialer<A>
|
||||||
|
where A: ToSocketAddrs
|
||||||
|
{
|
||||||
|
type Stream = TcpStream;
|
||||||
|
|
||||||
|
fn dial(&self) -> io::Result<TcpStream> {
|
||||||
|
TcpStream::connect(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Dialer for str {
|
||||||
|
type Stream = TcpStream;
|
||||||
|
|
||||||
|
fn dial(&self) -> io::Result<TcpStream> {
|
||||||
|
TcpStream::connect(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
70
tarpc/src/transport/unix.rs
Normal file
70
tarpc/src/transport/unix.rs
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
use std::io;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::time::Duration;
|
||||||
|
use unix_socket::{UnixListener, UnixStream};
|
||||||
|
|
||||||
|
/// A transport for unix sockets.
|
||||||
|
pub struct UnixTransport<P>(pub P) where P: AsRef<Path>;
|
||||||
|
|
||||||
|
impl<P> super::Transport for UnixTransport<P>
|
||||||
|
where P: AsRef<Path>
|
||||||
|
{
|
||||||
|
type Listener = UnixListener;
|
||||||
|
|
||||||
|
fn bind(&self) -> io::Result<UnixListener> {
|
||||||
|
UnixListener::bind(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connects to a unix socket address.
|
||||||
|
pub struct UnixDialer<P>(pub P) where P: AsRef<Path>;
|
||||||
|
|
||||||
|
impl<P> super::Dialer for UnixDialer<P>
|
||||||
|
where P: AsRef<Path>
|
||||||
|
{
|
||||||
|
type Stream = UnixStream;
|
||||||
|
|
||||||
|
fn dial(&self) -> io::Result<UnixStream> {
|
||||||
|
UnixStream::connect(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Listener for UnixListener {
|
||||||
|
type Stream = UnixStream;
|
||||||
|
|
||||||
|
type Dialer = UnixDialer<PathBuf>;
|
||||||
|
|
||||||
|
fn accept(&self) -> io::Result<UnixStream> {
|
||||||
|
self.accept().map(|(stream, _)| stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dialer(&self) -> io::Result<UnixDialer<PathBuf>> {
|
||||||
|
self.local_addr().and_then(|addr| {
|
||||||
|
match addr.as_pathname() {
|
||||||
|
Some(path) => Ok(UnixDialer(path.to_owned())),
|
||||||
|
None => {
|
||||||
|
Err(io::Error::new(io::ErrorKind::AddrNotAvailable,
|
||||||
|
"Couldn't get a path to bound unix socket"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Stream for UnixStream {
|
||||||
|
fn try_clone(&self) -> io::Result<Self> {
|
||||||
|
self.try_clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
|
||||||
|
self.set_read_timeout(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
|
||||||
|
self.set_write_timeout(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shutdown(&self) -> io::Result<()> {
|
||||||
|
self.shutdown(::std::net::Shutdown::Both)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -41,8 +41,9 @@ mod benchmark {
|
|||||||
Arc::new(Mutex::new(handle))
|
Arc::new(Mutex::new(handle))
|
||||||
};
|
};
|
||||||
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
|
static ref CLIENT: Arc<Mutex<AsyncClient>> = {
|
||||||
let addr = HANDLE.lock().unwrap().local_addr().clone();
|
let lock = HANDLE.lock().unwrap();
|
||||||
let client = AsyncClient::new(addr).unwrap();
|
let dialer = lock.dialer();
|
||||||
|
let client = AsyncClient::new(dialer).unwrap();
|
||||||
Arc::new(Mutex::new(client))
|
Arc::new(Mutex::new(client))
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user