From d4a760e7c851a6aebdf57112be057fe007a9e9ef Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Fri, 15 Jan 2016 21:52:25 -0800 Subject: [PATCH] Get rid of Error::{Sender, Receiver, Serializer, Deserializer} --- tarpc/src/lib.rs | 31 +--------------- tarpc/src/macros.rs | 2 +- tarpc/src/protocol.rs | 84 ++++++++++++++++++------------------------- 3 files changed, 37 insertions(+), 80 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 05823a0..24eccd0 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -54,9 +54,6 @@ extern crate bincode; #[macro_use] extern crate log; -use std::io; -use std::convert::From; - /// Provides the tarpc client and server, which implements the tarpc protocol. /// The protocol is defined by the implementation. pub mod protocol; @@ -64,30 +61,4 @@ pub mod protocol; /// Provides the macro used for constructing rpc services and client stubs. pub mod macros; -/// An error that occurred while processing an RPC request -#[derive(Debug)] -pub enum Error { - #[doc="An IO error occurred."] - Io(io::Error), - - #[doc="An unexpected internal error. Typically a bug in the server impl."] - InternalError, -} - -impl From for Error { - fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - -impl From for Error { - fn from(err: protocol::Error) -> Error { - match err { - protocol::Error::Io(err) => Error::Io(err), - _ => Error::InternalError, - } - } -} - -///The result of an RPC call; either the successful result or the error -pub type Result = ::std::result::Result; +pub use protocol::{Error, Result}; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index a71d412..b04feea 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -19,7 +19,7 @@ macro_rules! request_fns { if let __Reply::$fn_name(reply) = reply { Ok(reply) } else { - Err($crate::Error::InternalError) + panic!("Unexpected reply to {}: {:?}", stringify!($fn_name), reply); } } )*); diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 3f4b973..109a1a6 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -5,26 +5,19 @@ use std::io::{self, Read}; use std::convert; use std::collections::HashMap; use std::marker::PhantomData; +use std::mem; use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs}; -use std::sync::{self, Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::sync::mpsc::{channel, Sender, TryRecvError}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; /// Client errors that can occur during rpc calls -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Error { /// An IO-related error - Io(io::Error), - /// An error in serialization - Serialize(bincode::serde::SerializeError), - /// An error in deserialization - Deserialize(bincode::serde::DeserializeError), - /// An internal message failed to be received. - /// Channels are used for the client's inter-thread communication. This message is - /// propagated if the sender unexpectedly hangs up. - Receiver, + Io(Arc), /// The server hung up. ConnectionBroken, } @@ -32,8 +25,8 @@ pub enum Error { impl convert::From for Error { fn from(err: bincode::serde::SerializeError) -> Error { match err { - bincode::serde::SerializeError::IoError(err) => Error::Io(err), - err => Error::Serialize(err), + bincode::serde::SerializeError::IoError(err) => Error::Io(Arc::new(err)), + err => panic!("Unexpected error during serialization: {:?}", err), } } } @@ -41,21 +34,16 @@ impl convert::From for Error { impl convert::From for Error { fn from(err: bincode::serde::DeserializeError) -> Error { match err { - bincode::serde::DeserializeError::IoError(err) => Error::Io(err), - err => Error::Deserialize(err), + bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)), + bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken, + err => panic!("Unexpected error during deserialization: {:?}", err), } } } impl convert::From for Error { fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - -impl convert::From for Error { - fn from(_: sync::mpsc::RecvError) -> Error { - Error::Receiver + Error::Io(Arc::new(err)) } } @@ -171,9 +159,7 @@ impl ConnectionHandler { } } Err(e) => { - warn!("ConnectionHandler: closing client connection due to error while \ - serving: {:?}", - e); + warn!("ConnectionHandler: closing client connection due to {:?}", e); return Err(e.into()); } } @@ -307,7 +293,7 @@ struct Packet { } struct Reader { - requests: Arc>>>>, + requests: Arc>>>>, } impl Reader { @@ -332,6 +318,9 @@ impl Reader { warn!("Client: reader thread encountered an unexpected error while parsing; \ returning now. Error: {:?}", err); + let mut guard = self.requests.lock().unwrap(); + let map = mem::replace(&mut *guard, Err(err.into())); + map.unwrap().clear(); break; } } @@ -339,15 +328,6 @@ impl Reader { } } -impl Drop for Reader { - fn drop(&mut self) { - let mut guard = self.requests.lock().unwrap(); - guard.as_mut().unwrap().clear(); - // remove the hashmap so no one can put more senders and accidentally block - guard.take(); - } -} - fn increment(cur_id: &mut u64) -> u64 { let id = *cur_id; *cur_id += 1; @@ -364,7 +344,7 @@ pub struct Client where Request: serde::ser::Serialize { synced_state: Mutex, - requests: Arc>>>>, + requests: Arc>>>>, reader_guard: Option>, timeout: Option, _request: PhantomData, @@ -377,7 +357,7 @@ impl Client /// Create a new client that connects to `addr` pub fn new(addr: A, timeout: Option) -> io::Result { let stream = try!(TcpStream::connect(addr)); - let requests = Arc::new(Mutex::new(Some(HashMap::new()))); + let requests = Arc::new(Mutex::new(Ok(HashMap::new()))); let reader_stream = try!(stream.try_clone()); let reader = Reader { requests: requests.clone() }; let reader_guard = thread::spawn(move || reader.read(reader_stream)); @@ -401,10 +381,11 @@ impl Client let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); { - if let Some(ref mut requests) = *self.requests.lock().unwrap() { - requests.insert(id, tx); - } else { - return Err(Error::ConnectionBroken); + match *self.requests.lock().unwrap() { + Ok(ref mut requests) => { + requests.insert(id, tx); + } + Err(ref e) => return Err(e.clone()), } } let packet = Packet { @@ -420,17 +401,22 @@ impl Client warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}", packet, err); - if let Some(requests) = self.requests.lock().unwrap().as_mut() { - requests.remove(&id); - } else { - warn!("Client: couldn't remove sender for request {} because reader thread \ - returned.", - id); + match *self.requests.lock().unwrap() { + Ok(ref mut requests) => { + requests.remove(&id); + return Err(err.into()); + } + Err(ref e) => return Err(e.clone()), } - return Err(err.into()); } drop(state); - Ok(try!(rx.recv())) + match rx.recv() { + Ok(msg) => Ok(msg), + Err(_) => { + let guard = self.requests.lock().unwrap(); + Err(guard.as_ref().err().unwrap().clone()) + } + } } }