diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 68f7f1e..d80fe21 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -6,10 +6,10 @@ license = "MIT" description = "tarpc is an RPC framework for rust with a focus on ease of use." [dependencies] -serde = "*" bincode = "*" -serde_macros = "*" -log = "*" env_logger = "*" -scoped-pool = "*" lazy_static = "*" +log = "*" +scoped-pool = "*" +serde = "*" +serde_macros = "*" diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index da510ac..26077ff 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -53,13 +53,13 @@ extern crate serde; extern crate bincode; +#[cfg(test)] +#[macro_use] +extern crate lazy_static; #[macro_use] extern crate log; extern crate scoped_pool; extern crate test; -#[cfg(test)] -#[macro_use] -extern crate lazy_static; macro_rules! pos { () => (concat!(file!(), ":", line!())) diff --git a/tarpc/src/protocol/client.rs b/tarpc/src/protocol/client.rs index 9f99262..bca7c55 100644 --- a/tarpc/src/protocol/client.rs +++ b/tarpc/src/protocol/client.rs @@ -2,19 +2,19 @@ // // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -// -use bincode; + use serde; use std::fmt; -use std::io::{self, BufReader, BufWriter, Read, Write}; +use std::io::{self, BufReader, BufWriter, Read}; use std::collections::HashMap; use std::mem; use std::net::{TcpStream, ToSocketAddrs}; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; -use std::time::Duration; use std::thread; -use super::{Error, Packet, Result}; +use std::time::Duration; + +use super::{Serialize, Deserialize, Error, Packet, Result}; /// A client stub that connects to a server to run rpcs. pub struct Client @@ -158,18 +158,18 @@ impl RpcFutures { } } - fn complete_reply(&mut self, id: u64, reply: Reply) { - if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&id) { - if let Err(e) = tx.send(Ok(reply)) { + fn complete_reply(&mut self, packet: Packet) { + if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&packet.rpc_id) { + if let Err(e) = tx.send(Ok(packet.message)) { info!("Reader: could not complete reply: {:?}", e); } } else { - warn!("RpcFutures: expected sender for id {} but got None!", id); + warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id); } } - fn set_error(&mut self, err: bincode::serde::DeserializeError) { - let _ = mem::replace(&mut self.0, Err(err.into())); + fn set_error(&mut self, err: Error) { + let _ = mem::replace(&mut self.0, Err(err)); } fn get_error(&self) -> Error { @@ -206,9 +206,7 @@ fn write(outbound: Receiver<(Request, Sender>)>, message: request, }; debug!("Writer: calling rpc({:?})", id); - if let Err(e) = bincode::serde::serialize_into(&mut stream, - &packet, - bincode::SizeLimit::Infinite) { + if let Err(e) = stream.serialize(&packet) { report_error(&tx, e.into()); // Typically we'd want to notify the client of any Err returned by remove_tx, but in // this case the client already hit an Err, and doesn't need to know about this one, as @@ -216,9 +214,6 @@ fn write(outbound: Receiver<(Request, Sender>)>, let _ = requests.lock().expect(pos!()).remove_tx(id); continue; } - if let Err(e) = stream.flush() { - report_error(&tx, e.into()); - } } fn report_error(tx: &Sender>, e: Error) @@ -240,15 +235,10 @@ fn read(requests: Arc>>, stream: TcpStream) { let mut stream = BufReader::new(stream); loop { - let packet: bincode::serde::DeserializeResult> = - bincode::serde::deserialize_from(&mut stream, bincode::SizeLimit::Infinite); - match packet { - Ok(Packet { - rpc_id: id, - message: reply - }) => { - debug!("Client: received message, id={}", id); - requests.lock().expect(pos!()).complete_reply(id, reply); + match stream.deserialize::>() { + Ok(packet) => { + debug!("Client: received message, id={}", packet.rpc_id); + requests.lock().expect(pos!()).complete_reply(packet); } Err(err) => { warn!("Client: reader thread encountered an unexpected error while parsing; \ diff --git a/tarpc/src/protocol/mod.rs b/tarpc/src/protocol/mod.rs index 45eb4de..2ce4369 100644 --- a/tarpc/src/protocol/mod.rs +++ b/tarpc/src/protocol/mod.rs @@ -3,8 +3,10 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use bincode; -use std::io; +use bincode::{self, SizeLimit}; +use bincode::serde::{deserialize_from, serialize_into}; +use serde; +use std::io::{self, Read, Write}; use std::convert; use std::sync::Arc; @@ -59,6 +61,25 @@ struct Packet { message: T, } +trait Deserialize: Read + Sized { + fn deserialize(&mut self) -> Result { + deserialize_from(self, SizeLimit::Infinite) + .map_err(Error::from) + } +} + +impl Deserialize for R {} + +trait Serialize: Write + Sized { + fn serialize(&mut self, value: &T) -> Result<()> { + try!(serialize_into(self, value, SizeLimit::Infinite)); + try!(self.flush()); + Ok(()) + } +} + +impl Serialize for W {} + #[cfg(test)] mod test { extern crate env_logger; diff --git a/tarpc/src/protocol/server.rs b/tarpc/src/protocol/server.rs index 37b5e77..e04b6e7 100644 --- a/tarpc/src/protocol/server.rs +++ b/tarpc/src/protocol/server.rs @@ -3,17 +3,16 @@ // Licensed under the MIT License, . // This file may not be copied, modified, or distributed except according to those terms. -use bincode; use serde; use scoped_pool::{Pool, Scope}; use std::fmt; -use std::io::{self, BufReader, BufWriter, Write}; +use std::io::{self, BufReader, BufWriter}; use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread::{self, JoinHandle}; -use super::{Packet, Result}; +use super::{Deserialize, Error, Packet, Result, Serialize}; struct ConnectionHandler<'a, S> where S: Serve @@ -38,7 +37,7 @@ impl<'a, S> ConnectionHandler<'a, S> let (tx, rx) = channel(); scope.execute(move || Self::write(rx, write_stream)); loop { - match bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) { + match read_stream.deserialize() { Ok(Packet { rpc_id, message, }) => { let tx = tx.clone(); scope.execute(move || { @@ -54,8 +53,7 @@ impl<'a, S> ConnectionHandler<'a, S> break; } } - Err(bincode::serde::DeserializeError::IoError(ref err)) - if Self::timed_out(err.kind()) => { + Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => { if !shutdown.load(Ordering::SeqCst) { info!("ConnectionHandler: read timed out ({:?}). Server not \ shutdown, so retrying read.", @@ -93,16 +91,8 @@ impl<'a, S> ConnectionHandler<'a, S> return; } Ok(reply_packet) => { - if let Err(e) = - bincode::serde::serialize_into(stream, - &reply_packet, - bincode::SizeLimit::Infinite) { - warn!("Writer: failed to write reply to Client: {:?}", - e); - } - if let Err(e) = stream.flush() { - warn!("Writer: failed to flush reply to Client: {:?}", - e); + if let Err(e) = stream.serialize(&reply_packet) { + warn!("Writer: failed to write reply to Client: {:?}", e); } } }