From f2bf1adf8b8ea784790cd74d04e173dbe9ff6139 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Fri, 16 Dec 2016 14:15:31 -0800 Subject: [PATCH] Fix bug wherein the Codec was clearing the buf after decoding a message. Don't do thatgit stash pop! --- src/framed.rs | 42 +++++++++++++++++++++++++++++------------- src/lib.rs | 2 +- src/util.rs | 15 +++++++++++++++ 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/framed.rs b/src/framed.rs index 502b584..3437561 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -12,7 +12,7 @@ use std::mem; use tokio_core::io::{EasyBuf, Framed, Io}; use tokio_proto::streaming::multiplex::{self, RequestId}; use tokio_proto::multiplex::{ClientProto, ServerProto}; -use util::Never; +use util::{Debugger, Never}; /// The type of message sent and received by the transport. pub type Frame = multiplex::Frame; @@ -48,44 +48,60 @@ impl tokio_core::io::Codec for Codec fn encode(&mut self, (id, message): Self::Out, buf: &mut Vec) -> io::Result<()> { buf.write_u64::(id).unwrap(); + trace!("Encoded request id = {} as {:?}", id, buf); buf.write_u64::(bincode::serialized_size(&message)).unwrap(); bincode::serialize_into(buf, &message, SizeLimit::Infinite) // TODO(tikue): handle err .expect("In bincode::serialize_into"); + trace!("Encoded buffer: {:?}", buf); Ok(()) } fn decode(&mut self, buf: &mut EasyBuf) -> Result, io::Error> { use self::CodecState::*; + trace!("Codec::decode: {:?}", buf.as_slice()); loop { match self.state { - Id if buf.len() < mem::size_of::() => return Ok(None), - Id => { - let id_buf = buf.drain_to(mem::size_of::()); - self.state = Len { id: Cursor::new(id_buf).read_u64::()? }; + Id if buf.len() < mem::size_of::() => { + trace!("--> Buf len is {}; waiting for 8 to parse id.", buf.len()); + return Ok(None) + } + Id => { + let mut id_buf = buf.drain_to(mem::size_of::()); + let id = Cursor::new(&mut id_buf).read_u64::()?; + trace!("--> Parsed id = {} from {:?}", id, id_buf.as_slice()); + self.state = Len { id: id }; + } + Len { .. } if buf.len() < mem::size_of::() => { + trace!("--> Buf len is {}; waiting for 8 to parse packet length.", buf.len()); + return Ok(None) } - Len { .. } if buf.len() < mem::size_of::() => return Ok(None), Len { id } => { let len_buf = buf.drain_to(mem::size_of::()); + let len = Cursor::new(len_buf).read_u64::()?; + trace!("--> Parsed payload length = {}, remaining buffer length = {}", + len, buf.len()); self.state = Payload { id: id, - len: Cursor::new(len_buf).read_u64::()?, + len: len, }; } - Payload { len, .. } if buf.len() < len as usize => return Ok(None), - Payload { id, .. } => { - let mut buf = buf.get_mut(); - let result = bincode::deserialize_from(&mut Cursor::new(&mut *buf), + Payload { len, .. } if buf.len() < len as usize => { + trace!("--> Buf len is {}; waiting for {} to parse payload.", buf.len(), len); + return Ok(None) + } + Payload { id, len } => { + let payload = buf.drain_to(len as usize); + let result = bincode::deserialize_from(&mut Cursor::new(payload), SizeLimit::Infinite); - // Clear any unread bytes so we don't read garbage on next request. - buf.clear(); // Reset the state machine because, either way, we're done processing this // message. self.state = Id; + trace!("--> Parsed message: {:?}", Debugger(&result)); return Ok(Some((id, result))); } } diff --git a/src/lib.rs b/src/lib.rs index 6aa202a..abc14e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ //! ``` //! #![deny(missing_docs)] -#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits)] +#![feature(plugin, conservative_impl_trait, never_type, proc_macro, unboxed_closures, fn_traits, specialization)] #![plugin(tarpc_plugins)] extern crate byteorder; diff --git a/src/util.rs b/src/util.rs index f6dd550..6fd9f2e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -136,3 +136,18 @@ pub fn spawn_core() -> reactor::Remote { }); rx.recv().unwrap() } + +/// A struct that will format as the contained type if the type impls Debug. +pub struct Debugger<'a, T: 'a>(pub &'a T); + +impl<'a, T: fmt::Debug> fmt::Debug for Debugger<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{:?}", self.0) + } +} + +impl<'a, T> fmt::Debug for Debugger<'a, T> { + default fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{{not debuggable}}") + } +}