From 5d8d04d52121c0181d2c6d5b343755c8422d474f Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 31 Jan 2016 22:05:04 -0800 Subject: [PATCH] Use expect() instead of unwrap() --- tarpc/src/lib.rs | 4 ++++ tarpc/src/protocol/client.rs | 23 +++++++++++++---------- tarpc/src/protocol/server.rs | 10 +++++----- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index a8e6486..e57bc8b 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -64,6 +64,10 @@ extern crate test; #[macro_use] extern crate lazy_static; +macro_rules! pos { + () => (concat!(file!(), ":", line!())) +} + /// Provides the tarpc client and server, which implements the tarpc protocol. /// The protocol is defined by the implementation. pub mod protocol; diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index ee895e9..4a83ecb 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -62,7 +62,7 @@ impl Client where Request: serde::ser::Serialize + fmt::Debug + Send + 'static { let (tx, rx) = channel(); - self.outbound.send((request, tx)).unwrap(); + self.outbound.send((request, tx)).expect(pos!()); rx } @@ -72,7 +72,7 @@ impl Client { self.rpc_internal(request) .recv() - .map_err(|_| self.requests.lock().unwrap().get_error()) + .map_err(|_| self.requests.lock().expect(pos!()).get_error()) .and_then(|reply| reply) } @@ -100,7 +100,10 @@ impl Drop for Client // We only join if we know the TcpStream was shut down. Otherwise we might never // finish. debug!("Joining writer and reader."); - reader_guard.take().unwrap().join().unwrap(); + reader_guard.take() + .expect(pos!()) + .join() + .expect(pos!()); debug!("Successfully joined writer and reader."); } } @@ -118,7 +121,7 @@ impl Future { pub fn get(self) -> Result { let requests = self.requests; self.rx.recv() - .map_err(|_| requests.lock().unwrap().get_error()) + .map_err(|_| requests.lock().expect(pos!()).get_error()) .and_then(|reply| reply) } } @@ -151,7 +154,7 @@ impl RpcFutures { } fn complete_reply(&mut self, id: u64, reply: Reply) { - if let Some(tx) = self.0.as_mut().unwrap().remove(&id) { + if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&id) { if let Err(e) = tx.send(Ok(reply)) { info!("Reader: could not complete reply: {:?}", e); } @@ -165,7 +168,7 @@ impl RpcFutures { } fn get_error(&self) -> Error { - self.0.as_ref().err().unwrap().clone() + self.0.as_ref().err().expect(pos!()).clone() } } @@ -185,7 +188,7 @@ fn write(outbound: Receiver<(Request, Sender>)>, } Ok(request) => request, }; - if let Err(e) = requests.lock().unwrap().insert_tx(next_id, tx.clone()) { + if let Err(e) = requests.lock().expect(pos!()).insert_tx(next_id, tx.clone()) { report_error(&tx, e); // Once insert_tx returns Err, it will continue to do so. However, continue here so // that any other clients who sent requests will also recv the Err. @@ -205,7 +208,7 @@ fn write(outbound: Receiver<(Request, Sender>)>, // Typically we'd want to notify the client of any Err returned by remove_tx, but in // this case the client already hit an Err, and doesn't need to know about this one, as // well. - let _ = requests.lock().unwrap().remove_tx(id); + let _ = requests.lock().expect(pos!()).remove_tx(id); continue; } if let Err(e) = stream.flush() { @@ -240,13 +243,13 @@ fn read(requests: Arc>>, stream: TcpStream) message: reply }) => { debug!("Client: received message, id={}", id); - requests.lock().unwrap().complete_reply(id, reply); + requests.lock().expect(pos!()).complete_reply(id, reply); } Err(err) => { warn!("Client: reader thread encountered an unexpected error while parsing; \ returning now. Error: {:?}", err); - requests.lock().unwrap().set_error(err); + requests.lock().expect(pos!()).set_error(err); break; } } diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index ce165d1..15eb068 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -51,7 +51,7 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve { rpc_id: rpc_id, message: reply }; - let mut write_stream = write_stream.lock().unwrap(); + let mut write_stream = write_stream.lock().expect(pos!()); if let Err(e) = bincode::serde::serialize_into(&mut *write_stream, &reply_packet, @@ -151,7 +151,7 @@ pub struct ServeHandle { impl ServeHandle { /// Block until the server completes pub fn wait(self) { - self.join_handle.join().unwrap(); + self.join_handle.join().expect(pos!()); } /// Returns the address the server is bound to @@ -163,9 +163,9 @@ impl ServeHandle { /// gracefully close open connections. pub fn shutdown(self) { info!("ServeHandle: attempting to shut down the server."); - self.tx.send(()).unwrap(); + self.tx.send(()).expect(pos!()); if let Ok(_) = TcpStream::connect(self.addr) { - self.join_handle.join().unwrap(); + self.join_handle.join().expect(pos!()); } else { warn!("ServeHandle: best effort shutdown of serve thread failed"); } @@ -218,7 +218,7 @@ pub fn serve_async(addr: A, inflight_rpcs.increment(); scope.execute(|| { let mut handler = ConnectionHandler { - read_stream: BufReader::new(conn.try_clone().unwrap()), + read_stream: BufReader::new(conn.try_clone().expect(pos!())), write_stream: Mutex::new(BufWriter::new(conn)), shutdown: &shutdown, inflight_rpcs: &inflight_rpcs,