From fe164ca3682062c5bc4a6abac4edd0d89820a8dc Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 22 May 2019 01:27:32 -0700 Subject: [PATCH] Fix bug where expired request wasn't propagating cancellation. DispatchResponse was incorrectly marking itself as complete even when expiring without receiving a response. This can cause a chain of deleterious effects: - Request cancellation won't propagate when request timers expire. - Which causes client dispatch to have an inconsistent in-flight request map containing stale IDs. - Which can cause clients to hang rather than exiting. --- rpc/Cargo.toml | 1 + rpc/src/client/channel.rs | 62 +++++++++++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 520b6db..e2cb909 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -31,3 +31,4 @@ serde = { optional = true, version = "1.0" } futures-test-preview = { version = "0.3.0-alpha.16" } env_logger = "0.6" tokio = "0.1" +tokio-executor = "0.1" diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 0dbc572..3577e31 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -185,10 +185,11 @@ impl Future for DispatchResponse { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let resp = ready!(self.response.poll_unpin(cx)); - self.complete = true; - Poll::Ready(match resp { - Ok(resp) => Ok(resp.message?), + Ok(resp) => { + self.complete = true; + Ok(resp.message?) + } Err(e) => Err({ let trace_id = *self.as_mut().ctx().trace_id(); let server_addr = *self.as_mut().server_addr(); @@ -216,7 +217,10 @@ impl Future for DispatchResponse { ) } } else if e.is_inner() { - // The oneshot is Canceled when the dispatch task ends. + // The oneshot is Canceled when the dispatch task ends. In that case, + // there's nothing listening on the other side, so there's no point in + // propagating cancellation. + self.complete = true; io::Error::from(io::ErrorKind::ConnectionReset) } else { panic!( @@ -549,6 +553,12 @@ where ); return Poll::Ready(Ok(())); } + let addr = *self.as_mut().server_addr(); + info!( + "[{}] {} requests in flight.", + addr, + self.as_mut().in_flight_requests().len() + ); match read { Poll::Ready(Some(())) => continue, _ => { @@ -819,25 +829,65 @@ where #[cfg(test)] mod tests { use super::{ - CanceledRequests, Channel, DispatchResponse, RequestCancellation, RequestDispatch, + cancellations, CanceledRequests, Channel, DispatchResponse, RequestCancellation, + RequestDispatch, }; use crate::{ client::Config, context, transport::{self, channel::UnboundedChannel}, + util::deadline_compat, ClientMessage, Response, }; use fnv::FnvHashMap; - use futures::{channel::mpsc, prelude::*, task::Context, Poll}; + use futures::{ + channel::{mpsc, oneshot}, + prelude::*, + task::Context, + Poll, + }; use futures_test::task::noop_waker_ref; + use std::time::Duration; use std::{ marker, net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, sync::atomic::AtomicU64, sync::Arc, + time::Instant, }; + #[test] + fn dispatch_response_cancels_on_timeout() { + let past_deadline = Instant::now() - Duration::from_secs(1); + let (_response_completion, response) = oneshot::channel(); + let (cancellation, mut canceled_requests) = cancellations(); + let resp = DispatchResponse:: { + // Deadline in the past should cause resp to error out when polled. + response: deadline_compat::Deadline::new(response, past_deadline), + complete: false, + request_id: 3, + cancellation, + ctx: context::current(), + server_addr: SocketAddr::from(([0, 0, 0, 0], 9999)), + }; + { + pin_utils::pin_mut!(resp); + let timer = tokio_timer::Timer::default(); + tokio_timer::with_default( + &timer.handle(), + &mut tokio_executor::enter().unwrap(), + |_| { + let _ = resp + .as_mut() + .poll(&mut Context::from_waker(&noop_waker_ref())); + }, + ); + // End of block should cause resp.drop() to run, which should send a cancel message. + } + assert!(canceled_requests.0.try_next().unwrap() == Some(3)); + } + #[test] fn stage_request() { let (mut dispatch, mut channel, _server_channel) = set_up();