Cargo fmt

This commit is contained in:
Tim Kuehn
2016-01-14 01:55:25 -08:00
parent 2644bf0d9b
commit 8c51d2ca1b
2 changed files with 35 additions and 17 deletions

View File

@@ -31,7 +31,8 @@
//!
//! fn main() {
//! let addr = "127.0.0.1:9000";
//! let shutdown = my_server::serve(addr, (), Some(Duration::from_secs(30))).unwrap();
//! let shutdown = my_server::serve(addr, (),
//! Some(Duration::from_secs(30))).unwrap();
//! let client = Client::new(addr, None).unwrap();
//! assert_eq!(3, client.add(1, 2).unwrap());
//! assert_eq!("Hello, Mom!".to_string(),

View File

@@ -117,17 +117,20 @@ impl ConnectionHandler {
bincode::serde::serialize_into(&mut *my_stream,
&reply_packet,
bincode::SizeLimit::Infinite)
.unwrap();
.unwrap();
});
}
Err(bincode::serde::DeserializeError::IoError(ref err))
if Self::timed_out(err.kind()) =>
{
if Self::timed_out(err.kind()) => {
if !self.shutdown() {
warn!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so retrying read.", err);
warn!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \
retrying read.",
err);
continue;
} else {
warn!("ConnectionHandler: read timed out ({:?}). Server shutdown, so closing connection.", err);
warn!("ConnectionHandler: read timed out ({:?}). Server shutdown, so \
closing connection.",
err);
let mut stream = stream.lock().unwrap();
try!(bincode::serde::serialize_into(&mut *stream,
&Packet::Shutdown::<Reply>,
@@ -136,7 +139,9 @@ impl ConnectionHandler {
}
}
Err(e) => {
warn!("ConnectionHandler: closing client connection due to error while serving: {:?}", e);
warn!("ConnectionHandler: closing client connection due to error while \
serving: {:?}",
e);
return Err(e.into());
}
}
@@ -188,7 +193,10 @@ impl ServeHandle {
}
/// Start
pub fn serve_async<A, F, Request, Reply>(addr: A, f: F, read_timeout: Option<Duration>) -> io::Result<ServeHandle>
pub fn serve_async<A, F, Request, Reply>(addr: A,
f: F,
read_timeout: Option<Duration>)
-> io::Result<ServeHandle>
where A: ToSocketAddrs,
Request: 'static + fmt::Debug + Send + serde::de::Deserialize + serde::ser::Serialize,
Reply: 'static + fmt::Debug + serde::ser::Serialize,
@@ -204,14 +212,16 @@ pub fn serve_async<A, F, Request, Reply>(addr: A, f: F, read_timeout: Option<Dur
for conn in listener.incoming() {
match die_rx.try_recv() {
Ok(_) => {
info!("serve_async: shutdown received. Waiting for open connections to return...");
info!("serve_async: shutdown received. Waiting for open connections to \
return...");
shutdown.store(true, Ordering::SeqCst);
let &(ref count, ref cvar) = &*open_connections;
let mut count = count.lock().unwrap();
while *count != 0 {
count = cvar.wait(count).unwrap();
}
info!("serve_async: shutdown complete ({} connections alive)", *count);
info!("serve_async: shutdown complete ({} connections alive)",
*count);
break;
}
Err(TryRecvError::Disconnected) => {
@@ -272,11 +282,13 @@ enum Packet<T> {
}
struct Reader<Reply> {
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>
requests: Arc<Mutex<Option<HashMap<u64, Sender<Reply>>>>>,
}
impl<Reply> Reader<Reply> {
fn read(self, mut stream: TcpStream) where Reply: serde::Deserialize {
fn read(self, mut stream: TcpStream)
where Reply: serde::Deserialize
{
loop {
let packet: bincode::serde::DeserializeResult<Packet<Reply>> =
bincode::serde::deserialize_from(&mut stream, bincode::SizeLimit::Infinite);
@@ -380,7 +392,9 @@ impl<Request, Reply> Client<Request, Reply>
if let Some(requests) = self.requests.lock().unwrap().as_mut() {
requests.remove(&id);
} else {
warn!("Client: couldn't remove sender for request {} because reader thread returned.", id);
warn!("Client: couldn't remove sender for request {} because reader thread \
returned.",
id);
}
return Err(err.into());
}
@@ -513,10 +527,12 @@ mod test {
let serve_handle = serve_async("0.0.0.0:0", server, Some(Duration::new(0, 10))).unwrap();
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<Request, Reply>> = Arc::new(Client::new(addr, test_timeout())
.unwrap());
.unwrap());
let thread = thread::spawn(move || serve_handle.shutdown());
info!("force_shutdown:: rpc1: {:?}", client.rpc(&Request::Increment));
info!("force_shutdown:: rpc2: {:?}", client.rpc(&Request::Increment));
info!("force_shutdown:: rpc1: {:?}",
client.rpc(&Request::Increment));
info!("force_shutdown:: rpc2: {:?}",
client.rpc(&Request::Increment));
thread.join().unwrap();
}
@@ -526,7 +542,8 @@ mod test {
let server = Arc::new(BarrierServer::new(10));
let serve_handle = serve_async("0.0.0.0:0", server.clone(), test_timeout()).unwrap();
let addr = serve_handle.local_addr().clone();
let client: Arc<Client<Request, Reply>> = Arc::new(Client::new(addr, test_timeout()).unwrap());
let client: Arc<Client<Request, Reply>> = Arc::new(Client::new(addr, test_timeout())
.unwrap());
let mut join_handles = vec![];
for _ in 0..10 {
let my_client = client.clone();