From f211a4a2e7f9c09d56f003fe5a48950aad6697af Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 31 Jan 2016 20:04:35 -0800 Subject: [PATCH 1/2] Remove the struct InflightRpcs. We were previously doing a lot of accounting to make sure the server never exits before all open connection handlers. However, now that we're using scoped threads, that's taken care of by the scoped library, and we were essentially doing redundant work. --- tarpc/src/protocol/mod.rs | 5 +- tarpc/src/protocol/server.rs | 256 ++++++++++++++++------------------- 2 files changed, 118 insertions(+), 143 deletions(-) diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index d2137c6..677215b 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -38,8 +38,10 @@ impl convert::From for Error { impl convert::From for Error { fn from(err: bincode::serde::DeserializeError) -> Error { match err { - bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)), + bincode::serde::DeserializeError::IoError(ref err) + if err.kind() == io::ErrorKind::ConnectionReset => Error::ConnectionBroken, bincode::serde::DeserializeError::EndOfStreamError => Error::ConnectionBroken, + bincode::serde::DeserializeError::IoError(err) => Error::Io(Arc::new(err)), err => panic!("Unexpected error during deserialization: {:?}", err), } } @@ -182,6 +184,7 @@ mod test { let serve_handle = serve_async("localhost:0", server, test_timeout()).unwrap(); let addr = serve_handle.local_addr().clone(); let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); + client.rpc(Request::Increment).unwrap(); serve_handle.shutdown(); match client.rpc(Request::Increment) { Err(super::Error::ConnectionBroken) => {} // success diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 1b65faa..871bf80 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -1,10 +1,9 @@ use bincode; use serde; -use scoped_pool::Pool; +use scoped_pool::{Pool, Scope}; use std::fmt; use std::io::{self, BufReader, BufWriter, Write}; use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; -use std::sync::{Condvar, Mutex}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; @@ -16,74 +15,63 @@ struct ConnectionHandler<'a, S> { read_stream: BufReader, write_stream: BufWriter, - shutdown: &'a AtomicBool, - inflight_rpcs: &'a InflightRpcs, server: S, - pool: &'a Pool, + shutdown: &'a AtomicBool, } -impl<'a, S> Drop for ConnectionHandler<'a, S> where S: Serve { - fn drop(&mut self) { - trace!("ConnectionHandler: finished serving client."); - self.inflight_rpcs.decrement_and_notify(); - } -} - -impl<'a, S> ConnectionHandler<'a, S> where S: Serve { - fn handle_conn(&mut self) -> Result<()> { +impl<'a, S> ConnectionHandler<'a, S> + where S: Serve +{ + fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> + { let ConnectionHandler { ref mut read_stream, ref mut write_stream, - shutdown, - ref inflight_rpcs, ref server, - pool, + shutdown, } = *self; trace!("ConnectionHandler: serving client..."); - pool.scoped(|scope| { - let (tx, rx) = channel(); - scope.execute(|| Self::write(rx, write_stream, inflight_rpcs)); - loop { - match bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) { - Ok(Packet { rpc_id, message, }) => { - inflight_rpcs.increment(); - let tx = tx.clone(); - scope.execute(move || { - let reply = server.serve(message); - let reply_packet = Packet { - rpc_id: rpc_id, - message: reply - }; - tx.send(reply_packet).unwrap(); - }); - if shutdown.load(Ordering::SeqCst) { - info!("ConnectionHandler: server shutdown, so closing connection."); - break; - } - } - Err(bincode::serde::DeserializeError::IoError(ref err)) - if Self::timed_out(err.kind()) => { - if !shutdown.load(Ordering::SeqCst) { - info!("ConnectionHandler: read timed out ({:?}). Server not \ - shutdown, so retrying read.", - err); - continue; - } else { - info!("ConnectionHandler: read timed out ({:?}). Server shutdown, so \ - closing connection.", - err); - break; - } - } - Err(e) => { - warn!("ConnectionHandler: closing client connection due to {:?}", - e); - return Err(e.into()); + let (tx, rx) = channel(); + scope.execute(move || Self::write(rx, write_stream)); + loop { + match bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) { + Ok(Packet { rpc_id, message, }) => { + let tx = tx.clone(); + scope.execute(move || { + let reply = server.serve(message); + let reply_packet = Packet { + rpc_id: rpc_id, + message: reply + }; + tx.send(reply_packet).unwrap(); + }); + if shutdown.load(Ordering::SeqCst) { + info!("ConnectionHandler: server shutdown, so closing connection."); + break; } } + Err(bincode::serde::DeserializeError::IoError(ref err)) + if Self::timed_out(err.kind()) => { + if !shutdown.load(Ordering::SeqCst) { + info!("ConnectionHandler: read timed out ({:?}). Server not \ + shutdown, so retrying read.", + err); + continue; + } else { + info!("ConnectionHandler: read timed out ({:?}). Server shutdown, so \ + closing connection.", + err); + break; + } + } + Err(e) => { + warn!("ConnectionHandler: closing client connection due to {:?}", + e); + return Err(e.into()); + } } - Ok(()) - }) + } + Ok(()) } fn timed_out(error_kind: io::ErrorKind) -> bool { @@ -93,9 +81,7 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve { } } - fn write(rx: Receiver::Reply>>, - stream: &mut BufWriter, - inflight_rpcs: &InflightRpcs) { + fn write(rx: Receiver::Reply>>, stream: &mut BufWriter) { loop { match rx.recv() { Err(e) => { @@ -114,51 +100,12 @@ impl<'a, S> ConnectionHandler<'a, S> where S: Serve { warn!("Writer: failed to flush reply to Client: {:?}", e); } - inflight_rpcs.decrement(); } } } } } -struct InflightRpcs { - count: Mutex, - cvar: Condvar, -} - -impl InflightRpcs { - fn new() -> InflightRpcs { - InflightRpcs { - count: Mutex::new(0), - cvar: Condvar::new(), - } - } - - fn wait_until_zero(&self) { - let mut count = self.count.lock().unwrap(); - while *count != 0 { - count = self.cvar.wait(count).unwrap(); - } - info!("serve_async: shutdown complete ({} connections alive)", - *count); - } - - fn increment(&self) { - *self.count.lock().unwrap() += 1; - } - - fn decrement(&self) { - *self.count.lock().unwrap() -= 1; - } - - - fn decrement_and_notify(&self) { - *self.count.lock().unwrap() -= 1; - self.cvar.notify_one(); - } - -} - /// Provides methods for blocking until the server completes, pub struct ServeHandle { tx: Sender<()>, @@ -190,6 +137,65 @@ impl ServeHandle { } } +struct Server<'a, S: 'a> { + server: &'a S, + listener: TcpListener, + read_timeout: Option, + die_rx: Receiver<()>, + shutdown: &'a AtomicBool, +} + +impl<'a, S: 'a> Server<'a, S> + where S: Serve + 'static +{ + fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b { + for conn in self.listener.incoming() { + match self.die_rx.try_recv() { + Ok(_) => { + info!("serve: shutdown received."); + return; + } + Err(TryRecvError::Disconnected) => { + info!("serve: sender disconnected."); + return; + } + _ => (), + } + let conn = match conn { + Err(err) => { + error!("serve: failed to accept connection: {:?}", err); + return; + } + Ok(c) => c, + }; + if let Err(err) = conn.set_read_timeout(self.read_timeout) { + info!("serve: could not set read timeout: {:?}", err); + return; + } + let mut handler = ConnectionHandler { + read_stream: BufReader::new(conn.try_clone().unwrap()), + write_stream: BufWriter::new(conn), + server: self.server, + shutdown: self.shutdown, + }; + scope.recurse(move |scope| { + scope.zoom(|scope| { + if let Err(err) = handler.handle_conn(scope) { + info!("ConnectionHandler: err in connection handling: {:?}", err); + } + }); + }); + } + } +} + +impl<'a, S> Drop for Server<'a, S> { + fn drop(&mut self) { + debug!("Shutting down connection handlers."); + self.shutdown.store(true, Ordering::SeqCst); + } +} + /// Start pub fn serve_async(addr: A, server: S, @@ -205,49 +211,15 @@ pub fn serve_async(addr: A, let join_handle = thread::spawn(move || { let pool = Pool::new(100); // TODO(tjk): make this configurable, and expire idle threads let shutdown = AtomicBool::new(false); - let inflight_rpcs = InflightRpcs::new(); + let server = Server { + server: &server, + listener: listener, + read_timeout: read_timeout, + die_rx: die_rx, + shutdown: &shutdown, + }; pool.scoped(|scope| { - for conn in listener.incoming() { - match die_rx.try_recv() { - Ok(_) => { - info!("serve_async: shutdown received. Waiting for open connections to \ - return..."); - shutdown.store(true, Ordering::SeqCst); - inflight_rpcs.wait_until_zero(); - break; - } - Err(TryRecvError::Disconnected) => { - info!("serve_async: sender disconnected."); - break; - } - _ => (), - } - let conn = match conn { - Err(err) => { - error!("serve_async: failed to accept connection: {:?}", err); - return; - } - Ok(c) => c, - }; - if let Err(err) = conn.set_read_timeout(read_timeout) { - info!("Server: could not set read timeout: {:?}", err); - return; - } - inflight_rpcs.increment(); - scope.execute(|| { - let mut handler = ConnectionHandler { - read_stream: BufReader::new(conn.try_clone().unwrap()), - write_stream: BufWriter::new(conn), - shutdown: &shutdown, - inflight_rpcs: &inflight_rpcs, - server: &server, - pool: &pool, - }; - if let Err(err) = handler.handle_conn() { - info!("ConnectionHandler: err in connection handling: {:?}", err); - } - }); - } + server.serve(scope); }); }); Ok(ServeHandle { From 42d876bf41c1572fbc961cca53d9b63e7d76e23b Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 11 Feb 2016 21:57:58 -0800 Subject: [PATCH 2/2] Don't panic if on Err from try_clone --- tarpc/src/protocol/server.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 68505f8..6d39f7d 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -27,8 +27,7 @@ struct ConnectionHandler<'a, S> impl<'a, S> ConnectionHandler<'a, S> where S: Serve { - fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> - { + fn handle_conn<'b>(&'b mut self, scope: &Scope<'b>) -> Result<()> { let ConnectionHandler { ref mut read_stream, ref mut write_stream, @@ -48,7 +47,7 @@ impl<'a, S> ConnectionHandler<'a, S> rpc_id: rpc_id, message: reply }; - tx.send(reply_packet).unwrap(); + tx.send(reply_packet).expect(pos!()); }); if shutdown.load(Ordering::SeqCst) { info!("ConnectionHandler: server shutdown, so closing connection."); @@ -161,7 +160,7 @@ impl<'a, S: 'a> Server<'a, S> return; } Err(TryRecvError::Disconnected) => { - info!("serve: sender disconnected."); + info!("serve: shutdown sender disconnected."); return; } _ => (), @@ -175,10 +174,19 @@ impl<'a, S: 'a> Server<'a, S> }; if let Err(err) = conn.set_read_timeout(self.read_timeout) { info!("serve: could not set read timeout: {:?}", err); - return; + continue; } + let read_conn = match conn.try_clone() { + Err(err) => { + error!("serve: could not clone tcp stream; possibly out of file descriptors? \ + Err: {:?}", + err); + continue; + } + Ok(conn) => conn, + }; let mut handler = ConnectionHandler { - read_stream: BufReader::new(conn.try_clone().unwrap()), + read_stream: BufReader::new(read_conn), write_stream: BufWriter::new(conn), server: self.server, shutdown: self.shutdown,