Add some error context to client dispatch.

I'm taking this opportunity to experiment with anyhow. So far, results are promising. It was a bit hard to use with Poll<Option<Result<T, E>>> types, so I added a crate-internal helper trait for that.
This commit is contained in:
Tim Kuehn
2020-07-29 12:02:36 -07:00
parent ebd245a93d
commit e6597fab03
5 changed files with 46 additions and 12 deletions

View File

@@ -26,6 +26,7 @@ full = ["serde1", "tokio1", "serde-transport", "tcp"]
travis-ci = { repository = "google/tarpc" }
[dependencies]
anyhow = "1.0"
fnv = "1.0"
futures = "0.3"
humantime = "1.0"

View File

@@ -8,7 +8,7 @@ use crate::{
context,
trace::SpanId,
util::{Compact, TimeUntil},
ClientMessage, PollIo, Request, Response, Transport,
ClientMessage, PollContext, PollIo, Request, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
@@ -440,11 +440,18 @@ impl<Req, Resp, C> Future for RequestDispatch<Req, Resp, C>
where
C: Transport<ClientMessage<Req>, Response<Resp>>,
{
type Output = io::Result<()>;
type Output = anyhow::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<anyhow::Result<()>> {
loop {
match (self.as_mut().pump_read(cx)?, self.as_mut().pump_write(cx)?) {
match (
self.as_mut()
.pump_read(cx)
.context("failed to read from transport")?,
self.as_mut()
.pump_write(cx)
.context("failed to write to transport")?,
) {
(read, Poll::Ready(None)) => {
if self.as_mut().project().in_flight_requests.is_empty() {
info!("Shutdown: write half closed, and no requests in flight.");

View File

@@ -135,9 +135,10 @@ pub struct NewClient<C, D> {
pub dispatch: D,
}
impl<C, D> NewClient<C, D>
impl<C, D, E> NewClient<C, D>
where
D: Future<Output = io::Result<()>> + Send + 'static,
D: Future<Output = Result<(), E>> + Send + 'static,
E: std::fmt::Display,
{
/// Helper method to spawn the dispatch on the default executor.
#[cfg(feature = "tokio1")]

View File

@@ -32,8 +32,9 @@ pub(crate) mod util;
pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport};
use anyhow::Context as _;
use futures::task::*;
use std::{io, time::SystemTime};
use std::{fmt::Display, io, time::SystemTime};
/// A message from a client to a server.
#[derive(Debug)]
@@ -118,3 +119,30 @@ impl<T> Request<T> {
}
pub(crate) type PollIo<T> = Poll<Option<io::Result<T>>>;
pub(crate) trait PollContext<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static;
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C;
}
impl<T> PollContext<T> for PollIo<T> {
fn context<C>(self, context: C) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
{
self.map(|o| o.map(|r| r.context(context)))
}
fn with_context<C, F>(self, f: F) -> Poll<Option<anyhow::Result<T>>>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C,
{
self.map(|o| o.map(|r| r.with_context(f)))
}
}

View File

@@ -20,7 +20,7 @@ use futures::{
task::*,
};
use humantime::format_rfc3339;
use log::{debug, trace};
use log::{debug, info, trace};
use pin_project::pin_project;
use std::{fmt, hash::Hash, io, marker::PhantomData, pin::Pin, time::SystemTime};
use tokio::time::Timeout;
@@ -659,12 +659,11 @@ where
#[cfg(feature = "tokio1")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))]
pub fn execute(self) -> impl Future<Output = ()> {
use log::info;
self.try_for_each(|request_handler| async {
tokio::spawn(request_handler);
Ok(())
})
.map_ok(|()| info!("ClientHandler finished."))
.unwrap_or_else(|e| info!("ClientHandler errored out: {}", e))
}
}
@@ -695,8 +694,6 @@ where
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
use log::info;
while let Some(channel) = ready!(self.as_mut().project().incoming.poll_next(cx)) {
tokio::spawn(
channel