Merge branch 'serialize-traits' into 'master'

Factor out serialization code into a Serialize and Deserialize trait

Makes the client and server code a bit nicer by sharing commonalities between them.

See merge request !20
This commit is contained in:
Adam Wright
2016-02-12 11:42:45 +05:30
5 changed files with 52 additions and 51 deletions

View File

@@ -6,10 +6,10 @@ license = "MIT"
description = "tarpc is an RPC framework for rust with a focus on ease of use."
[dependencies]
serde = "*"
bincode = "*"
serde_macros = "*"
log = "*"
env_logger = "*"
scoped-pool = "*"
lazy_static = "*"
log = "*"
scoped-pool = "*"
serde = "*"
serde_macros = "*"

View File

@@ -53,13 +53,13 @@
extern crate serde;
extern crate bincode;
#[cfg(test)]
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;
extern crate scoped_pool;
extern crate test;
#[cfg(test)]
#[macro_use]
extern crate lazy_static;
macro_rules! pos {
() => (concat!(file!(), ":", line!()))

View File

@@ -2,19 +2,19 @@
//
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
//
use bincode;
use serde;
use std::fmt;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::io::{self, BufReader, BufWriter, Read};
use std::collections::HashMap;
use std::mem;
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::time::Duration;
use std::thread;
use super::{Error, Packet, Result};
use std::time::Duration;
use super::{Serialize, Deserialize, Error, Packet, Result};
/// A client stub that connects to a server to run rpcs.
pub struct Client<Request, Reply>
@@ -158,18 +158,18 @@ impl<Reply> RpcFutures<Reply> {
}
}
fn complete_reply(&mut self, id: u64, reply: Reply) {
if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&id) {
if let Err(e) = tx.send(Ok(reply)) {
fn complete_reply(&mut self, packet: Packet<Reply>) {
if let Some(tx) = self.0.as_mut().expect(pos!()).remove(&packet.rpc_id) {
if let Err(e) = tx.send(Ok(packet.message)) {
info!("Reader: could not complete reply: {:?}", e);
}
} else {
warn!("RpcFutures: expected sender for id {} but got None!", id);
warn!("RpcFutures: expected sender for id {} but got None!", packet.rpc_id);
}
}
fn set_error(&mut self, err: bincode::serde::DeserializeError) {
let _ = mem::replace(&mut self.0, Err(err.into()));
fn set_error(&mut self, err: Error) {
let _ = mem::replace(&mut self.0, Err(err));
}
fn get_error(&self) -> Error {
@@ -206,9 +206,7 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
message: request,
};
debug!("Writer: calling rpc({:?})", id);
if let Err(e) = bincode::serde::serialize_into(&mut stream,
&packet,
bincode::SizeLimit::Infinite) {
if let Err(e) = stream.serialize(&packet) {
report_error(&tx, e.into());
// Typically we'd want to notify the client of any Err returned by remove_tx, but in
// this case the client already hit an Err, and doesn't need to know about this one, as
@@ -216,9 +214,6 @@ fn write<Request, Reply>(outbound: Receiver<(Request, Sender<Result<Reply>>)>,
let _ = requests.lock().expect(pos!()).remove_tx(id);
continue;
}
if let Err(e) = stream.flush() {
report_error(&tx, e.into());
}
}
fn report_error<Reply>(tx: &Sender<Result<Reply>>, e: Error)
@@ -240,15 +235,10 @@ fn read<Reply>(requests: Arc<Mutex<RpcFutures<Reply>>>, stream: TcpStream)
{
let mut stream = BufReader::new(stream);
loop {
let packet: bincode::serde::DeserializeResult<Packet<Reply>> =
bincode::serde::deserialize_from(&mut stream, bincode::SizeLimit::Infinite);
match packet {
Ok(Packet {
rpc_id: id,
message: reply
}) => {
debug!("Client: received message, id={}", id);
requests.lock().expect(pos!()).complete_reply(id, reply);
match stream.deserialize::<Packet<Reply>>() {
Ok(packet) => {
debug!("Client: received message, id={}", packet.rpc_id);
requests.lock().expect(pos!()).complete_reply(packet);
}
Err(err) => {
warn!("Client: reader thread encountered an unexpected error while parsing; \

View File

@@ -3,8 +3,10 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use bincode;
use std::io;
use bincode::{self, SizeLimit};
use bincode::serde::{deserialize_from, serialize_into};
use serde;
use std::io::{self, Read, Write};
use std::convert;
use std::sync::Arc;
@@ -59,6 +61,25 @@ struct Packet<T> {
message: T,
}
trait Deserialize: Read + Sized {
fn deserialize<T: serde::Deserialize>(&mut self) -> Result<T> {
deserialize_from(self, SizeLimit::Infinite)
.map_err(Error::from)
}
}
impl<R: Read> Deserialize for R {}
trait Serialize: Write + Sized {
fn serialize<T: serde::Serialize>(&mut self, value: &T) -> Result<()> {
try!(serialize_into(self, value, SizeLimit::Infinite));
try!(self.flush());
Ok(())
}
}
impl<W: Write> Serialize for W {}
#[cfg(test)]
mod test {
extern crate env_logger;

View File

@@ -3,17 +3,16 @@
// Licensed under the MIT License, <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed except according to those terms.
use bincode;
use serde;
use scoped_pool::{Pool, Scope};
use std::fmt;
use std::io::{self, BufReader, BufWriter, Write};
use std::io::{self, BufReader, BufWriter};
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{Receiver, Sender, TryRecvError, channel};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread::{self, JoinHandle};
use super::{Packet, Result};
use super::{Deserialize, Error, Packet, Result, Serialize};
struct ConnectionHandler<'a, S>
where S: Serve
@@ -38,7 +37,7 @@ impl<'a, S> ConnectionHandler<'a, S>
let (tx, rx) = channel();
scope.execute(move || Self::write(rx, write_stream));
loop {
match bincode::serde::deserialize_from(read_stream, bincode::SizeLimit::Infinite) {
match read_stream.deserialize() {
Ok(Packet { rpc_id, message, }) => {
let tx = tx.clone();
scope.execute(move || {
@@ -54,8 +53,7 @@ impl<'a, S> ConnectionHandler<'a, S>
break;
}
}
Err(bincode::serde::DeserializeError::IoError(ref err))
if Self::timed_out(err.kind()) => {
Err(Error::Io(ref err)) if Self::timed_out(err.kind()) => {
if !shutdown.load(Ordering::SeqCst) {
info!("ConnectionHandler: read timed out ({:?}). Server not \
shutdown, so retrying read.",
@@ -93,16 +91,8 @@ impl<'a, S> ConnectionHandler<'a, S>
return;
}
Ok(reply_packet) => {
if let Err(e) =
bincode::serde::serialize_into(stream,
&reply_packet,
bincode::SizeLimit::Infinite) {
warn!("Writer: failed to write reply to Client: {:?}",
e);
}
if let Err(e) = stream.flush() {
warn!("Writer: failed to flush reply to Client: {:?}",
e);
if let Err(e) = stream.serialize(&reply_packet) {
warn!("Writer: failed to write reply to Client: {:?}", e);
}
}
}