Use more structured errors in client.

This commit is contained in:
Tim Kuehn
2021-04-21 12:58:55 -07:00
parent 4151d0abd3
commit eb67c540b9
2 changed files with 80 additions and 67 deletions

View File

@@ -60,15 +60,16 @@ pub struct NewClient<C, D> {
impl<C, D, E> NewClient<C, D> impl<C, D, E> NewClient<C, D>
where where
D: Future<Output = Result<(), E>> + Send + 'static, D: Future<Output = Result<(), E>> + Send + 'static,
E: std::fmt::Display, E: std::error::Error + Send + Sync + 'static,
{ {
/// Helper method to spawn the dispatch on the default executor. /// Helper method to spawn the dispatch on the default executor.
#[cfg(feature = "tokio1")] #[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))] #[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
pub fn spawn(self) -> C { pub fn spawn(self) -> C {
let dispatch = self let dispatch = self.dispatch.unwrap_or_else(move |e| {
.dispatch let e = anyhow::Error::new(e);
.unwrap_or_else(move |e| tracing::warn!("Connection broken: {}", e)); tracing::warn!("Connection broken: {:?}", e);
});
tokio::spawn(dispatch); tokio::spawn(dispatch);
self.client self.client
} }
@@ -278,11 +279,20 @@ pub enum ChannelError<E>
where where
E: Error + Send + Sync + 'static, E: Error + Send + Sync + 'static,
{ {
/// An error occurred reading from, or writing to, the transport. /// Could not read from the transport.
#[error("an error occurred in the transport: {0}")] #[error("could not read from the transport")]
Transport(#[source] E), Read(#[source] E),
/// An error occurred while polling expired requests. /// Could not ready the transport for writes.
#[error("an error occurred while polling expired requests: {0}")] #[error("could not ready the transport for writes")]
Ready(#[source] E),
/// Could not write to the transport.
#[error("could not write to the transport")]
Write(#[source] E),
/// Could not flush the transport.
#[error("could not flush the transport")]
Flush(#[source] E),
/// Could not poll expired requests.
#[error("could not poll expired requests")]
Timer(#[source] tokio::time::error::Error), Timer(#[source] tokio::time::error::Error),
} }
@@ -298,6 +308,33 @@ where
self.as_mut().project().transport self.as_mut().project().transport
} }
fn poll_ready<'a>(
self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ChannelError<C::Error>>> {
self.transport_pin_mut()
.poll_ready(cx)
.map_err(ChannelError::Ready)
}
fn start_send<'a>(
self: &'a mut Pin<&mut Self>,
message: ClientMessage<Req>,
) -> Result<(), ChannelError<C::Error>> {
self.transport_pin_mut()
.start_send(message)
.map_err(ChannelError::Write)
}
fn poll_flush<'a>(
self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), ChannelError<C::Error>>> {
self.transport_pin_mut()
.poll_flush(cx)
.map_err(ChannelError::Flush)
}
fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests { fn canceled_requests_mut<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut CanceledRequests {
self.as_mut().project().canceled_requests self.as_mut().project().canceled_requests
} }
@@ -314,7 +351,7 @@ where
) -> Poll<Option<Result<(), ChannelError<C::Error>>>> { ) -> Poll<Option<Result<(), ChannelError<C::Error>>>> {
self.transport_pin_mut() self.transport_pin_mut()
.poll_next(cx) .poll_next(cx)
.map_err(ChannelError::Transport) .map_err(ChannelError::Read)
.map_ok(|response| { .map_ok(|response| {
self.complete(response); self.complete(response);
}) })
@@ -329,21 +366,13 @@ where
Closed, Closed,
} }
let pending_requests_status = match self let pending_requests_status = match self.as_mut().poll_write_request(cx)? {
.as_mut()
.poll_write_request(cx)
.map_err(ChannelError::Transport)?
{
Poll::Ready(Some(())) => return Poll::Ready(Some(Ok(()))), Poll::Ready(Some(())) => return Poll::Ready(Some(Ok(()))),
Poll::Ready(None) => ReceiverStatus::Closed, Poll::Ready(None) => ReceiverStatus::Closed,
Poll::Pending => ReceiverStatus::Pending, Poll::Pending => ReceiverStatus::Pending,
}; };
let canceled_requests_status = match self let canceled_requests_status = match self.as_mut().poll_write_cancel(cx)? {
.as_mut()
.poll_write_cancel(cx)
.map_err(ChannelError::Transport)?
{
Poll::Ready(Some(())) => return Poll::Ready(Some(Ok(()))), Poll::Ready(Some(())) => return Poll::Ready(Some(Ok(()))),
Poll::Ready(None) => ReceiverStatus::Closed, Poll::Ready(None) => ReceiverStatus::Closed,
Poll::Pending => ReceiverStatus::Pending, Poll::Pending => ReceiverStatus::Pending,
@@ -365,18 +394,12 @@ where
match (pending_requests_status, canceled_requests_status) { match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => { (ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self ready!(self.poll_flush(cx)?);
.transport_pin_mut()
.poll_flush(cx)
.map_err(ChannelError::Transport)?);
Poll::Ready(None) Poll::Ready(None)
} }
(ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => { (ReceiverStatus::Pending, _) | (_, ReceiverStatus::Pending) => {
// No more messages to process, so flush any messages buffered in the transport. // No more messages to process, so flush any messages buffered in the transport.
ready!(self ready!(self.poll_flush(cx)?);
.transport_pin_mut()
.poll_flush(cx)
.map_err(ChannelError::Transport)?);
// Even if we fully-flush, we return Pending, because we have no more requests // Even if we fully-flush, we return Pending, because we have no more requests
// or cancellations right now. // or cancellations right now.
@@ -392,7 +415,7 @@ where
fn poll_next_request( fn poll_next_request(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<DispatchRequest<Req, Resp>, C::Error>>> { ) -> Poll<Option<Result<DispatchRequest<Req, Resp>, ChannelError<C::Error>>>> {
if self.in_flight_requests().len() >= self.config.max_in_flight_requests { if self.in_flight_requests().len() >= self.config.max_in_flight_requests {
tracing::info!( tracing::info!(
"At in-flight request capacity ({}/{}).", "At in-flight request capacity ({}/{}).",
@@ -430,7 +453,7 @@ where
fn poll_next_cancellation( fn poll_next_cancellation(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<(context::Context, Span, u64), C::Error>>> { ) -> Poll<Option<Result<(context::Context, Span, u64), ChannelError<C::Error>>>> {
ready!(self.ensure_writeable(cx)?); ready!(self.ensure_writeable(cx)?);
loop { loop {
@@ -452,9 +475,9 @@ where
fn ensure_writeable<'a>( fn ensure_writeable<'a>(
self: &'a mut Pin<&mut Self>, self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<(), C::Error>>> { ) -> Poll<Option<Result<(), ChannelError<C::Error>>>> {
while self.transport_pin_mut().poll_ready(cx)?.is_pending() { while self.poll_ready(cx)?.is_pending() {
ready!(self.transport_pin_mut().poll_flush(cx)?); ready!(self.poll_flush(cx)?);
} }
Poll::Ready(Some(Ok(()))) Poll::Ready(Some(Ok(())))
} }
@@ -462,7 +485,7 @@ where
fn poll_write_request<'a>( fn poll_write_request<'a>(
self: &'a mut Pin<&mut Self>, self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<(), C::Error>>> { ) -> Poll<Option<Result<(), ChannelError<C::Error>>>> {
let DispatchRequest { let DispatchRequest {
ctx, ctx,
span, span,
@@ -486,7 +509,7 @@ where
trace_context: ctx.trace_context, trace_context: ctx.trace_context,
}, },
}); });
self.transport_pin_mut().start_send(request)?; self.start_send(request)?;
let deadline = ctx.deadline; let deadline = ctx.deadline;
tracing::info!( tracing::info!(
tarpc.deadline = %humantime::format_rfc3339(deadline), tarpc.deadline = %humantime::format_rfc3339(deadline),
@@ -503,7 +526,7 @@ where
fn poll_write_cancel<'a>( fn poll_write_cancel<'a>(
self: &'a mut Pin<&mut Self>, self: &'a mut Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<(), C::Error>>> { ) -> Poll<Option<Result<(), ChannelError<C::Error>>>> {
let (context, span, request_id) = match ready!(self.as_mut().poll_next_cancellation(cx)?) { let (context, span, request_id) = match ready!(self.as_mut().poll_next_cancellation(cx)?) {
Some(triple) => triple, Some(triple) => triple,
None => return Poll::Ready(None), None => return Poll::Ready(None),
@@ -514,7 +537,7 @@ where
trace_context: context.trace_context, trace_context: context.trace_context,
request_id, request_id,
}; };
self.transport_pin_mut().start_send(cancel)?; self.start_send(cancel)?;
tracing::info!("CancelRequest"); tracing::info!("CancelRequest");
Poll::Ready(Some(Ok(()))) Poll::Ready(Some(Ok(())))
} }

View File

@@ -42,12 +42,10 @@ where
type Item = io::Result<Item>; type Item = io::Result<Item>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Item>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Item>>> {
self.project().inner.poll_next(cx).map_err(|e| { self.project()
io::Error::new( .inner
io::ErrorKind::Other, .poll_next(cx)
format!("while reading from transport: {}", e.into()), .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
)
})
} }
} }
@@ -63,39 +61,31 @@ where
type Error = io::Error; type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_ready(cx).map_err(|e| { self.project()
io::Error::new( .inner
io::ErrorKind::Other, .poll_ready(cx)
format!("while readying write half of transport: {}", e.into()), .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
)
})
} }
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> { fn start_send(self: Pin<&mut Self>, item: SinkItem) -> io::Result<()> {
self.project().inner.start_send(item).map_err(|e| { self.project()
io::Error::new( .inner
io::ErrorKind::Other, .start_send(item)
format!("while writing to transport: {}", e.into()), .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
)
})
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx).map_err(|e| { self.project()
io::Error::new( .inner
io::ErrorKind::Other, .poll_flush(cx)
format!("while flushing transport: {}", e.into()), .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
)
})
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx).map_err(|e| { self.project()
io::Error::new( .inner
io::ErrorKind::Other, .poll_close(cx)
format!("while closing write half of transport: {}", e.into()), .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
)
})
} }
} }