diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 2545fa0..7980146 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -11,13 +11,13 @@ readme = "../README.md" description = "An RPC framework for Rust with a focus on ease of use." [dependencies] -bincode = "0.5" +bincode = "0.6" log = "0.3" -scoped-pool = "0.1" -serde = "0.7" +scoped-pool = "1.0" +serde = "0.8" unix_socket = "0.5" [dev-dependencies] -lazy_static = "0.1" +lazy_static = "0.2" env_logger = "0.3" tempdir = "0.3" diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index 4a6d73a..f1daa9b 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -500,7 +500,8 @@ macro_rules! service { } } -#[allow(dead_code)] // because we're just testing that the macro expansion compiles +#[allow(dead_code)] +// because we're just testing that the macro expansion compiles #[cfg(test)] mod syntax_test { // Tests a service definition with a fn that takes no args @@ -594,7 +595,7 @@ mod functional_test { fn async_try_clone_unix() { let temp_dir = tempdir::TempDir::new("tarpc").unwrap(); let temp_file = temp_dir.path() - .join("async_try_clone_unix.tmp"); + .join("async_try_clone_unix.tmp"); let handle = Server.spawn(UnixTransport(temp_file)).unwrap(); let client1 = AsyncClient::new(handle.dialer()).unwrap(); let client2 = client1.try_clone().unwrap(); diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index dd2ed56..b61e076 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -5,7 +5,7 @@ use serde; use std::fmt; -use std::io::{self, BufReader, BufWriter, Read}; +use std::io::{self, BufReader, BufWriter}; use std::collections::HashMap; use std::mem; use std::sync::{Arc, Mutex}; @@ -119,9 +119,9 @@ impl Drop for Client // finish. debug!("Joining writer and reader."); reader_guard.take() - .expect(pos!()) - .join() - .expect(pos!()); + .expect(pos!()) + .join() + .expect(pos!()); debug!("Successfully joined writer and reader."); } } diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 3a7b2fd..5cefea1 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -185,8 +185,8 @@ mod test { let _ = env_logger::init(); let server = Arc::new(Server::new()); let serve_handle = server.spawn_with_config("localhost:0", - Config { timeout: Some(Duration::new(0, 10)) }) - .unwrap(); + Config { timeout: Some(Duration::new(0, 10)) }) + .unwrap(); let client: Client<(), u64, _> = Client::new(serve_handle.dialer()).unwrap(); let thread = thread::spawn(move || serve_handle.shutdown()); info!("force_shutdown:: rpc1: {:?}", client.rpc(())); @@ -197,9 +197,9 @@ mod test { fn client_failed_rpc() { let _ = env_logger::init(); let server = Arc::new(Server::new()); - let serve_handle = server.spawn_with_config("localhost:0", - Config { timeout: test_timeout() }) - .unwrap(); + let serve_handle = + server.spawn_with_config("localhost:0", Config { timeout: test_timeout() }) + .unwrap(); let client: Arc> = Arc::new(Client::new(serve_handle.dialer()).unwrap()); client.rpc(()).unwrap(); serve_handle.shutdown(); diff --git a/tarpc/src/protocol/packet.rs b/tarpc/src/protocol/packet.rs index e0c1caf..ccffdea 100644 --- a/tarpc/src/protocol/packet.rs +++ b/tarpc/src/protocol/packet.rs @@ -1,4 +1,4 @@ -use serde::{Deserialize, Deserializer, Serialize, Serializer, de, ser}; +use serde::{Deserialize, Deserializer, Serialize, Serializer, de}; use std::marker::PhantomData; /// Packet shared between client and server. @@ -19,40 +19,10 @@ impl Serialize for Packet { fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> where S: Serializer { - serializer.serialize_struct(PACKET, - MapVisitor { - value: self, - state: 0, - }) - } -} - -struct MapVisitor<'a, T: 'a> { - value: &'a Packet, - state: u8, -} - -impl<'a, T: Serialize> ser::MapVisitor for MapVisitor<'a, T> { - #[inline] - fn visit(&mut self, serializer: &mut S) -> Result, S::Error> - where S: Serializer - { - match self.state { - 0 => { - self.state += 1; - Ok(Some(try!(serializer.serialize_struct_elt(RPC_ID, &self.value.rpc_id)))) - } - 1 => { - self.state += 1; - Ok(Some(try!(serializer.serialize_struct_elt(MESSAGE, &self.value.message)))) - } - _ => Ok(None), - } - } - - #[inline] - fn len(&self) -> Option { - Some(2) + let mut state = try!(serializer.serialize_struct(PACKET, 2)); + try!(serializer.serialize_struct_elt(&mut state, RPC_ID, &self.rpc_id)); + try!(serializer.serialize_struct_elt(&mut state, MESSAGE, &self.message)); + serializer.serialize_struct_end(state) } } diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index f635145..abeb2fc 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -41,7 +41,7 @@ impl<'a, S, St> ConnectionHandler<'a, S, St> scope.execute(move || Self::write(rx, write_stream)); loop { match read_stream.deserialize() { - Ok(Packet { rpc_id, message, }) => { + Ok(Packet { rpc_id, message }) => { let tx = tx.clone(); scope.execute(move || { let reply = server.serve(message); @@ -81,7 +81,8 @@ impl<'a, S, St> ConnectionHandler<'a, S, St> fn timed_out(error_kind: io::ErrorKind) -> bool { match error_kind { - io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => true, + io::ErrorKind::TimedOut | + io::ErrorKind::WouldBlock => true, _ => false, } } @@ -264,7 +265,6 @@ pub trait Serve: Send + Sync + Sized { dialer: dialer, }) } - } impl Serve for P