rustfmt the things

This commit is contained in:
Tim Kuehn
2016-02-15 02:34:20 -08:00
parent 104196a90e
commit 7dae99d7b5
6 changed files with 41 additions and 30 deletions

View File

@@ -1 +1,2 @@
ideal_width = 100
reorder_imports = true

View File

@@ -8,7 +8,7 @@ pub mod serde {
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Deserialization re-exports required by macros. Not for general use.
pub mod de {
pub use serde::de::{EnumVisitor, Error, Visitor, VariantVisitor};
pub use serde::de::{EnumVisitor, Error, VariantVisitor, Visitor};
}
}
@@ -482,7 +482,7 @@ mod functional_test {
#[test]
fn simple() {
let _ = env_logger::init();
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client = Client::new(handle.local_addr(), None).unwrap();
assert_eq!(3, client.add(1, 2).unwrap());
drop(client);
@@ -501,7 +501,7 @@ mod functional_test {
#[test]
fn try_clone() {
let handle = serve( "localhost:0", Server, test_timeout()).unwrap();
let handle = serve("localhost:0", Server, test_timeout()).unwrap();
let client1 = Client::new(handle.local_addr(), None).unwrap();
let client2 = client1.try_clone().unwrap();
assert_eq!(3, client1.add(1, 2).unwrap());

View File

@@ -14,7 +14,7 @@ use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
use std::time::Duration;
use super::{Serialize, Deserialize, Error, Packet, Result};
use super::{Deserialize, Error, Packet, Result, Serialize};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply>
@@ -100,15 +100,16 @@ impl<Request, Reply> Drop for Client<Request, Reply>
if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) {
debug!("Attempting to shut down writer and reader threads.");
if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) {
warn!("Client: couldn't shutdown writer and reader threads: {:?}", e);
warn!("Client: couldn't shutdown writer and reader threads: {:?}",
e);
} else {
// We only join if we know the TcpStream was shut down. Otherwise we might never
// finish.
debug!("Joining writer and reader.");
reader_guard.take()
.expect(pos!())
.join()
.expect(pos!());
.expect(pos!())
.join()
.expect(pos!());
debug!("Successfully joined writer and reader.");
}
}
@@ -118,14 +119,15 @@ impl<Request, Reply> Drop for Client<Request, Reply>
/// An asynchronous RPC call
pub struct Future<T> {
rx: Receiver<Result<T>>,
requests: Arc<Mutex<RpcFutures<T>>>
requests: Arc<Mutex<RpcFutures<T>>>,
}
impl<T> Future<T> {
/// Block until the result of the RPC call is available
pub fn get(self) -> Result<T> {
let requests = self.requests;
self.rx.recv()
self.rx
.recv()
.map_err(|_| requests.lock().expect(pos!()).get_error())
.and_then(|reply| reply)
}
@@ -164,7 +166,8 @@ impl<Reply> RpcFutures<Reply> {
info!("Reader: could not complete reply: {:?}", e);
}
} else {
warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id);
warn!("RpcFutures: expected sender for id {} but got None!",
packet.rpc_id);
}
}
@@ -178,10 +181,10 @@ impl<Reply> RpcFutures<Reply> {
}
fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream)
requests: Arc<Mutex<RpcFutures<Reply>>>,
stream: TcpStream)
where Request: serde::Serialize,
Reply: serde::Deserialize,
Reply: serde::Deserialize
{
let mut next_id = 0;
let mut stream = BufWriter::new(stream);
@@ -221,8 +224,8 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
{
// Clone the err so we can log it if sending fails
if let Err(e2) = tx.send(Err(e.clone())) {
debug!("Error encountered while trying to send an error. \
Initial error: {:?}; Send error: {:?}",
debug!("Error encountered while trying to send an error. Initial error: {:?}; Send \
error: {:?}",
e,
e2);
}

View File

@@ -59,8 +59,7 @@ pub type Result<T> = ::std::result::Result<T, Error>;
trait Deserialize: Read + Sized {
fn deserialize<T: serde::Deserialize>(&mut self) -> Result<T> {
deserialize_from(self, SizeLimit::Infinite)
.map_err(Error::from)
deserialize_from(self, SizeLimit::Infinite).map_err(Error::from)
}
}
@@ -208,7 +207,9 @@ mod test {
pool.scoped(|scope| {
for _ in 0..concurrency {
let client = client.try_clone().unwrap();
scope.execute(move || { client.rpc(()).unwrap(); });
scope.execute(move || {
client.rpc(()).unwrap();
});
}
});
assert_eq!(concurrency as u64, server.count());

View File

@@ -19,10 +19,11 @@ impl<T: Serialize> Serialize for Packet<T> {
fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error>
where S: Serializer
{
serializer.visit_struct(PACKET, MapVisitor {
value: self,
state: 0,
})
serializer.visit_struct(PACKET,
MapVisitor {
value: self,
state: 0,
})
}
}
@@ -44,7 +45,7 @@ impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> {
self.state += 1;
Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message))))
}
_ => {
_ => {
Ok(None)
}
}
@@ -91,7 +92,10 @@ fn serde() {
use bincode;
let _ = env_logger::init();
let packet = Packet { rpc_id: 1, message: () };
let packet = Packet {
rpc_id: 1,
message: (),
};
let ser = bincode::serde::serialize(&packet, bincode::SizeLimit::Infinite).unwrap();
let de = bincode::serde::deserialize(&ser);
assert_eq!(packet, de.unwrap());

View File

@@ -44,7 +44,7 @@ impl<'a, S> ConnectionHandler<'a, S>
let reply = server.serve(message);
let reply_packet = Packet {
rpc_id: rpc_id,
message: reply
message: reply,
};
tx.send(reply_packet).expect(pos!());
});
@@ -55,8 +55,8 @@ impl<'a, S> ConnectionHandler<'a, S>
}
Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => {
if !shutdown.load(Ordering::SeqCst) {
info!("ConnectionHandler: read timed out ({:?}). Server not \
shutdown, so retrying read.",
info!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \
retrying read.",
err);
continue;
} else {
@@ -142,7 +142,9 @@ struct Server<'a, S: 'a> {
impl<'a, S: 'a> Server<'a, S>
where S: Serve + 'static
{
fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b {
fn serve<'b>(self, scope: &Scope<'b>)
where 'a: 'b
{
for conn in self.listener.incoming() {
match self.die_rx.try_recv() {
Ok(_) => {
@@ -169,7 +171,7 @@ impl<'a, S: 'a> Server<'a, S>
let read_conn = match conn.try_clone() {
Err(err) => {
error!("serve: could not clone tcp stream; possibly out of file descriptors? \
Err: {:?}",
Err: {:?}",
err);
continue;
}