mirror of
https://github.com/OMGeeky/tarpc.git
synced 2025-12-28 23:27:25 +01:00
Remove all shutdown logic. Just exit and deal with it.
This commit is contained in:
@@ -121,12 +121,6 @@ struct ConnectionHandler {
|
||||
|
||||
impl Drop for ConnectionHandler {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = bincode::serde::serialize_into(&mut self.read_stream,
|
||||
&Packet::Shutdown::<()>,
|
||||
bincode::SizeLimit::Infinite) {
|
||||
warn!("ConnectionHandler: could not notify client of shutdown: {:?}",
|
||||
e);
|
||||
}
|
||||
trace!("ConnectionHandler: finished serving client.");
|
||||
self.open_connections.decrement_and_notify();
|
||||
}
|
||||
@@ -146,15 +140,14 @@ impl ConnectionHandler {
|
||||
trace!("ConnectionHandler: serving client...");
|
||||
loop {
|
||||
match self.read() {
|
||||
Ok(Packet::Shutdown) => break,
|
||||
Ok(Packet::Message(id, message)) => {
|
||||
Ok(Packet(id, message)) => {
|
||||
let f = f.clone();
|
||||
let open_connections = self.open_connections.clone();
|
||||
open_connections.increment();
|
||||
let stream = self.write_stream.clone();
|
||||
thread::spawn(move || {
|
||||
let reply = f.serve(message);
|
||||
let reply_packet = Packet::Message(id, reply);
|
||||
let reply_packet = Packet(id, reply);
|
||||
let mut stream = stream.lock().unwrap();
|
||||
if let Err(e) =
|
||||
bincode::serde::serialize_into(&mut *stream,
|
||||
@@ -318,10 +311,7 @@ impl<S> Serve for Arc<S> where S: Serve
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
enum Packet<T> {
|
||||
Message(u64, T),
|
||||
Shutdown,
|
||||
}
|
||||
struct Packet<T>(u64, T);
|
||||
|
||||
struct Reader<Reply> {
|
||||
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>,
|
||||
@@ -335,19 +325,20 @@ impl<Reply> Reader<Reply> {
|
||||
let packet: bincode::serde::DeserializeResult<Packet<Reply>> =
|
||||
bincode::serde::deserialize_from(&mut stream, bincode::SizeLimit::Infinite);
|
||||
match packet {
|
||||
Ok(Packet::Message(id, reply)) => {
|
||||
Ok(Packet(id, reply)) => {
|
||||
debug!("Client: received message, id={}", id);
|
||||
let mut requests = self.requests.lock().unwrap();
|
||||
let mut requests = requests.as_mut().unwrap();
|
||||
let reply_tx = requests.remove(&id).unwrap();
|
||||
reply_tx.send(reply).unwrap();
|
||||
}
|
||||
Ok(Packet::Shutdown) => {
|
||||
info!("Client: got shutdown message.");
|
||||
// TODO: This shutdown logic is janky.. What's the right way to do this?
|
||||
Err(err) => {
|
||||
warn!("Client: reader thread encountered an unexpected error while parsing; \
|
||||
returning now. Error: {:?}",
|
||||
err);
|
||||
break;
|
||||
}
|
||||
// TODO: This shutdown logic is janky.. What's the right way to do this?
|
||||
Err(err) => panic!("unexpected error while parsing!: {:?}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -421,7 +412,7 @@ impl<Request, Reply> Client<Request, Reply>
|
||||
return Err(Error::ConnectionBroken);
|
||||
}
|
||||
}
|
||||
let packet = Packet::Message(id, request);
|
||||
let packet = Packet(id, request);
|
||||
try!(state.stream.set_write_timeout(self.timeout));
|
||||
try!(state.stream.set_read_timeout(self.timeout));
|
||||
debug!("Client: calling rpc({:?})", request);
|
||||
@@ -449,14 +440,8 @@ impl<Request, Reply> Drop for Client<Request, Reply>
|
||||
where Request: serde::ser::Serialize
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
{
|
||||
let mut state = self.synced_state.lock().unwrap();
|
||||
let packet: Packet<Request> = Packet::Shutdown;
|
||||
if let Err(err) = bincode::serde::serialize_into(&mut state.stream,
|
||||
&packet,
|
||||
bincode::SizeLimit::Infinite) {
|
||||
warn!("While disconnecting client from server: {:?}", err);
|
||||
}
|
||||
if let Err(e) = self.synced_state.lock().unwrap().stream.shutdown(::std::net::Shutdown::Both) {
|
||||
warn!("Client: couldn't shutdown reader thread: {:?}", e);
|
||||
}
|
||||
self.reader_guard.take().unwrap().join().unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user