Merge branch 'remove-error-sender' into 'master'

Get rid of Error::{Sender, Receiver, Serializer, Deserializer}



See merge request !3
This commit is contained in:
Adam Wright
2016-01-18 11:43:21 +05:30
4 changed files with 71 additions and 100 deletions

View File

@@ -17,15 +17,16 @@ rpc! {
}
}
impl hello_service::Service for () {
struct HelloService;
impl hello_service::Service for HelloService {
fn hello(&self, name: String) -> String {
format!("Hello, {}!", s)
}
}
fn main() {
let server_handle = hello_service::serve("0.0.0.0:0", ()).unwrap();
let client = hello_service::Client::new(server_handle.local_addr()).unwrap();
let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap();
let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap();
assert_eq!("Hello, Mom!".into(), client.hello("Mom".into()).unwrap());
drop(client);
server_handle.shutdown();
@@ -34,11 +35,12 @@ fn main() {
The `rpc!` macro generates a module in the current module. In the above example, the module is named `hello_service`. This module will contain a `Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server listening on a tcp port. A `Client` can connect to such a service. Any type implementing the `Service` trait can be passed to `serve`. These generated types are specific to the echo service, and make it easy and ergonomic to write servers without dealing with sockets or serialization directly. See the tarpc_examples package for more sophisticated examples.
## Planned Improvements (actively being worked on)
## Additional Features
- Imports can be specified in an `item {}` block that appears above the `service {}` block.
- Attributes can be specified on rpc methods. These will be included on both the `Service` trait methods as well as on the `Client`'s stub methods.
## Planned Improvements (actively being worked on)
- Automatically reconnect on the client side when the connection cuts out.
- Allow omitting the return type in rpc definitions when the type is `()`.
- Allow users to specify imports inside the `rpc!` macro
- Support arbitrary serialization. (currently `serde_json` is used for all serialization)
- Support arbitrary serialization (currently `bincode` is used for all serialization).
- Support asynchronous server implementations (currently thread per connection).
- Support doc comments on rpc method definitions

View File

@@ -3,7 +3,6 @@
//! Example usage:
//!
//! ```
//! # #![feature(custom_derive)]
//! # #![feature(custom_derive, plugin)]
//! # #![plugin(serde_macros)]
//! # #[macro_use] extern crate tarpc;
@@ -42,21 +41,15 @@
//! }
//! ```
#![feature(trace_macros)]
#![feature(const_fn)]
#![feature(braced_empty_structs)]
#![deny(missing_docs)]
#![feature(custom_derive, plugin)]
#![plugin(serde_macros)]
#![deny(missing_docs)]
extern crate serde;
extern crate bincode;
#[macro_use]
extern crate log;
use std::io;
use std::convert::From;
/// Provides the tarpc client and server, which implements the tarpc protocol.
/// The protocol is defined by the implementation.
pub mod protocol;
@@ -64,30 +57,4 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs.
pub mod macros;
/// An error that occurred while processing an RPC request
#[derive(Debug)]
pub enum Error {
#[doc="An IO error occurred."]
Io(io::Error),
#[doc="An unexpected internal error. Typically a bug in the server impl."]
InternalError,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl From<protocol::Error> for Error {
fn from(err: protocol::Error) -> Error {
match err {
protocol::Error::Io(err) => Error::Io(err),
_ => Error::InternalError,
}
}
}
///The result of an RPC call; either the successful result or the error
pub type Result<T> = ::std::result::Result<T, Error>;
pub use protocol::{Error, Result};

View File

@@ -3,25 +3,32 @@
macro_rules! as_item { ($i:item) => {$i} }
// Required because if-let can't be used with irrefutable patterns, so it needs
// to be special
// cased.
// to be special cased.
#[doc(hidden)]
#[macro_export]
macro_rules! request_fns {
($fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty) => (
macro_rules! client_methods {
(
{ $(#[$attr:meta])* }
$fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty
) => (
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
let reply = try!((self.0).rpc(&request_variant!($fn_name $($arg),*)));
let __Reply::$fn_name(reply) = reply;
Ok(reply)
}
);
($( $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty)*) => ( $(
($(
{ $(#[$attr:meta])* }
$fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty
)*) => ( $(
$(#[$attr])*
pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> {
let reply = try!((self.0).rpc(&request_variant!($fn_name $($arg),*)));
if let __Reply::$fn_name(reply) = reply {
Ok(reply)
} else {
Err($crate::Error::InternalError)
panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply);
}
}
)*);
@@ -137,7 +144,12 @@ macro_rules! rpc {
Ok(Client(inner))
}
request_fns!($($fn_name($($arg: $in_),*) -> $out)*);
client_methods!(
$(
{ $(#[$attr])* }
$fn_name($($arg: $in_),*) -> $out
)*
);
}
struct __Server<S: 'static + Service>(S);

View File

@@ -5,26 +5,19 @@ use std::io::{self, Read};
use std::convert;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs};
use std::sync::{self, Arc, Condvar, Mutex};
use std::sync::{Arc, Condvar, Mutex};
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
/// Client errors that can occur during rpc calls
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Error {
/// An IO-related error
Io(io::Error),
/// An error in serialization
Serialize(bincode::serde::SerializeError),
/// An error in deserialization
Deserialize(bincode::serde::DeserializeError),
/// An internal message failed to be received.
/// Channels are used for the client's inter-thread communication. This message is
/// propagated if the sender unexpectedly hangs up.
Receiver,
Io(Arc<io::Error>),
/// The server hung up.
ConnectionBroken,
}
@@ -32,8 +25,8 @@ pub enum Error {
impl convert::From<bincode::serde::SerializeError> for Error {
fn from(err: bincode::serde::SerializeError) -> Error {
match err {
bincode::serde::SerializeError::IoError(err) => Error::Io(err),
err => Error::Serialize(err),
bincode::serde::SerializeError::IoError(err) => Error::Io(Arc::new(err)),
err => panic!("Unexpected error during serialization: {:?}", err),
}
}
}
@@ -41,21 +34,16 @@ impl convert::From<bincode::serde::SerializeError> for Error {
impl convert::From<bincode::serde::DeserializeError> for Error {
fn from(err: bincode::serde::DeserializeError) -> Error {
match err {
bincode::serde::DeserializeError::IoError(err) => Error::Io(err),
err => Error::Deserialize(err),
bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)),
bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken,
err => panic!("Unexpected error during deserialization: {:?}", err),
}
}
}
impl convert::From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl convert::From<sync::mpsc::RecvError> for Error {
fn from(_: sync::mpsc::RecvError) -> Error {
Error::Receiver
Error::Io(Arc::new(err))
}
}
@@ -171,9 +159,7 @@ impl ConnectionHandler {
}
}
Err(e) => {
warn!("ConnectionHandler: closing client connection due to error while \
serving: {:?}",
e);
warn!("ConnectionHandler: closing client connection due to {:?}", e);
return Err(e.into());
}
}
@@ -307,7 +293,7 @@ struct Packet<T> {
}
struct Reader<Reply> {
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>,
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
}
impl<Reply> Reader<Reply> {
@@ -332,6 +318,9 @@ impl<Reply> Reader<Reply> {
warn!("Client: reader thread encountered an unexpected error while parsing; \
returning now. Error: {:?}",
err);
let mut guard = self.requests.lock().unwrap();
let map = mem::replace(&mut *guard, Err(err.into()));
map.unwrap().clear();
break;
}
}
@@ -339,15 +328,6 @@ impl<Reply> Reader<Reply> {
}
}
impl<Reply> Drop for Reader<Reply> {
fn drop(&mut self) {
let mut guard = self.requests.lock().unwrap();
guard.as_mut().unwrap().clear();
// remove the hashmap so no one can put more senders and accidentally block
guard.take();
}
}
fn increment(cur_id: &mut u64) -> u64 {
let id = *cur_id;
*cur_id += 1;
@@ -364,7 +344,7 @@ pub struct Client<Request, Reply>
where Request: serde::ser::Serialize
{
synced_state: Mutex<SyncedClientState>,
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>,
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
reader_guard: Option<thread::JoinHandle<()>>,
timeout: Option<Duration>,
_request: PhantomData<Request>,
@@ -374,10 +354,11 @@ impl<Request, Reply> Client<Request, Reply>
where Reply: serde::de::Deserialize + Send + 'static,
Request: serde::ser::Serialize
{
/// Create a new client that connects to `addr`
/// Create a new client that connects to `addr`. The client uses the given timeout
/// for both reads and writes.
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
let stream = try!(TcpStream::connect(addr));
let requests = Arc::new(Mutex::new(Some(HashMap::new())));
let requests = Arc::new(Mutex::new(Ok(HashMap::new())));
let reader_stream = try!(stream.try_clone());
let reader = Reader { requests: requests.clone() };
let reader_guard = thread::spawn(move || reader.read(reader_stream));
@@ -401,10 +382,11 @@ impl<Request, Reply> Client<Request, Reply>
let mut state = self.synced_state.lock().unwrap();
let id = increment(&mut state.next_id);
{
if let Some(ref mut requests) = *self.requests.lock().unwrap() {
requests.insert(id, tx);
} else {
return Err(Error::ConnectionBroken);
match *self.requests.lock().unwrap() {
Ok(ref mut requests) => {
requests.insert(id, tx);
}
Err(ref e) => return Err(e.clone()),
}
}
let packet = Packet {
@@ -420,17 +402,22 @@ impl<Request, Reply> Client<Request, Reply>
warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}",
packet,
err);
if let Some(requests) = self.requests.lock().unwrap().as_mut() {
requests.remove(&id);
} else {
warn!("Client: couldn't remove sender for request {} because reader thread \
returned.",
id);
match *self.requests.lock().unwrap() {
Ok(ref mut requests) => {
requests.remove(&id);
return Err(err.into());
}
Err(ref e) => return Err(e.clone()),
}
return Err(err.into());
}
drop(state);
Ok(try!(rx.recv()))
match rx.recv() {
Ok(msg) => Ok(msg),
Err(_) => {
let guard = self.requests.lock().unwrap();
Err(guard.as_ref().err().unwrap().clone())
}
}
}
}
@@ -574,7 +561,10 @@ mod test {
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<Request, Reply>> = Arc::new(Client::new(addr, None).unwrap());
serve_handle.shutdown();
let _ = client.rpc(&Request::Increment); // First failure will trigger reader to shutdown
match client.rpc(&Request::Increment) {
Err(super::Error::ConnectionBroken) => {}, // success
otherwise => panic!("Expected Err(ConnectionBroken), got {:?}", otherwise),
}
let _ = client.rpc(&Request::Increment); // Test whether second failure hangs
}