mirror of
https://github.com/OMGeeky/tarpc.git
synced 2026-01-04 10:32:24 +01:00
Use expect() instead of unwrap()
This commit is contained in:
@@ -64,6 +64,10 @@ extern crate test;
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
macro_rules! pos {
|
||||
() => (concat!(file!(), ":", line!()))
|
||||
}
|
||||
|
||||
/// Provides the tarpc client and server, which implements the tarpc protocol.
|
||||
/// The protocol is defined by the implementation.
|
||||
pub mod protocol;
|
||||
|
||||
@@ -62,7 +62,7 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
where Request: serde::ser::Serialize + fmt::Debug + Send + 'static
|
||||
{
|
||||
let (tx, rx) = channel();
|
||||
self.outbound.send((request, tx)).unwrap();
|
||||
self.outbound.send((request, tx)).expect(pos!());
|
||||
rx
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
{
|
||||
self.rpc_internal(request)
|
||||
.recv()
|
||||
.map_err(|_| self.requests.lock().unwrap().get_error())
|
||||
.map_err(|_| self.requests.lock().expect(pos!()).get_error())
|
||||
.and_then(|reply| reply)
|
||||
}
|
||||
|
||||
@@ -100,7 +100,10 @@ impl<Request, Reply> Drop for Client<Request, Reply>
|
||||
// We only join if we know the TcpStream was shut down. Otherwise we might never
|
||||
// finish.
|
||||
debug!("Joining writer and reader.");
|
||||
reader_guard.take().unwrap().join().unwrap();
|
||||
reader_guard.take()
|
||||
.expect(pos!())
|
||||
.join()
|
||||
.expect(pos!());
|
||||
debug!("Successfully joined writer and reader.");
|
||||
}
|
||||
}
|
||||
@@ -118,7 +121,7 @@ impl<T> Future<T> {
|
||||
pub fn get(self) -> Result<T> {
|
||||
let requests = self.requests;
|
||||
self.rx.recv()
|
||||
.map_err(|_| requests.lock().unwrap().get_error())
|
||||
.map_err(|_| requests.lock().expect(pos!()).get_error())
|
||||
.and_then(|reply| reply)
|
||||
}
|
||||
}
|
||||
@@ -151,7 +154,7 @@ impl<Reply> RpcFutures<Reply> {
|
||||
}
|
||||
|
||||
fn complete_reply(&mut self, id: u64, reply: Reply) {
|
||||
if let Some(tx) = self.0.as_mut().unwrap().remove(&id) {
|
||||
if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&id) {
|
||||
if let Err(e) = tx.send(Ok(reply)) {
|
||||
info!("Reader: could not complete reply: {:?}", e);
|
||||
}
|
||||
@@ -165,7 +168,7 @@ impl<Reply> RpcFutures<Reply> {
|
||||
}
|
||||
|
||||
fn get_error(&self) -> Error {
|
||||
self.0.as_ref().err().unwrap().clone()
|
||||
self.0.as_ref().err().expect(pos!()).clone()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,7 +188,7 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||
}
|
||||
Ok(request) => request,
|
||||
};
|
||||
if let Err(e) = requests.lock().unwrap().insert_tx(next_id, tx.clone()) {
|
||||
if let Err(e) = requests.lock().expect(pos!()).insert_tx(next_id, tx.clone()) {
|
||||
report_error(&tx, e);
|
||||
// Once insert_tx returns Err, it will continue to do so. However, continue here so
|
||||
// that any other clients who sent requests will also recv the Err.
|
||||
@@ -205,7 +208,7 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
|
||||
// Typically we'd want to notify the client of any Err returned by remove_tx, but in
|
||||
// this case the client already hit an Err, and doesn't need to know about this one, as
|
||||
// well.
|
||||
let _ = requests.lock().unwrap().remove_tx(id);
|
||||
let _ = requests.lock().expect(pos!()).remove_tx(id);
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = stream.flush() {
|
||||
@@ -240,13 +243,13 @@ fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream)
|
||||
message: reply
|
||||
}) => {
|
||||
debug!("Client: received message, id={}", id);
|
||||
requests.lock().unwrap().complete_reply(id, reply);
|
||||
requests.lock().expect(pos!()).complete_reply(id, reply);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Client: reader thread encountered an unexpected error while parsing; \
|
||||
returning now. Error: {:?}",
|
||||
err);
|
||||
requests.lock().unwrap().set_error(err);
|
||||
requests.lock().expect(pos!()).set_error(err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve {
|
||||
rpc_id: rpc_id,
|
||||
message: reply
|
||||
};
|
||||
let mut write_stream = write_stream.lock().unwrap();
|
||||
let mut write_stream = write_stream.lock().expect(pos!());
|
||||
if let Err(e) =
|
||||
bincode::serde::serialize_into(&mut *write_stream,
|
||||
&reply_packet,
|
||||
@@ -151,7 +151,7 @@ pub struct ServeHandle {
|
||||
impl ServeHandle {
|
||||
/// Block until the server completes
|
||||
pub fn wait(self) {
|
||||
self.join_handle.join().unwrap();
|
||||
self.join_handle.join().expect(pos!());
|
||||
}
|
||||
|
||||
/// Returns the address the server is bound to
|
||||
@@ -163,9 +163,9 @@ impl ServeHandle {
|
||||
/// gracefully close open connections.
|
||||
pub fn shutdown(self) {
|
||||
info!("ServeHandle: attempting to shut down the server.");
|
||||
self.tx.send(()).unwrap();
|
||||
self.tx.send(()).expect(pos!());
|
||||
if let Ok(_) = TcpStream::connect(self.addr) {
|
||||
self.join_handle.join().unwrap();
|
||||
self.join_handle.join().expect(pos!());
|
||||
} else {
|
||||
warn!("ServeHandle: best effort shutdown of serve thread failed");
|
||||
}
|
||||
@@ -218,7 +218,7 @@ pub fn serve_async<A, S>(addr: A,
|
||||
inflight_rpcs.increment();
|
||||
scope.execute(|| {
|
||||
let mut handler = ConnectionHandler {
|
||||
read_stream: BufReader::new(conn.try_clone().unwrap()),
|
||||
read_stream: BufReader::new(conn.try_clone().expect(pos!())),
|
||||
write_stream: Mutex::new(BufWriter::new(conn)),
|
||||
shutdown: &shutdown,
|
||||
inflight_rpcs: &inflight_rpcs,
|
||||
|
||||
Reference in New Issue
Block a user