diff --git a/tarpc/src/serde_transport.rs b/tarpc/src/serde_transport.rs index 94050d6..91e8f71 100644 --- a/tarpc/src/serde_transport.rs +++ b/tarpc/src/serde_transport.rs @@ -8,7 +8,7 @@ #![deny(missing_docs)] -use futures::{prelude::*, task::*}; +use futures::{prelude::*, ready, task::*}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{error::Error, io, pin::Pin}; @@ -42,14 +42,15 @@ where type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - match self.project().inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some(Ok::<_, CodecError>(next))) => Poll::Ready(Some(Ok(next))), - Poll::Ready(Some(Err::<_, CodecError>(e))) => { - Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, e)))) - } - } + let next = ready!(self.project().inner.poll_next(cx)).map(|next| { + next.map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("while reading from transport: {}", e.into()), + ) + }) + }); + Poll::Ready(next) } } @@ -65,31 +66,48 @@ where type Error = io::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - convert(self.project().inner.poll_ready(cx)) + self.project().inner.poll_ready(cx).map(|ready| { + ready.map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("while readying write half of transport: {}", e.into()), + ) + }) + }) } fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { - self.project() - .inner - .start_send(item) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + self.project().inner.start_send(item).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("while writing to transport: {}", e.into()), + ) + }) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - convert(self.project().inner.poll_flush(cx)) + self.project().inner.poll_flush(cx).map(|ready| { + ready.map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("while flushing transport: {}", e.into()), + ) + }) + }) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - convert(self.project().inner.poll_close(cx)) + self.project().inner.poll_close(cx).map(|ready| { + ready.map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("while closing write half of transport: {}", e.into()), + ) + }) + }) } } -fn convert>>( - poll: Poll>, -) -> Poll> { - poll.map(|ready| ready.map_err(|e| io::Error::new(io::ErrorKind::Other, e))) -} - /// Constructs a new transport from a framed transport and a serialization codec. pub fn new( framed_io: Framed,