Get rid of Error::{Sender, Receiver, Serializer, Deserializer}

This commit is contained in:
Tim Kuehn
2016-01-15 21:52:25 -08:00
parent e74058dbde
commit d4a760e7c8
3 changed files with 37 additions and 80 deletions

View File

@@ -54,9 +54,6 @@ extern crate bincode;
#[macro_use]
extern crate log;
use std::io;
use std::convert::From;
/// Provides the tarpc client and server, which implements the tarpc protocol.
/// The protocol is defined by the implementation.
pub mod protocol;
@@ -64,30 +61,4 @@ pub mod protocol;
/// Provides the macro used for constructing rpc services and client stubs.
pub mod macros;
/// An error that occurred while processing an RPC request
#[derive(Debug)]
pub enum Error {
#[doc="An IO error occurred."]
Io(io::Error),
#[doc="An unexpected internal error. Typically a bug in the server impl."]
InternalError,
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl From<protocol::Error> for Error {
fn from(err: protocol::Error) -> Error {
match err {
protocol::Error::Io(err) => Error::Io(err),
_ => Error::InternalError,
}
}
}
///The result of an RPC call; either the successful result or the error
pub type Result<T> = ::std::result::Result<T, Error>;
pub use protocol::{Error, Result};

View File

@@ -19,7 +19,7 @@ macro_rules! request_fns {
if let __Reply::$fn_name(reply) = reply {
Ok(reply)
} else {
Err($crate::Error::InternalError)
panic!("Unexpected reply to {}: {:?}", stringify!($fn_name), reply);
}
}
)*);

View File

@@ -5,26 +5,19 @@ use std::io::{self, Read};
use std::convert;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
use std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs};
use std::sync::{self, Arc, Condvar, Mutex};
use std::sync::{Arc, Condvar, Mutex};
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
/// Client errors that can occur during rpc calls
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Error {
/// An IO-related error
Io(io::Error),
/// An error in serialization
Serialize(bincode::serde::SerializeError),
/// An error in deserialization
Deserialize(bincode::serde::DeserializeError),
/// An internal message failed to be received.
/// Channels are used for the client's inter-thread communication. This message is
/// propagated if the sender unexpectedly hangs up.
Receiver,
Io(Arc<io::Error>),
/// The server hung up.
ConnectionBroken,
}
@@ -32,8 +25,8 @@ pub enum Error {
impl convert::From<bincode::serde::SerializeError> for Error {
fn from(err: bincode::serde::SerializeError) -> Error {
match err {
bincode::serde::SerializeError::IoError(err) => Error::Io(err),
err => Error::Serialize(err),
bincode::serde::SerializeError::IoError(err) => Error::Io(Arc::new(err)),
err => panic!("Unexpected error during serialization: {:?}", err),
}
}
}
@@ -41,21 +34,16 @@ impl convert::From<bincode::serde::SerializeError> for Error {
impl convert::From<bincode::serde::DeserializeError> for Error {
fn from(err: bincode::serde::DeserializeError) -> Error {
match err {
bincode::serde::DeserializeError::IoError(err) => Error::Io(err),
err => Error::Deserialize(err),
bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)),
bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken,
err => panic!("Unexpected error during deserialization: {:?}", err),
}
}
}
impl convert::From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl convert::From<sync::mpsc::RecvError> for Error {
fn from(_: sync::mpsc::RecvError) -> Error {
Error::Receiver
Error::Io(Arc::new(err))
}
}
@@ -171,9 +159,7 @@ impl ConnectionHandler {
}
}
Err(e) => {
warn!("ConnectionHandler: closing client connection due to error while \
serving: {:?}",
e);
warn!("ConnectionHandler: closing client connection due to {:?}", e);
return Err(e.into());
}
}
@@ -307,7 +293,7 @@ struct Packet<T> {
}
struct Reader<Reply> {
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>,
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
}
impl<Reply> Reader<Reply> {
@@ -332,6 +318,9 @@ impl<Reply> Reader<Reply> {
warn!("Client: reader thread encountered an unexpected error while parsing; \
returning now. Error: {:?}",
err);
let mut guard = self.requests.lock().unwrap();
let map = mem::replace(&mut *guard, Err(err.into()));
map.unwrap().clear();
break;
}
}
@@ -339,15 +328,6 @@ impl<Reply> Reader<Reply> {
}
}
impl<Reply> Drop for Reader<Reply> {
fn drop(&mut self) {
let mut guard = self.requests.lock().unwrap();
guard.as_mut().unwrap().clear();
// remove the hashmap so no one can put more senders and accidentally block
guard.take();
}
}
fn increment(cur_id: &mut u64) -> u64 {
let id = *cur_id;
*cur_id += 1;
@@ -364,7 +344,7 @@ pub struct Client<Request, Reply>
where Request: serde::ser::Serialize
{
synced_state: Mutex<SyncedClientState>,
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>,
requests: Arc<Mutex<Result<HashMap<u64, Sender<Reply>>>>>,
reader_guard: Option<thread::JoinHandle<()>>,
timeout: Option<Duration>,
_request: PhantomData<Request>,
@@ -377,7 +357,7 @@ impl<Request, Reply> Client<Request, Reply>
/// Create a new client that connects to `addr`
pub fn new<A: ToSocketAddrs>(addr: A, timeout: Option<Duration>) -> io::Result<Self> {
let stream = try!(TcpStream::connect(addr));
let requests = Arc::new(Mutex::new(Some(HashMap::new())));
let requests = Arc::new(Mutex::new(Ok(HashMap::new())));
let reader_stream = try!(stream.try_clone());
let reader = Reader { requests: requests.clone() };
let reader_guard = thread::spawn(move || reader.read(reader_stream));
@@ -401,10 +381,11 @@ impl<Request, Reply> Client<Request, Reply>
let mut state = self.synced_state.lock().unwrap();
let id = increment(&mut state.next_id);
{
if let Some(ref mut requests) = *self.requests.lock().unwrap() {
requests.insert(id, tx);
} else {
return Err(Error::ConnectionBroken);
match *self.requests.lock().unwrap() {
Ok(ref mut requests) => {
requests.insert(id, tx);
}
Err(ref e) => return Err(e.clone()),
}
}
let packet = Packet {
@@ -420,17 +401,22 @@ impl<Request, Reply> Client<Request, Reply>
warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}",
packet,
err);
if let Some(requests) = self.requests.lock().unwrap().as_mut() {
requests.remove(&id);
} else {
warn!("Client: couldn't remove sender for request {} because reader thread \
returned.",
id);
match *self.requests.lock().unwrap() {
Ok(ref mut requests) => {
requests.remove(&id);
return Err(err.into());
}
Err(ref e) => return Err(e.clone()),
}
return Err(err.into());
}
drop(state);
Ok(try!(rx.recv()))
match rx.recv() {
Ok(msg) => Ok(msg),
Err(_) => {
let guard = self.requests.lock().unwrap();
Err(guard.as_ref().err().unwrap().clone())
}
}
}
}