From 5d50ccfb6ff71ccc5f1e3e97d1887e4be710a19a Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 25 Jan 2016 21:35:47 -0800 Subject: [PATCH] Make read_stream a field of ConnectionHandler --- tarpc/src/protocol.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 6eeaa47..7f94d5c 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -98,6 +98,7 @@ impl InflightRpcs { } struct ConnectionHandler<'a> { + read_stream: TcpStream, write_stream: Mutex, shutdown: &'a AtomicBool, inflight_rpcs: &'a InflightRpcs, @@ -121,27 +122,32 @@ impl<'a> ConnectionHandler<'a> { bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) } - fn handle_conn(&mut self, mut read_stream: TcpStream, f: F) -> Result<()> + fn handle_conn(&mut self, f: F) -> Result<()> where F: Serve { + let ConnectionHandler { + ref mut read_stream, + ref write_stream, + shutdown, + inflight_rpcs, + timeout + } = *self; let f = &f; trace!("ConnectionHandler: serving client..."); crossbeam::scope(|scope| { loop { - match Self::read(&mut read_stream, self.timeout) { + match Self::read(read_stream, timeout) { Ok(Packet { rpc_id, message, }) => { - let inflight_rpcs = &self.inflight_rpcs; inflight_rpcs.increment(); - let stream = &self.write_stream; scope.spawn(move || { let reply = f.serve(message); let reply_packet = Packet { rpc_id: rpc_id, message: reply }; - let mut stream = stream.lock().unwrap(); + let mut write_stream = write_stream.lock().unwrap(); if let Err(e) = - bincode::serde::serialize_into(&mut *stream, + bincode::serde::serialize_into(&mut *write_stream, &reply_packet, bincode::SizeLimit::Infinite) { warn!("ConnectionHandler: failed to write reply to Client: {:?}", @@ -149,14 +155,14 @@ impl<'a> ConnectionHandler<'a> { } inflight_rpcs.decrement(); }); - if self.shutdown.load(Ordering::SeqCst) { + if shutdown.load(Ordering::SeqCst) { info!("ConnectionHandler: server shutdown, so closing connection."); break; } } Err(bincode::serde::DeserializeError::IoError(ref err)) if Self::timed_out(err.kind()) => { - if !self.shutdown.load(Ordering::SeqCst) { + if !shutdown.load(Ordering::SeqCst) { info!("ConnectionHandler: read timed out ({:?}). Server not \ shutdown, so retrying read.", err); @@ -256,12 +262,13 @@ pub fn serve_async(addr: A, f: F, read_timeout: Option) -> io::R inflight_rpcs.increment(); scope.spawn(|| { let mut handler = ConnectionHandler { - write_stream: Mutex::new(conn.try_clone().unwrap()), + read_stream: conn.try_clone().unwrap(), + write_stream: Mutex::new(conn), shutdown: &shutdown, inflight_rpcs: &inflight_rpcs, timeout: read_timeout, }; - if let Err(err) = handler.handle_conn(conn, &f) { + if let Err(err) = handler.handle_conn(&f) { info!("ConnectionHandler: err in connection handling: {:?}", err); } });