Fix bug wherein the Codec was clearing the buf after decoding a message. Don't do thatgit stash pop!

This commit is contained in:
Tim Kuehn
2016-12-16 14:15:31 -08:00
parent be156f4d6b
commit f2bf1adf8b
3 changed files with 45 additions and 14 deletions

View File

@@ -12,7 +12,7 @@ use std::mem;
use tokio_core::io::{EasyBuf, Framed, Io};
use tokio_proto::streaming::multiplex::{self, RequestId};
use tokio_proto::multiplex::{ClientProto, ServerProto};
use util::Never;
use util::{Debugger, Never};
/// The type of message sent and received by the transport.
pub type Frame<T> = multiplex::Frame<T, Never, io::Error>;
@@ -48,44 +48,60 @@ impl<Req, Resp> tokio_core::io::Codec for Codec<Req, Resp>
fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
buf.write_u64::<BigEndian>(id).unwrap();
trace!("Encoded request id = {} as {:?}", id, buf);
buf.write_u64::<BigEndian>(bincode::serialized_size(&message)).unwrap();
bincode::serialize_into(buf,
&message,
SizeLimit::Infinite)
// TODO(tikue): handle err
.expect("In bincode::serialize_into");
trace!("Encoded buffer: {:?}", buf);
Ok(())
}
fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, io::Error> {
use self::CodecState::*;
trace!("Codec::decode: {:?}", buf.as_slice());
loop {
match self.state {
Id if buf.len() < mem::size_of::<u64>() => return Ok(None),
Id => {
let id_buf = buf.drain_to(mem::size_of::<u64>());
self.state = Len { id: Cursor::new(id_buf).read_u64::<BigEndian>()? };
Id if buf.len() < mem::size_of::<u64>() => {
trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len());
return Ok(None)
}
Id => {
let mut id_buf = buf.drain_to(mem::size_of::<u64>());
let id = Cursor::new(&mut id_buf).read_u64::<BigEndian>()?;
trace!("--> Parsed id = {} from {:?}", id, id_buf.as_slice());
self.state = Len { id: id };
}
Len { .. } if buf.len() < mem::size_of::<u64>() => {
trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len());
return Ok(None)
}
Len { .. } if buf.len() < mem::size_of::<u64>() => return Ok(None),
Len { id } => {
let len_buf = buf.drain_to(mem::size_of::<u64>());
let len = Cursor::new(len_buf).read_u64::<BigEndian>()?;
trace!("--> Parsed payload length = {}, remaining buffer length = {}",
len, buf.len());
self.state = Payload {
id: id,
len: Cursor::new(len_buf).read_u64::<BigEndian>()?,
len: len,
};
}
Payload { len, .. } if buf.len() < len as usize => return Ok(None),
Payload { id, .. } => {
let mut buf = buf.get_mut();
let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf),
Payload { len, .. } if buf.len() < len as usize => {
trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len);
return Ok(None)
}
Payload { id, len } => {
let payload = buf.drain_to(len as usize);
let result = bincode::deserialize_from(&mut Cursor::new(payload),
SizeLimit::Infinite);
// Clear any unread bytes so we don't read garbage on next request.
buf.clear();
// Reset the state machine because, either way, we're done processing this
// message.
self.state = Id;
trace!("--> Parsed message: {:?}", Debugger(&result));
return Ok(Some((id, result)));
}
}

View File

@@ -59,7 +59,7 @@
//! ```
//!
#![deny(missing_docs)]
#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits)]
#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits, specialization)]
#![plugin(tarpc_plugins)]
extern crate byteorder;

View File

@@ -136,3 +136,18 @@ pub fn spawn_core() -> reactor::Remote {
});
rx.recv().unwrap()
}
/// A struct that will format as the contained type if the type impls Debug.
pub struct Debugger<'a, T: 'a>(pub &'a T);
impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{:?}", self.0)
}
}
impl<'a, T> fmt::Debug for Debugger<'a, T> {
default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{{not debuggable}}")
}
}