diff --git a/README.md b/README.md index c08bc9e..08ed60b 100644 --- a/README.md +++ b/README.md @@ -17,15 +17,16 @@ rpc! { } } -impl hello_service::Service for () { +struct HelloService; +impl hello_service::Service for HelloService { fn hello(&self, name: String) -> String { format!("Hello, {}!", s) } } fn main() { - let server_handle = hello_service::serve("0.0.0.0:0", ()).unwrap(); - let client = hello_service::Client::new(server_handle.local_addr()).unwrap(); + let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap(); + let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap(); assert_eq!("Hello, Mom!".into(), client.hello("Mom".into()).unwrap()); drop(client); server_handle.shutdown(); @@ -34,11 +35,12 @@ fn main() { The `rpc!` macro generates a module in the current module. In the above example, the module is named `hello_service`. This module will contain a `Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server listening on a tcp port. A `Client` can connect to such a service. Any type implementing the `Service` trait can be passed to `serve`. These generated types are specific to the echo service, and make it easy and ergonomic to write servers without dealing with sockets or serialization directly. See the tarpc_examples package for more sophisticated examples. -## Planned Improvements (actively being worked on) +## Additional Features +- Imports can be specified in an `item {}` block that appears above the `service {}` block. +- Attributes can be specified on rpc methods. These will be included on both the `Service` trait methods as well as on the `Client`'s stub methods. +## Planned Improvements (actively being worked on) - Automatically reconnect on the client side when the connection cuts out. - Allow omitting the return type in rpc definitions when the type is `()`. -- Allow users to specify imports inside the `rpc!` macro -- Support arbitrary serialization. (currently `serde_json` is used for all serialization) +- Support arbitrary serialization (currently `bincode` is used for all serialization). - Support asynchronous server implementations (currently thread per connection). -- Support doc comments on rpc method definitions diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 05823a0..3de785b 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -3,7 +3,6 @@ //! Example usage: //! //! ``` -//! # #![feature(custom_derive)] //! # #![feature(custom_derive, plugin)] //! # #![plugin(serde_macros)] //! # #[macro_use] extern crate tarpc; @@ -42,21 +41,15 @@ //! } //! ``` -#![feature(trace_macros)] -#![feature(const_fn)] -#![feature(braced_empty_structs)] +#![deny(missing_docs)] #![feature(custom_derive, plugin)] #![plugin(serde_macros)] -#![deny(missing_docs)] extern crate serde; extern crate bincode; #[macro_use] extern crate log; -use std::io; -use std::convert::From; - /// Provides the tarpc client and server, which implements the tarpc protocol. /// The protocol is defined by the implementation. pub mod protocol; @@ -64,30 +57,4 @@ pub mod protocol; /// Provides the macro used for constructing rpc services and client stubs. pub mod macros; -/// An error that occurred while processing an RPC request -#[derive(Debug)] -pub enum Error { - #[doc="An IO error occurred."] - Io(io::Error), - - #[doc="An unexpected internal error. Typically a bug in the server impl."] - InternalError, -} - -impl From for Error { - fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - -impl From for Error { - fn from(err: protocol::Error) -> Error { - match err { - protocol::Error::Io(err) => Error::Io(err), - _ => Error::InternalError, - } - } -} - -///The result of an RPC call; either the successful result or the error -pub type Result = ::std::result::Result; +pub use protocol::{Error, Result}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 4ffdb43..db57768 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -3,25 +3,32 @@ macro_rules! as_item { ($i:item) => {$i} } // Required because if-let can't be used with irrefutable patterns, so it needs -// to be special -// cased. +// to be special cased. #[doc(hidden)] #[macro_export] -macro_rules! request_fns { - ($fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty) => ( +macro_rules! client_methods { + ( + { $(#[$attr:meta])* } + $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty + ) => ( + $(#[$attr])* pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> { let reply = try!((self.0).rpc(&request_variant!($fn_name $($arg),*))); let __Reply::$fn_name(reply) = reply; Ok(reply) } ); - ($( $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty)*) => ( $( + ($( + { $(#[$attr:meta])* } + $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty + )*) => ( $( + $(#[$attr])* pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> { let reply = try!((self.0).rpc(&request_variant!($fn_name $($arg),*))); if let __Reply::$fn_name(reply) = reply { Ok(reply) } else { - Err($crate::Error::InternalError) + panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply); } } )*); @@ -137,7 +144,12 @@ macro_rules! rpc { Ok(Client(inner)) } - request_fns!($($fn_name($($arg: $in_),*) -> $out)*); + client_methods!( + $( + { $(#[$attr])* } + $fn_name($($arg: $in_),*) -> $out + )* + ); } struct __Server(S); diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 3f4b973..b91b6b0 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -5,26 +5,19 @@ use std::io::{self, Read}; use std::convert; use std::collections::HashMap; use std::marker::PhantomData; +use std::mem; use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs}; -use std::sync::{self, Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::sync::mpsc::{channel, Sender, TryRecvError}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; /// Client errors that can occur during rpc calls -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Error { /// An IO-related error - Io(io::Error), - /// An error in serialization - Serialize(bincode::serde::SerializeError), - /// An error in deserialization - Deserialize(bincode::serde::DeserializeError), - /// An internal message failed to be received. - /// Channels are used for the client's inter-thread communication. This message is - /// propagated if the sender unexpectedly hangs up. - Receiver, + Io(Arc), /// The server hung up. ConnectionBroken, } @@ -32,8 +25,8 @@ pub enum Error { impl convert::From for Error { fn from(err: bincode::serde::SerializeError) -> Error { match err { - bincode::serde::SerializeError::IoError(err) => Error::Io(err), - err => Error::Serialize(err), + bincode::serde::SerializeError::IoError(err) => Error::Io(Arc::new(err)), + err => panic!("Unexpected error during serialization: {:?}", err), } } } @@ -41,21 +34,16 @@ impl convert::From for Error { impl convert::From for Error { fn from(err: bincode::serde::DeserializeError) -> Error { match err { - bincode::serde::DeserializeError::IoError(err) => Error::Io(err), - err => Error::Deserialize(err), + bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)), + bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken, + err => panic!("Unexpected error during deserialization: {:?}", err), } } } impl convert::From for Error { fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - -impl convert::From for Error { - fn from(_: sync::mpsc::RecvError) -> Error { - Error::Receiver + Error::Io(Arc::new(err)) } } @@ -171,9 +159,7 @@ impl ConnectionHandler { } } Err(e) => { - warn!("ConnectionHandler: closing client connection due to error while \ - serving: {:?}", - e); + warn!("ConnectionHandler: closing client connection due to {:?}", e); return Err(e.into()); } } @@ -307,7 +293,7 @@ struct Packet { } struct Reader { - requests: Arc>>>>, + requests: Arc>>>>, } impl Reader { @@ -332,6 +318,9 @@ impl Reader { warn!("Client: reader thread encountered an unexpected error while parsing; \ returning now. Error: {:?}", err); + let mut guard = self.requests.lock().unwrap(); + let map = mem::replace(&mut *guard, Err(err.into())); + map.unwrap().clear(); break; } } @@ -339,15 +328,6 @@ impl Reader { } } -impl Drop for Reader { - fn drop(&mut self) { - let mut guard = self.requests.lock().unwrap(); - guard.as_mut().unwrap().clear(); - // remove the hashmap so no one can put more senders and accidentally block - guard.take(); - } -} - fn increment(cur_id: &mut u64) -> u64 { let id = *cur_id; *cur_id += 1; @@ -364,7 +344,7 @@ pub struct Client where Request: serde::ser::Serialize { synced_state: Mutex, - requests: Arc>>>>, + requests: Arc>>>>, reader_guard: Option>, timeout: Option, _request: PhantomData, @@ -374,10 +354,11 @@ impl Client where Reply: serde::de::Deserialize + Send + 'static, Request: serde::ser::Serialize { - /// Create a new client that connects to `addr` + /// Create a new client that connects to `addr`. The client uses the given timeout + /// for both reads and writes. pub fn new(addr: A, timeout: Option) -> io::Result { let stream = try!(TcpStream::connect(addr)); - let requests = Arc::new(Mutex::new(Some(HashMap::new()))); + let requests = Arc::new(Mutex::new(Ok(HashMap::new()))); let reader_stream = try!(stream.try_clone()); let reader = Reader { requests: requests.clone() }; let reader_guard = thread::spawn(move || reader.read(reader_stream)); @@ -401,10 +382,11 @@ impl Client let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); { - if let Some(ref mut requests) = *self.requests.lock().unwrap() { - requests.insert(id, tx); - } else { - return Err(Error::ConnectionBroken); + match *self.requests.lock().unwrap() { + Ok(ref mut requests) => { + requests.insert(id, tx); + } + Err(ref e) => return Err(e.clone()), } } let packet = Packet { @@ -420,17 +402,22 @@ impl Client warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}", packet, err); - if let Some(requests) = self.requests.lock().unwrap().as_mut() { - requests.remove(&id); - } else { - warn!("Client: couldn't remove sender for request {} because reader thread \ - returned.", - id); + match *self.requests.lock().unwrap() { + Ok(ref mut requests) => { + requests.remove(&id); + return Err(err.into()); + } + Err(ref e) => return Err(e.clone()), } - return Err(err.into()); } drop(state); - Ok(try!(rx.recv())) + match rx.recv() { + Ok(msg) => Ok(msg), + Err(_) => { + let guard = self.requests.lock().unwrap(); + Err(guard.as_ref().err().unwrap().clone()) + } + } } } @@ -574,7 +561,10 @@ mod test { let addr = serve_handle.local_addr().clone(); let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); serve_handle.shutdown(); - let _ = client.rpc(&Request::Increment); // First failure will trigger reader to shutdown + match client.rpc(&Request::Increment) { + Err(super::Error::ConnectionBroken) => {}, // success + otherwise => panic!("Expected Err(ConnectionBroken), got {:?}", otherwise), + } let _ = client.rpc(&Request::Increment); // Test whether second failure hangs }