From d4a760e7c851a6aebdf57112be057fe007a9e9ef Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Fri, 15 Jan 2016 21:52:25 -0800 Subject: [PATCH 01/11] Get rid of Error::{Sender, Receiver, Serializer, Deserializer} --- tarpc/src/lib.rs | 31 +--------------- tarpc/src/macros.rs | 2 +- tarpc/src/protocol.rs | 84 ++++++++++++++++++------------------------- 3 files changed, 37 insertions(+), 80 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 05823a0..24eccd0 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -54,9 +54,6 @@ extern crate bincode; #[macro_use] extern crate log; -use std::io; -use std::convert::From; - /// Provides the tarpc client and server, which implements the tarpc protocol. /// The protocol is defined by the implementation. pub mod protocol; @@ -64,30 +61,4 @@ pub mod protocol; /// Provides the macro used for constructing rpc services and client stubs. pub mod macros; -/// An error that occurred while processing an RPC request -#[derive(Debug)] -pub enum Error { - #[doc="An IO error occurred."] - Io(io::Error), - - #[doc="An unexpected internal error. Typically a bug in the server impl."] - InternalError, -} - -impl From for Error { - fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - -impl From for Error { - fn from(err: protocol::Error) -> Error { - match err { - protocol::Error::Io(err) => Error::Io(err), - _ => Error::InternalError, - } - } -} - -///The result of an RPC call; either the successful result or the error -pub type Result = ::std::result::Result; +pub use protocol::{Error, Result}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index a71d412..b04feea 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -19,7 +19,7 @@ macro_rules! request_fns { if let __Reply::$fn_name(reply) = reply { Ok(reply) } else { - Err($crate::Error::InternalError) + panic!("Unexpected reply to {}: {:?}", stringify!($fn_name), reply); } } )*); diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 3f4b973..109a1a6 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -5,26 +5,19 @@ use std::io::{self, Read}; use std::convert; use std::collections::HashMap; use std::marker::PhantomData; +use std::mem; use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs}; -use std::sync::{self, Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::sync::mpsc::{channel, Sender, TryRecvError}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; /// Client errors that can occur during rpc calls -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Error { /// An IO-related error - Io(io::Error), - /// An error in serialization - Serialize(bincode::serde::SerializeError), - /// An error in deserialization - Deserialize(bincode::serde::DeserializeError), - /// An internal message failed to be received. - /// Channels are used for the client's inter-thread communication. This message is - /// propagated if the sender unexpectedly hangs up. - Receiver, + Io(Arc), /// The server hung up. ConnectionBroken, } @@ -32,8 +25,8 @@ pub enum Error { impl convert::From for Error { fn from(err: bincode::serde::SerializeError) -> Error { match err { - bincode::serde::SerializeError::IoError(err) => Error::Io(err), - err => Error::Serialize(err), + bincode::serde::SerializeError::IoError(err) => Error::Io(Arc::new(err)), + err => panic!("Unexpected error during serialization: {:?}", err), } } } @@ -41,21 +34,16 @@ impl convert::From for Error { impl convert::From for Error { fn from(err: bincode::serde::DeserializeError) -> Error { match err { - bincode::serde::DeserializeError::IoError(err) => Error::Io(err), - err => Error::Deserialize(err), + bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)), + bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken, + err => panic!("Unexpected error during deserialization: {:?}", err), } } } impl convert::From for Error { fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - -impl convert::From for Error { - fn from(_: sync::mpsc::RecvError) -> Error { - Error::Receiver + Error::Io(Arc::new(err)) } } @@ -171,9 +159,7 @@ impl ConnectionHandler { } } Err(e) => { - warn!("ConnectionHandler: closing client connection due to error while \ - serving: {:?}", - e); + warn!("ConnectionHandler: closing client connection due to {:?}", e); return Err(e.into()); } } @@ -307,7 +293,7 @@ struct Packet { } struct Reader { - requests: Arc>>>>, + requests: Arc>>>>, } impl Reader { @@ -332,6 +318,9 @@ impl Reader { warn!("Client: reader thread encountered an unexpected error while parsing; \ returning now. Error: {:?}", err); + let mut guard = self.requests.lock().unwrap(); + let map = mem::replace(&mut *guard, Err(err.into())); + map.unwrap().clear(); break; } } @@ -339,15 +328,6 @@ impl Reader { } } -impl Drop for Reader { - fn drop(&mut self) { - let mut guard = self.requests.lock().unwrap(); - guard.as_mut().unwrap().clear(); - // remove the hashmap so no one can put more senders and accidentally block - guard.take(); - } -} - fn increment(cur_id: &mut u64) -> u64 { let id = *cur_id; *cur_id += 1; @@ -364,7 +344,7 @@ pub struct Client where Request: serde::ser::Serialize { synced_state: Mutex, - requests: Arc>>>>, + requests: Arc>>>>, reader_guard: Option>, timeout: Option, _request: PhantomData, @@ -377,7 +357,7 @@ impl Client /// Create a new client that connects to `addr` pub fn new(addr: A, timeout: Option) -> io::Result { let stream = try!(TcpStream::connect(addr)); - let requests = Arc::new(Mutex::new(Some(HashMap::new()))); + let requests = Arc::new(Mutex::new(Ok(HashMap::new()))); let reader_stream = try!(stream.try_clone()); let reader = Reader { requests: requests.clone() }; let reader_guard = thread::spawn(move || reader.read(reader_stream)); @@ -401,10 +381,11 @@ impl Client let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); { - if let Some(ref mut requests) = *self.requests.lock().unwrap() { - requests.insert(id, tx); - } else { - return Err(Error::ConnectionBroken); + match *self.requests.lock().unwrap() { + Ok(ref mut requests) => { + requests.insert(id, tx); + } + Err(ref e) => return Err(e.clone()), } } let packet = Packet { @@ -420,17 +401,22 @@ impl Client warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}", packet, err); - if let Some(requests) = self.requests.lock().unwrap().as_mut() { - requests.remove(&id); - } else { - warn!("Client: couldn't remove sender for request {} because reader thread \ - returned.", - id); + match *self.requests.lock().unwrap() { + Ok(ref mut requests) => { + requests.remove(&id); + return Err(err.into()); + } + Err(ref e) => return Err(e.clone()), } - return Err(err.into()); } drop(state); - Ok(try!(rx.recv())) + match rx.recv() { + Ok(msg) => Ok(msg), + Err(_) => { + let guard = self.requests.lock().unwrap(); + Err(guard.as_ref().err().unwrap().clone()) + } + } } } From 667eef4066e558df13da05fe438591ac98b9052a Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 16 Jan 2016 01:14:46 -0800 Subject: [PATCH 02/11] Clarify timeout arg --- tarpc/src/protocol.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 109a1a6..8143be2 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -354,7 +354,8 @@ impl Client where Reply: serde::de::Deserialize + Send + 'static, Request: serde::ser::Serialize { - /// Create a new client that connects to `addr` + /// Create a new client that connects to `addr`. The client uses the given timeout + /// for both reads and writes. pub fn new(addr: A, timeout: Option) -> io::Result { let stream = try!(TcpStream::connect(addr)); let requests = Arc::new(Mutex::new(Ok(HashMap::new()))); From 15f2547ec7b3e202cda46d47c5bf726f87bcd3cd Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 16 Jan 2016 01:20:03 -0800 Subject: [PATCH 03/11] Update code in readme. Haven't checked if it runs --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c08bc9e..8277e77 100644 --- a/README.md +++ b/README.md @@ -17,15 +17,16 @@ rpc! { } } -impl hello_service::Service for () { +struct HelloService; +impl hello_service::Service for HelloService { fn hello(&self, name: String) -> String { format!("Hello, {}!", s) } } fn main() { - let server_handle = hello_service::serve("0.0.0.0:0", ()).unwrap(); - let client = hello_service::Client::new(server_handle.local_addr()).unwrap(); + let server_handle = hello_service::serve("0.0.0.0:0", HelloService, None).unwrap(); + let client = hello_service::Client::new(server_handle.local_addr(), None).unwrap(); assert_eq!("Hello, Mom!".into(), client.hello("Mom".into()).unwrap()); drop(client); server_handle.shutdown(); From e54a9df152c6d26881b873ff0d45121bc4bcf983 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 16 Jan 2016 01:22:22 -0800 Subject: [PATCH 04/11] Add checkboxes to planned improvements --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8277e77..6f1db32 100644 --- a/README.md +++ b/README.md @@ -37,9 +37,9 @@ The `rpc!` macro generates a module in the current module. In the above example, ## Planned Improvements (actively being worked on) -- Automatically reconnect on the client side when the connection cuts out. -- Allow omitting the return type in rpc definitions when the type is `()`. -- Allow users to specify imports inside the `rpc!` macro -- Support arbitrary serialization. (currently `serde_json` is used for all serialization) -- Support asynchronous server implementations (currently thread per connection). -- Support doc comments on rpc method definitions +- [ ] Automatically reconnect on the client side when the connection cuts out. +- [ ] Allow omitting the return type in rpc definitions when the type is `()`. +- [x] Allow users to specify imports inside the `rpc!` macro +- [ ] Support arbitrary serialization (currently `bincode` is used for all serialization). +- [ ] Support asynchronous server implementations (currently thread per connection). +- [x] Support doc comments on rpc method definitions From 80fb9922cdf6b2921912671da4dfcdd5dfcc349d Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sat, 16 Jan 2016 16:19:11 -0800 Subject: [PATCH 05/11] Put doc attributes defined on rpc methods on both the Service trait fns as well as the Client method stubs. --- README.md | 2 +- tarpc/src/lib.rs | 1 - tarpc/src/macros.rs | 24 ++++++++++++++++++------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 6f1db32..3377d31 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ fn main() { } ``` -The `rpc!` macro generates a module in the current module. In the above example, the module is named `hello_service`. This module will contain a `Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server listening on a tcp port. A `Client` can connect to such a service. Any type implementing the `Service` trait can be passed to `serve`. These generated types are specific to the echo service, and make it easy and ergonomic to write servers without dealing with sockets or serialization directly. See the tarpc_examples package for more sophisticated examples. +The `rpc!` macro generates a module in the current module. In the above example, the module is named `hello_service`. This module will contain a `Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server listening on a tcp port. A `Client` can connect to such a service. Any type implementing the `Service` trait can be passed to `serve`. Any doc attributes on rpc items in the `service { }` block will appear as documentation on both the Service trait methods as well as on the Client's stub methods. These generated types are specific to the echo service, and make it easy and ergonomic to write servers without dealing with sockets or serialization directly. See the tarpc_examples package for more sophisticated examples. ## Planned Improvements (actively being worked on) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 24eccd0..2a71330 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -3,7 +3,6 @@ //! Example usage: //! //! ``` -//! # #![feature(custom_derive)] //! # #![feature(custom_derive, plugin)] //! # #![plugin(serde_macros)] //! # #[macro_use] extern crate tarpc; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index b04feea..4516575 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -2,18 +2,25 @@ macro_rules! as_item { ($i:item) => {$i} } // Required because if-let can't be used with irrefutable patterns, so it needs -// to be special -// cased. +// to be special cased. #[macro_export] -macro_rules! request_fns { - ($fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty) => ( +macro_rules! client_stubs { + ( + { $(#[$attr:meta])* } + $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty + ) => ( + $(#[$attr])* pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> { let reply = try!((self.0).rpc(&request_variant!($fn_name $($arg),*))); let __Reply::$fn_name(reply) = reply; Ok(reply) } ); - ($( $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty)*) => ( $( + ($( + { $(#[$attr:meta])* } + $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty + )*) => ( $( + $(#[$attr])* pub fn $fn_name(&self, $($arg: $in_),*) -> $crate::Result<$out> { let reply = try!((self.0).rpc(&request_variant!($fn_name $($arg),*))); if let __Reply::$fn_name(reply) = reply { @@ -133,7 +140,12 @@ macro_rules! rpc { Ok(Client(inner)) } - request_fns!($($fn_name($($arg: $in_),*) -> $out)*); + client_stubs!( + $( + { $(#[$attr])* } + $fn_name($($arg: $in_),*) -> $out + )* + ); } struct __Server(S); From b93752aeaa4ac3b2f0d874b3b23000d8f2a9c9f5 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 17 Jan 2016 13:14:53 -0800 Subject: [PATCH 06/11] Remove unused feature enables --- tarpc/src/lib.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 2a71330..4b4eb5d 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -41,9 +41,6 @@ //! } //! ``` -#![feature(trace_macros)] -#![feature(const_fn)] -#![feature(braced_empty_structs)] #![feature(custom_derive, plugin)] #![plugin(serde_macros)] #![deny(missing_docs)] From 26ddd3e610bc846bca921d791b9c76d0f1487093 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 17 Jan 2016 16:38:11 -0800 Subject: [PATCH 07/11] Put deny(missing_docs) at the top. I think it looks better there. --- tarpc/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 4b4eb5d..3de785b 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -41,9 +41,9 @@ //! } //! ``` +#![deny(missing_docs)] #![feature(custom_derive, plugin)] #![plugin(serde_macros)] -#![deny(missing_docs)] extern crate serde; extern crate bincode; From a5a9ec3d4f4f61ef9f10d305f5a1fab7a0bc06ae Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 17 Jan 2016 19:45:13 -0800 Subject: [PATCH 08/11] Make separate section in the readme for additional features --- README.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 3377d31..08ed60b 100644 --- a/README.md +++ b/README.md @@ -33,13 +33,14 @@ fn main() { } ``` -The `rpc!` macro generates a module in the current module. In the above example, the module is named `hello_service`. This module will contain a `Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server listening on a tcp port. A `Client` can connect to such a service. Any type implementing the `Service` trait can be passed to `serve`. Any doc attributes on rpc items in the `service { }` block will appear as documentation on both the Service trait methods as well as on the Client's stub methods. These generated types are specific to the echo service, and make it easy and ergonomic to write servers without dealing with sockets or serialization directly. See the tarpc_examples package for more sophisticated examples. +The `rpc!` macro generates a module in the current module. In the above example, the module is named `hello_service`. This module will contain a `Client` type, a `Service` trait, and a `serve` function. `serve` can be used to start a server listening on a tcp port. A `Client` can connect to such a service. Any type implementing the `Service` trait can be passed to `serve`. These generated types are specific to the echo service, and make it easy and ergonomic to write servers without dealing with sockets or serialization directly. See the tarpc_examples package for more sophisticated examples. + +## Additional Features +- Imports can be specified in an `item {}` block that appears above the `service {}` block. +- Attributes can be specified on rpc methods. These will be included on both the `Service` trait methods as well as on the `Client`'s stub methods. ## Planned Improvements (actively being worked on) - -- [ ] Automatically reconnect on the client side when the connection cuts out. -- [ ] Allow omitting the return type in rpc definitions when the type is `()`. -- [x] Allow users to specify imports inside the `rpc!` macro -- [ ] Support arbitrary serialization (currently `bincode` is used for all serialization). -- [ ] Support asynchronous server implementations (currently thread per connection). -- [x] Support doc comments on rpc method definitions +- Automatically reconnect on the client side when the connection cuts out. +- Allow omitting the return type in rpc definitions when the type is `()`. +- Support arbitrary serialization (currently `bincode` is used for all serialization). +- Support asynchronous server implementations (currently thread per connection). From f92277e019940b518e5df8a2b75fbe4b6f905434 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 17 Jan 2016 19:47:48 -0800 Subject: [PATCH 09/11] Clarify error message --- tarpc/src/macros.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 4516575..42f8191 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -26,7 +26,7 @@ macro_rules! client_stubs { if let __Reply::$fn_name(reply) = reply { Ok(reply) } else { - panic!("Unexpected reply to {}: {:?}", stringify!($fn_name), reply); + panic!("Incorrect reply variant returned from protocol::Clientrpc; expected `{}`, but got {:?}", stringify!($fn_name), reply); } } )*); From 4a35a86cbc730ee6bbc49a2f97db928b34f1ccce Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 17 Jan 2016 21:56:30 -0800 Subject: [PATCH 10/11] Rename client_stubs => client_methods. Modify unit test to check for the Error variant returned on shutdown. --- tarpc/src/macros.rs | 4 ++-- tarpc/src/protocol.rs | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 42f8191..57eb1e8 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -4,7 +4,7 @@ macro_rules! as_item { ($i:item) => {$i} } // Required because if-let can't be used with irrefutable patterns, so it needs // to be special cased. #[macro_export] -macro_rules! client_stubs { +macro_rules! client_methods { ( { $(#[$attr:meta])* } $fn_name:ident( $( $arg:ident : $in_:ty ),* ) -> $out:ty @@ -140,7 +140,7 @@ macro_rules! rpc { Ok(Client(inner)) } - client_stubs!( + client_methods!( $( { $(#[$attr])* } $fn_name($($arg: $in_),*) -> $out diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 8143be2..3ba2397 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -561,7 +561,14 @@ mod test { let addr = serve_handle.local_addr().clone(); let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); serve_handle.shutdown(); - let _ = client.rpc(&Request::Increment); // First failure will trigger reader to shutdown + match client.rpc(&Request::Increment) { + ok @ Ok(_) => panic!("Expected Err, got {:?}", ok), + Err(e) => if let super::Error::ConnectionBroken = e { + /* success */ + } else { + panic!("Expected Error::ConnectionBroken, got {:?}", e); + }, + } let _ = client.rpc(&Request::Increment); // Test whether second failure hangs } From 541dd8f9e6f473a9e827aa86d93d047cfc4b53f1 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 17 Jan 2016 22:07:49 -0800 Subject: [PATCH 11/11] Use pattern matching better --- tarpc/src/protocol.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 3ba2397..b91b6b0 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -562,12 +562,8 @@ mod test { let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); serve_handle.shutdown(); match client.rpc(&Request::Increment) { - ok @ Ok(_) => panic!("Expected Err, got {:?}", ok), - Err(e) => if let super::Error::ConnectionBroken = e { - /* success */ - } else { - panic!("Expected Error::ConnectionBroken, got {:?}", e); - }, + Err(super::Error::ConnectionBroken) => {}, // success + otherwise => panic!("Expected Err(ConnectionBroken), got {:?}", otherwise), } let _ = client.rpc(&Request::Increment); // Test whether second failure hangs }