From 8c51d2ca1bc0508e92c0706eeaec2ccc8a15b63d Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 14 Jan 2016 01:55:25 -0800 Subject: [PATCH] Cargo fmt --- tarpc/src/lib.rs | 3 ++- tarpc/src/protocol.rs | 49 +++++++++++++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 8bf82b5..05823a0 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -31,7 +31,8 @@ //! //! fn main() { //! let addr = "127.0.0.1:9000"; -//! let shutdown = my_server::serve(addr, (), Some(Duration::from_secs(30))).unwrap(); +//! let shutdown = my_server::serve(addr, (), +//! Some(Duration::from_secs(30))).unwrap(); //! let client = Client::new(addr, None).unwrap(); //! assert_eq!(3, client.add(1, 2).unwrap()); //! assert_eq!("Hello, Mom!".to_string(), diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 0e1cab7..c4a4378 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -117,17 +117,20 @@ impl ConnectionHandler { bincode::serde::serialize_into(&mut *my_stream, &reply_packet, bincode::SizeLimit::Infinite) - .unwrap(); + .unwrap(); }); } Err(bincode::serde::DeserializeError::IoError(ref err)) - if Self::timed_out(err.kind()) => - { + if Self::timed_out(err.kind()) => { if !self.shutdown() { - warn!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so retrying read.", err); + warn!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \ + retrying read.", + err); continue; } else { - warn!("ConnectionHandler: read timed out ({:?}). Server shutdown, so closing connection.", err); + warn!("ConnectionHandler: read timed out ({:?}). Server shutdown, so \ + closing connection.", + err); let mut stream = stream.lock().unwrap(); try!(bincode::serde::serialize_into(&mut *stream, &Packet::Shutdown::, @@ -136,7 +139,9 @@ impl ConnectionHandler { } } Err(e) => { - warn!("ConnectionHandler: closing client connection due to error while serving: {:?}", e); + warn!("ConnectionHandler: closing client connection due to error while \ + serving: {:?}", + e); return Err(e.into()); } } @@ -188,7 +193,10 @@ impl ServeHandle { } /// Start -pub fn serve_async(addr: A, f: F, read_timeout: Option) -> io::Result +pub fn serve_async(addr: A, + f: F, + read_timeout: Option) + -> io::Result where A: ToSocketAddrs, Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize, Reply: 'static + fmt::Debug + serde::ser::Serialize, @@ -204,14 +212,16 @@ pub fn serve_async(addr: A, f: F, read_timeout: Option { - info!("serve_async: shutdown received. Waiting for open connections to return..."); + info!("serve_async: shutdown received. Waiting for open connections to \ + return..."); shutdown.store(true, Ordering::SeqCst); let &(ref count, ref cvar) = &*open_connections; let mut count = count.lock().unwrap(); while *count != 0 { count = cvar.wait(count).unwrap(); } - info!("serve_async: shutdown complete ({} connections alive)", *count); + info!("serve_async: shutdown complete ({} connections alive)", + *count); break; } Err(TryRecvError::Disconnected) => { @@ -272,11 +282,13 @@ enum Packet { } struct Reader { - requests: Arc>>>> + requests: Arc>>>>, } impl Reader { - fn read(self, mut stream: TcpStream) where Reply: serde::Deserialize { + fn read(self, mut stream: TcpStream) + where Reply: serde::Deserialize + { loop { let packet: bincode::serde::DeserializeResult> = bincode::serde::deserialize_from(&mut stream, bincode::SizeLimit::Infinite); @@ -380,7 +392,9 @@ impl Client if let Some(requests) = self.requests.lock().unwrap().as_mut() { requests.remove(&id); } else { - warn!("Client: couldn't remove sender for request {} because reader thread returned.", id); + warn!("Client: couldn't remove sender for request {} because reader thread \ + returned.", + id); } return Err(err.into()); } @@ -513,10 +527,12 @@ mod test { let serve_handle = serve_async("0.0.0.0:0", server, Some(Duration::new(0, 10))).unwrap(); let addr = serve_handle.local_addr().clone(); let client: Arc> = Arc::new(Client::new(addr, test_timeout()) - .unwrap()); + .unwrap()); let thread = thread::spawn(move || serve_handle.shutdown()); - info!("force_shutdown:: rpc1: {:?}", client.rpc(&Request::Increment)); - info!("force_shutdown:: rpc2: {:?}", client.rpc(&Request::Increment)); + info!("force_shutdown:: rpc1: {:?}", + client.rpc(&Request::Increment)); + info!("force_shutdown:: rpc2: {:?}", + client.rpc(&Request::Increment)); thread.join().unwrap(); } @@ -526,7 +542,8 @@ mod test { let server = Arc::new(BarrierServer::new(10)); let serve_handle = serve_async("0.0.0.0:0", server.clone(), test_timeout()).unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr, test_timeout()).unwrap()); + let client: Arc> = Arc::new(Client::new(addr, test_timeout()) + .unwrap()); let mut join_handles = vec![]; for _ in 0..10 { let my_client = client.clone();