From eb67c540b98faf49fa774ffb0f9a015cbc6ccca2 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 21 Apr 2021 12:58:55 -0700 Subject: [PATCH] Use more structured errors in client. --- tarpc/src/client.rs | 97 ++++++++++++++++++++++-------------- tarpc/src/serde_transport.rs | 50 ++++++++----------- 2 files changed, 80 insertions(+), 67 deletions(-) diff --git a/tarpc/src/client.rs b/tarpc/src/client.rs index 2bb32a5..ed3f824 100644 --- a/tarpc/src/client.rs +++ b/tarpc/src/client.rs @@ -60,15 +60,16 @@ pub struct NewClient { impl NewClient where D: Future> + Send + 'static, - E: std::fmt::Display, + E: std::error::Error + Send + Sync + 'static, { /// Helper method to spawn the dispatch on the default executor. #[cfg(feature = "tokio1")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))] pub fn spawn(self) -> C { - let dispatch = self - .dispatch - .unwrap_or_else(move |e| tracing::warn!("Connection broken: {}", e)); + let dispatch = self.dispatch.unwrap_or_else(move |e| { + let e = anyhow::Error::new(e); + tracing::warn!("Connection broken: {:?}", e); + }); tokio::spawn(dispatch); self.client } @@ -278,11 +279,20 @@ pub enum ChannelError where E: Error + Send + Sync + 'static, { - /// An error occurred reading from, or writing to, the transport. - #[error("an error occurred in the transport: {0}")] - Transport(#[source] E), - /// An error occurred while polling expired requests. - #[error("an error occurred while polling expired requests: {0}")] + /// Could not read from the transport. + #[error("could not read from the transport")] + Read(#[source] E), + /// Could not ready the transport for writes. + #[error("could not ready the transport for writes")] + Ready(#[source] E), + /// Could not write to the transport. + #[error("could not write to the transport")] + Write(#[source] E), + /// Could not flush the transport. + #[error("could not flush the transport")] + Flush(#[source] E), + /// Could not poll expired requests. + #[error("could not poll expired requests")] Timer(#[source] tokio::time::error::Error), } @@ -298,6 +308,33 @@ where self.as_mut().project().transport } + fn poll_ready<'a>( + self: &'a mut Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.transport_pin_mut() + .poll_ready(cx) + .map_err(ChannelError::Ready) + } + + fn start_send<'a>( + self: &'a mut Pin<&mut Self>, + message: ClientMessage, + ) -> Result<(), ChannelError> { + self.transport_pin_mut() + .start_send(message) + .map_err(ChannelError::Write) + } + + fn poll_flush<'a>( + self: &'a mut Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.transport_pin_mut() + .poll_flush(cx) + .map_err(ChannelError::Flush) + } + fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests { self.as_mut().project().canceled_requests } @@ -314,7 +351,7 @@ where ) -> Poll>>> { self.transport_pin_mut() .poll_next(cx) - .map_err(ChannelError::Transport) + .map_err(ChannelError::Read) .map_ok(|response| { self.complete(response); }) @@ -329,21 +366,13 @@ where Closed, } - let pending_requests_status = match self - .as_mut() - .poll_write_request(cx) - .map_err(ChannelError::Transport)? - { + let pending_requests_status = match self.as_mut().poll_write_request(cx)? { Poll::Ready(Some(())) => return Poll::Ready(Some(Ok(()))), Poll::Ready(None) => ReceiverStatus::Closed, Poll::Pending => ReceiverStatus::Pending, }; - let canceled_requests_status = match self - .as_mut() - .poll_write_cancel(cx) - .map_err(ChannelError::Transport)? - { + let canceled_requests_status = match self.as_mut().poll_write_cancel(cx)? { Poll::Ready(Some(())) => return Poll::Ready(Some(Ok(()))), Poll::Ready(None) => ReceiverStatus::Closed, Poll::Pending => ReceiverStatus::Pending, @@ -365,18 +394,12 @@ where match (pending_requests_status, canceled_requests_status) { (ReceiverStatus::Closed, ReceiverStatus::Closed) => { - ready!(self - .transport_pin_mut() - .poll_flush(cx) - .map_err(ChannelError::Transport)?); + ready!(self.poll_flush(cx)?); Poll::Ready(None) } (ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => { // No more messages to process, so flush any messages buffered in the transport. - ready!(self - .transport_pin_mut() - .poll_flush(cx) - .map_err(ChannelError::Transport)?); + ready!(self.poll_flush(cx)?); // Even if we fully-flush, we return Pending, because we have no more requests // or cancellations right now. @@ -392,7 +415,7 @@ where fn poll_next_request( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, C::Error>>> { + ) -> Poll, ChannelError>>> { if self.in_flight_requests().len() >= self.config.max_in_flight_requests { tracing::info!( "At in-flight request capacity ({}/{}).", @@ -430,7 +453,7 @@ where fn poll_next_cancellation( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>>> { ready!(self.ensure_writeable(cx)?); loop { @@ -452,9 +475,9 @@ where fn ensure_writeable<'a>( self: &'a mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - while self.transport_pin_mut().poll_ready(cx)?.is_pending() { - ready!(self.transport_pin_mut().poll_flush(cx)?); + ) -> Poll>>> { + while self.poll_ready(cx)?.is_pending() { + ready!(self.poll_flush(cx)?); } Poll::Ready(Some(Ok(()))) } @@ -462,7 +485,7 @@ where fn poll_write_request<'a>( self: &'a mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>>> { let DispatchRequest { ctx, span, @@ -486,7 +509,7 @@ where trace_context: ctx.trace_context, }, }); - self.transport_pin_mut().start_send(request)?; + self.start_send(request)?; let deadline = ctx.deadline; tracing::info!( tarpc.deadline = %humantime::format_rfc3339(deadline), @@ -503,7 +526,7 @@ where fn poll_write_cancel<'a>( self: &'a mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>>> { let (context, span, request_id) = match ready!(self.as_mut().poll_next_cancellation(cx)?) { Some(triple) => triple, None => return Poll::Ready(None), @@ -514,7 +537,7 @@ where trace_context: context.trace_context, request_id, }; - self.transport_pin_mut().start_send(cancel)?; + self.start_send(cancel)?; tracing::info!("CancelRequest"); Poll::Ready(Some(Ok(()))) } diff --git a/tarpc/src/serde_transport.rs b/tarpc/src/serde_transport.rs index 2dd5220..aae63a0 100644 --- a/tarpc/src/serde_transport.rs +++ b/tarpc/src/serde_transport.rs @@ -42,12 +42,10 @@ where type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - self.project().inner.poll_next(cx).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("while reading from transport: {}", e.into()), - ) - }) + self.project() + .inner + .poll_next(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } } @@ -63,39 +61,31 @@ where type Error = io::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_ready(cx).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("while readying write half of transport: {}", e.into()), - ) - }) + self.project() + .inner + .poll_ready(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { - self.project().inner.start_send(item).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("while writing to transport: {}", e.into()), - ) - }) + self.project() + .inner + .start_send(item) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_flush(cx).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("while flushing transport: {}", e.into()), - ) - }) + self.project() + .inner + .poll_flush(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_close(cx).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("while closing write half of transport: {}", e.into()), - ) - }) + self.project() + .inner + .poll_close(cx) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } }