From 7dae99d7b5073fc91664a61e4529b890b29d6946 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Mon, 15 Feb 2016 02:34:20 -0800 Subject: [PATCH] rustfmt the things --- tarpc/rustfmt.toml | 1 + tarpc/src/macros.rs | 6 +++--- tarpc/src/protocol/client.rs | 29 ++++++++++++++++------------- tarpc/src/protocol/mod.rs | 7 ++++--- tarpc/src/protocol/packet.rs | 16 ++++++++++------ tarpc/src/protocol/server.rs | 12 +++++++----- 6 files changed, 41 insertions(+), 30 deletions(-) diff --git a/tarpc/rustfmt.toml b/tarpc/rustfmt.toml index 44148a2..60f7208 100644 --- a/tarpc/rustfmt.toml +++ b/tarpc/rustfmt.toml @@ -1 +1,2 @@ +ideal_width = 100 reorder_imports = true diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index b25c0cf..55af464 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -8,7 +8,7 @@ pub mod serde { pub use serde::{Deserialize, Deserializer, Serialize, Serializer}; /// Deserialization re-exports required by macros. Not for general use. pub mod de { - pub use serde::de::{EnumVisitor, Error, Visitor, VariantVisitor}; + pub use serde::de::{EnumVisitor, Error, VariantVisitor, Visitor}; } } @@ -482,7 +482,7 @@ mod functional_test { #[test] fn simple() { let _ = env_logger::init(); - let handle = serve( "localhost:0", Server, test_timeout()).unwrap(); + let handle = serve("localhost:0", Server, test_timeout()).unwrap(); let client = Client::new(handle.local_addr(), None).unwrap(); assert_eq!(3, client.add(1, 2).unwrap()); drop(client); @@ -501,7 +501,7 @@ mod functional_test { #[test] fn try_clone() { - let handle = serve( "localhost:0", Server, test_timeout()).unwrap(); + let handle = serve("localhost:0", Server, test_timeout()).unwrap(); let client1 = Client::new(handle.local_addr(), None).unwrap(); let client2 = client1.try_clone().unwrap(); assert_eq!(3, client1.add(1, 2).unwrap()); diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index f1250b0..f6098ce 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -14,7 +14,7 @@ use std::sync::mpsc::{Receiver, Sender, channel}; use std::thread; use std::time::Duration; -use super::{Serialize, Deserialize, Error, Packet, Result}; +use super::{Deserialize, Error, Packet, Result, Serialize}; /// A client stub that connects to a server to run rpcs. pub struct Client @@ -100,15 +100,16 @@ impl Drop for Client if let Some(reader_guard) = Arc::get_mut(&mut self.reader_guard) { debug!("Attempting to shut down writer and reader threads."); if let Err(e) = self.shutdown.shutdown(::std::net::Shutdown::Both) { - warn!("Client: couldn't shutdown writer and reader threads: {:?}", e); + warn!("Client: couldn't shutdown writer and reader threads: {:?}", + e); } else { // We only join if we know the TcpStream was shut down. Otherwise we might never // finish. debug!("Joining writer and reader."); reader_guard.take() - .expect(pos!()) - .join() - .expect(pos!()); + .expect(pos!()) + .join() + .expect(pos!()); debug!("Successfully joined writer and reader."); } } @@ -118,14 +119,15 @@ impl Drop for Client /// An asynchronous RPC call pub struct Future { rx: Receiver>, - requests: Arc>> + requests: Arc>>, } impl Future { /// Block until the result of the RPC call is available pub fn get(self) -> Result { let requests = self.requests; - self.rx.recv() + self.rx + .recv() .map_err(|_| requests.lock().expect(pos!()).get_error()) .and_then(|reply| reply) } @@ -164,7 +166,8 @@ impl RpcFutures { info!("Reader: could not complete reply: {:?}", e); } } else { - warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id); + warn!("RpcFutures: expected sender for id {} but got None!", + packet.rpc_id); } } @@ -178,10 +181,10 @@ impl RpcFutures { } fn write(outbound: Receiver<(Request, Sender>)>, - requests: Arc>>, - stream: TcpStream) + requests: Arc>>, + stream: TcpStream) where Request: serde::Serialize, - Reply: serde::Deserialize, + Reply: serde::Deserialize { let mut next_id = 0; let mut stream = BufWriter::new(stream); @@ -221,8 +224,8 @@ fn write(outbound: Receiver<(Request, Sender>)>, { // Clone the err so we can log it if sending fails if let Err(e2) = tx.send(Err(e.clone())) { - debug!("Error encountered while trying to send an error. \ - Initial error: {:?}; Send error: {:?}", + debug!("Error encountered while trying to send an error. Initial error: {:?}; Send \ + error: {:?}", e, e2); } diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 45f322f..6995700 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -59,8 +59,7 @@ pub type Result = ::std::result::Result; trait Deserialize: Read + Sized { fn deserialize(&mut self) -> Result { - deserialize_from(self, SizeLimit::Infinite) - .map_err(Error::from) + deserialize_from(self, SizeLimit::Infinite).map_err(Error::from) } } @@ -208,7 +207,9 @@ mod test { pool.scoped(|scope| { for _ in 0..concurrency { let client = client.try_clone().unwrap(); - scope.execute(move || { client.rpc(()).unwrap(); }); + scope.execute(move || { + client.rpc(()).unwrap(); + }); } }); assert_eq!(concurrency as u64, server.count()); diff --git a/tarpc/src/protocol/packet.rs b/tarpc/src/protocol/packet.rs index 63c351a..86b555d 100644 --- a/tarpc/src/protocol/packet.rs +++ b/tarpc/src/protocol/packet.rs @@ -19,10 +19,11 @@ impl Serialize for Packet { fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> where S: Serializer { - serializer.visit_struct(PACKET, MapVisitor { - value: self, - state: 0, - }) + serializer.visit_struct(PACKET, + MapVisitor { + value: self, + state: 0, + }) } } @@ -44,7 +45,7 @@ impl <'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> { self.state += 1; Ok(Some(try!(serializer.visit_struct_elt(MESSAGE, &self.value.message)))) } - _ => { + _ => { Ok(None) } } @@ -91,7 +92,10 @@ fn serde() { use bincode; let _ = env_logger::init(); - let packet = Packet { rpc_id: 1, message: () }; + let packet = Packet { + rpc_id: 1, + message: (), + }; let ser = bincode::serde::serialize(&packet, bincode::SizeLimit::Infinite).unwrap(); let de = bincode::serde::deserialize(&ser); assert_eq!(packet, de.unwrap()); diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index e04b6e7..a6db662 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -44,7 +44,7 @@ impl<'a, S> ConnectionHandler<'a, S> let reply = server.serve(message); let reply_packet = Packet { rpc_id: rpc_id, - message: reply + message: reply, }; tx.send(reply_packet).expect(pos!()); }); @@ -55,8 +55,8 @@ impl<'a, S> ConnectionHandler<'a, S> } Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => { if !shutdown.load(Ordering::SeqCst) { - info!("ConnectionHandler: read timed out ({:?}). Server not \ - shutdown, so retrying read.", + info!("ConnectionHandler: read timed out ({:?}). Server not shutdown, so \ + retrying read.", err); continue; } else { @@ -142,7 +142,9 @@ struct Server<'a, S: 'a> { impl<'a, S: 'a> Server<'a, S> where S: Serve + 'static { - fn serve<'b>(self, scope: &Scope<'b>) where 'a: 'b { + fn serve<'b>(self, scope: &Scope<'b>) + where 'a: 'b + { for conn in self.listener.incoming() { match self.die_rx.try_recv() { Ok(_) => { @@ -169,7 +171,7 @@ impl<'a, S: 'a> Server<'a, S> let read_conn = match conn.try_clone() { Err(err) => { error!("serve: could not clone tcp stream; possibly out of file descriptors? \ - Err: {:?}", + Err: {:?}", err); continue; }