From b3ed2ef0baef8d727072407bdb4051b3eaaba286 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 14 Jan 2016 20:41:29 -0800 Subject: [PATCH] Remove all shutdown logic. Just exit and deal with it. --- tarpc/src/protocol.rs | 39 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 2c19d4a..279bf6e 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -121,12 +121,6 @@ struct ConnectionHandler { impl Drop for ConnectionHandler { fn drop(&mut self) { - if let Err(e) = bincode::serde::serialize_into(&mut self.read_stream, - &Packet::Shutdown::<()>, - bincode::SizeLimit::Infinite) { - warn!("ConnectionHandler: could not notify client of shutdown: {:?}", - e); - } trace!("ConnectionHandler: finished serving client."); self.open_connections.decrement_and_notify(); } @@ -146,15 +140,14 @@ impl ConnectionHandler { trace!("ConnectionHandler: serving client..."); loop { match self.read() { - Ok(Packet::Shutdown) => break, - Ok(Packet::Message(id, message)) => { + Ok(Packet(id, message)) => { let f = f.clone(); let open_connections = self.open_connections.clone(); open_connections.increment(); let stream = self.write_stream.clone(); thread::spawn(move || { let reply = f.serve(message); - let reply_packet = Packet::Message(id, reply); + let reply_packet = Packet(id, reply); let mut stream = stream.lock().unwrap(); if let Err(e) = bincode::serde::serialize_into(&mut *stream, @@ -318,10 +311,7 @@ impl Serve for Arc where S: Serve } #[derive(Debug, Clone, Serialize, Deserialize)] -enum Packet { - Message(u64, T), - Shutdown, -} +struct Packet(u64, T); struct Reader { requests: Arc>>>>, @@ -335,19 +325,20 @@ impl Reader { let packet: bincode::serde::DeserializeResult> = bincode::serde::deserialize_from(&mut stream, bincode::SizeLimit::Infinite); match packet { - Ok(Packet::Message(id, reply)) => { + Ok(Packet(id, reply)) => { debug!("Client: received message, id={}", id); let mut requests = self.requests.lock().unwrap(); let mut requests = requests.as_mut().unwrap(); let reply_tx = requests.remove(&id).unwrap(); reply_tx.send(reply).unwrap(); } - Ok(Packet::Shutdown) => { - info!("Client: got shutdown message."); + // TODO: This shutdown logic is janky.. What's the right way to do this? + Err(err) => { + warn!("Client: reader thread encountered an unexpected error while parsing; \ + returning now. Error: {:?}", + err); break; } - // TODO: This shutdown logic is janky.. What's the right way to do this? - Err(err) => panic!("unexpected error while parsing!: {:?}", err), } } } @@ -421,7 +412,7 @@ impl Client return Err(Error::ConnectionBroken); } } - let packet = Packet::Message(id, request); + let packet = Packet(id, request); try!(state.stream.set_write_timeout(self.timeout)); try!(state.stream.set_read_timeout(self.timeout)); debug!("Client: calling rpc({:?})", request); @@ -449,14 +440,8 @@ impl Drop for Client where Request: serde::ser::Serialize { fn drop(&mut self) { - { - let mut state = self.synced_state.lock().unwrap(); - let packet: Packet = Packet::Shutdown; - if let Err(err) = bincode::serde::serialize_into(&mut state.stream, - &packet, - bincode::SizeLimit::Infinite) { - warn!("While disconnecting client from server: {:?}", err); - } + if let Err(e) = self.synced_state.lock().unwrap().stream.shutdown(::std::net::Shutdown::Both) { + warn!("Client: couldn't shutdown reader thread: {:?}", e); } self.reader_guard.take().unwrap().join().unwrap(); }