diff --git a/tarpc/src/protocol.rs b/tarpc/src/protocol.rs index 3e54e7d..96424ea 100644 --- a/tarpc/src/protocol.rs +++ b/tarpc/src/protocol.rs @@ -79,9 +79,7 @@ struct OpenConnections { impl OpenConnections { fn new(mutex: Mutex, cvar: Condvar) -> OpenConnections { - OpenConnections { - open_connections: Arc::new((mutex, cvar)), - } + OpenConnections { open_connections: Arc::new((mutex, cvar)) } } fn wait_until_zero(&self) { @@ -90,7 +88,8 @@ impl OpenConnections { while *count != 0 { count = cvar.wait(count).unwrap(); } - info!("serve_async: shutdown complete ({} connections alive)", *count); + info!("serve_async: shutdown complete ({} connections alive)", + *count); } fn increment(&self) { @@ -123,9 +122,10 @@ struct ConnectionHandler { impl Drop for ConnectionHandler { fn drop(&mut self) { if let Err(e) = bincode::serde::serialize_into(&mut self.read_stream, - &Packet::Shutdown::<()>, - bincode::SizeLimit::Infinite) { - warn!("ConnectionHandler: could not notify client of shutdown: {:?}", e); + &Packet::Shutdown::<()>, + bincode::SizeLimit::Infinite) { + warn!("ConnectionHandler: could not notify client of shutdown: {:?}", + e); } trace!("ConnectionHandler: finished serving client."); self.open_connections.decrement_and_notify(); @@ -158,15 +158,18 @@ impl ConnectionHandler { let reply = f.serve(message); let reply_packet = Packet::Message(id, reply); let mut stream = stream.lock().unwrap(); - if let Err(e) = bincode::serde::serialize_into(&mut *stream, - &reply_packet, - bincode::SizeLimit::Infinite) { - warn!("ConnectionHandler: failed to write reply to Client: {:?}", e); + if let Err(e) = + bincode::serde::serialize_into(&mut *stream, + &reply_packet, + bincode::SizeLimit::Infinite) { + warn!("ConnectionHandler: failed to write reply to Client: {:?}", + e); } open_connections.decrement(); }); } - Err(bincode::serde::DeserializeError::IoError(ref err)) if Self::timed_out(err.kind()) => { + Err(bincode::serde::DeserializeError::IoError(ref err)) + if Self::timed_out(err.kind()) => { if !self.shutdown() { warn!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \ retrying read.", @@ -507,8 +510,7 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = serve_async("0.0.0.0:0", server.clone(), test_timeout()).unwrap(); - let client: Client = Client::new(serve_handle.local_addr().clone(), - None) + let client: Client = Client::new(serve_handle.local_addr().clone(), None) .expect(&line!().to_string()); drop(client); serve_handle.shutdown(); @@ -562,8 +564,7 @@ mod test { let server = Arc::new(Server::new()); let serve_handle = serve_async("0.0.0.0:0", server, Some(Duration::new(0, 10))).unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr, None) - .unwrap()); + let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); let thread = thread::spawn(move || serve_handle.shutdown()); info!("force_shutdown:: rpc1: {:?}", client.rpc(&Request::Increment)); @@ -578,8 +579,7 @@ mod test { let server = Arc::new(BarrierServer::new(10)); let serve_handle = serve_async("0.0.0.0:0", server.clone(), test_timeout()).unwrap(); let addr = serve_handle.local_addr().clone(); - let client: Arc> = Arc::new(Client::new(addr, None) - .unwrap()); + let client: Arc> = Arc::new(Client::new(addr, None).unwrap()); let mut join_handles = vec![]; for _ in 0..10 { let my_client = client.clone();