diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 370885e..3d65976 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -26,6 +26,7 @@ full = ["serde1", "tokio1", "serde-transport", "tcp"] travis-ci = { repository = "google/tarpc" } [dependencies] +anyhow = "1.0" fnv = "1.0" futures = "0.3" humantime = "1.0" diff --git a/tarpc/src/rpc/client/channel.rs b/tarpc/src/rpc/client/channel.rs index 916eb49..ec81195 100644 --- a/tarpc/src/rpc/client/channel.rs +++ b/tarpc/src/rpc/client/channel.rs @@ -8,7 +8,7 @@ use crate::{ context, trace::SpanId, util::{Compact, TimeUntil}, - ClientMessage, PollIo, Request, Response, Transport, + ClientMessage, PollContext, PollIo, Request, Response, Transport, }; use fnv::FnvHashMap; use futures::{ @@ -440,11 +440,18 @@ impl Future for RequestDispatch where C: Transport, Response>, { - type Output = io::Result<()>; + type Output = anyhow::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match (self.as_mut().pump_read(cx)?, self.as_mut().pump_write(cx)?) { + match ( + self.as_mut() + .pump_read(cx) + .context("failed to read from transport")?, + self.as_mut() + .pump_write(cx) + .context("failed to write to transport")?, + ) { (read, Poll::Ready(None)) => { if self.as_mut().project().in_flight_requests.is_empty() { info!("Shutdown: write half closed, and no requests in flight."); diff --git a/tarpc/src/rpc/client/mod.rs b/tarpc/src/rpc/client/mod.rs index 47dd2d3..4c12737 100644 --- a/tarpc/src/rpc/client/mod.rs +++ b/tarpc/src/rpc/client/mod.rs @@ -135,9 +135,10 @@ pub struct NewClient { pub dispatch: D, } -impl NewClient +impl NewClient where - D: Future> + Send + 'static, + D: Future> + Send + 'static, + E: std::fmt::Display, { /// Helper method to spawn the dispatch on the default executor. #[cfg(feature = "tokio1")] diff --git a/tarpc/src/rpc/mod.rs b/tarpc/src/rpc/mod.rs index 2690157..b48b7a5 100644 --- a/tarpc/src/rpc/mod.rs +++ b/tarpc/src/rpc/mod.rs @@ -32,8 +32,9 @@ pub(crate) mod util; pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport}; +use anyhow::Context as _; use futures::task::*; -use std::{io, time::SystemTime}; +use std::{fmt::Display, io, time::SystemTime}; /// A message from a client to a server. #[derive(Debug)] @@ -118,3 +119,30 @@ impl Request { } pub(crate) type PollIo = Poll>>; +pub(crate) trait PollContext { + fn context(self, context: C) -> Poll>> + where + C: Display + Send + Sync + 'static; + + fn with_context(self, f: F) -> Poll>> + where + C: Display + Send + Sync + 'static, + F: FnOnce() -> C; +} + +impl PollContext for PollIo { + fn context(self, context: C) -> Poll>> + where + C: Display + Send + Sync + 'static, + { + self.map(|o| o.map(|r| r.context(context))) + } + + fn with_context(self, f: F) -> Poll>> + where + C: Display + Send + Sync + 'static, + F: FnOnce() -> C, + { + self.map(|o| o.map(|r| r.with_context(f))) + } +} diff --git a/tarpc/src/rpc/server/mod.rs b/tarpc/src/rpc/server/mod.rs index f1df204..334114f 100644 --- a/tarpc/src/rpc/server/mod.rs +++ b/tarpc/src/rpc/server/mod.rs @@ -20,7 +20,7 @@ use futures::{ task::*, }; use humantime::format_rfc3339; -use log::{debug, trace}; +use log::{debug, info, trace}; use pin_project::pin_project; use std::{fmt, hash::Hash, io, marker::PhantomData, pin::Pin, time::SystemTime}; use tokio::time::Timeout; @@ -659,12 +659,11 @@ where #[cfg(feature = "tokio1")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio1")))] pub fn execute(self) -> impl Future { - use log::info; - self.try_for_each(|request_handler| async { tokio::spawn(request_handler); Ok(()) }) + .map_ok(|()| info!("ClientHandler finished.")) .unwrap_or_else(|e| info!("ClientHandler errored out: {}", e)) } } @@ -695,8 +694,6 @@ where type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - use log::info; - while let Some(channel) = ready!(self.as_mut().project().incoming.poll_next(cx)) { tokio::spawn( channel