From 39706467ab406be4cbcd7898e542702e2f579d33 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 25 Jan 2016 20:48:26 -0800 Subject: [PATCH 1/4] Factor out HashMap in reader into its own struct. Precipitated by the clippy warning about complex types, but definitely makes the code more readable, as well. --- tarpc/src/protocol.rs | 84 +++++++++++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 6eeaa47..9288a1b 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -304,8 +304,55 @@ struct Packet { message: T, } +struct RpcFutures(Result>>); + +impl RpcFutures { + fn new() -> RpcFutures { + RpcFutures(Ok(HashMap::new())) + } + + fn insert_tx(&mut self, id: u64, tx: Sender) -> Result<()> { + match self.0 { + Ok(ref mut requests) => { + requests.insert(id, tx); + Ok(()) + } + Err(ref e) => Err(e.clone()), + } + } + + fn remove_tx(&mut self, id: u64) -> Result<()> { + match self.0 { + Ok(ref mut requests) => { + requests.remove(&id); + Ok(()) + } + Err(ref e) => Err(e.clone()), + } + } + + fn complete_reply(&mut self, id: u64, reply: Reply) { + self.0 + .as_mut() + .unwrap() + .remove(&id) + .unwrap() + .send(reply) + .unwrap(); + } + + fn set_error(&mut self, err: bincode::serde::DeserializeError) { + let map = mem::replace(&mut self.0, Err(err.into())); + map.unwrap().clear(); + } + + fn get_error(&self) -> Error { + self.0.as_ref().err().unwrap().clone() + } +} + struct Reader { - requests: Arc>>>>, + requests: Arc>>, } impl Reader { @@ -321,18 +368,13 @@ impl Reader { message: reply }) => { debug!("Client: received message, id={}", id); - let mut requests = self.requests.lock().unwrap(); - let mut requests = requests.as_mut().unwrap(); - let reply_tx = requests.remove(&id).unwrap(); - reply_tx.send(reply).unwrap(); + self.requests.lock().unwrap().complete_reply(id, reply); } Err(err) => { warn!("Client: reader thread encountered an unexpected error while parsing; \ returning now. Error: {:?}", err); - let mut guard = self.requests.lock().unwrap(); - let map = mem::replace(&mut *guard, Err(err.into())); - map.unwrap().clear(); + self.requests.lock().unwrap().set_error(err); break; } } @@ -356,7 +398,7 @@ pub struct Client where Request: serde::ser::Serialize { synced_state: Mutex, - requests: Arc>>>>, + requests: Arc>>, reader_guard: Option>, timeout: Option, _request: PhantomData, @@ -370,7 +412,7 @@ impl Client /// for both reads and writes. pub fn new(addr: A, timeout: Option) -> io::Result { let stream = try!(TcpStream::connect(addr)); - let requests = Arc::new(Mutex::new(Ok(HashMap::new()))); + let requests = Arc::new(Mutex::new(RpcFutures::new())); let reader_stream = try!(stream.try_clone()); let reader = Reader { requests: requests.clone() }; let reader_guard = thread::spawn(move || reader.read(reader_stream)); @@ -393,12 +435,9 @@ impl Client let (tx, rx) = channel(); let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); - { - match *self.requests.lock().unwrap() { - Ok(ref mut requests) => { - requests.insert(id, tx); - } - Err(ref e) => return Err(e.clone()), + { // block required to drop lock asap + if let Err(e) = self.requests.lock().unwrap().insert_tx(id, tx) { + return Err(e); } } let packet = Packet { @@ -414,21 +453,14 @@ impl Client warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}", packet, err); - match *self.requests.lock().unwrap() { - Ok(ref mut requests) => { - requests.remove(&id); - return Err(err.into()); - } - Err(ref e) => return Err(e.clone()), + if let Err(e) = self.requests.lock().unwrap().remove_tx(id) { + return Err(e); } } drop(state); match rx.recv() { Ok(msg) => Ok(msg), - Err(_) => { - let guard = self.requests.lock().unwrap(); - Err(guard.as_ref().err().unwrap().clone()) - } + Err(_) => Err(self.requests.lock().unwrap().get_error()), } } } From 57c058dfbc47051fec35bf420a02254a39d46021 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 25 Jan 2016 22:50:23 -0800 Subject: [PATCH 2/4] Use try! where applicable --- tarpc/src/protocol.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 21c9cab..2aeff12 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -444,9 +444,7 @@ impl Client let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); { // block required to drop lock asap - if let Err(e) = self.requests.lock().unwrap().insert_tx(id, tx) { - return Err(e); - } + try!(self.requests.lock().unwrap().insert_tx(id, tx)); } let packet = Packet { rpc_id: id, @@ -461,9 +459,7 @@ impl Client warn!("Client: failed to write packet.\nPacket: {:?}\nError: {:?}", packet, err); - if let Err(e) = self.requests.lock().unwrap().remove_tx(id) { - return Err(e); - } + try!(self.requests.lock().unwrap().remove_tx(id)); } drop(state); match rx.recv() { From 72ab27713d74cebfc4ffa609a145dc7f3c5c309d Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 25 Jan 2016 23:01:23 -0800 Subject: [PATCH 3/4] Remove superfluous block --- tarpc/src/protocol.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 2aeff12..d330f51 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -443,9 +443,7 @@ impl Client let (tx, rx) = channel(); let mut state = self.synced_state.lock().unwrap(); let id = increment(&mut state.next_id); - { // block required to drop lock asap - try!(self.requests.lock().unwrap().insert_tx(id, tx)); - } + try!(self.requests.lock().unwrap().insert_tx(id, tx)); let packet = Packet { rpc_id: id, message: request, From 0966b2c823778f5401bd3dceee6d085855f895f1 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 25 Jan 2016 23:09:14 -0800 Subject: [PATCH 4/4] Remove unnecessary call to HashMap::clear --- tarpc/src/protocol.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index d330f51..f984c48 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -350,8 +350,7 @@ impl RpcFutures { } fn set_error(&mut self, err: bincode::serde::DeserializeError) { - let map = mem::replace(&mut self.0, Err(err.into())); - map.unwrap().clear(); + let _ = mem::replace(&mut self.0, Err(err.into())); } fn get_error(&self) -> Error {