Fix bug where expired request wasn't propagating cancellation.

DispatchResponse was incorrectly marking itself as complete even when
expiring without receiving a response. This can cause a chain of
deleterious effects:

- Request cancellation won't propagate when request timers expire.
- Which causes client dispatch to have an inconsistent in-flight request
  map containing stale IDs.
- Which can cause clients to hang rather than exiting.
This commit is contained in:
Tim Kuehn
2019-05-22 01:27:32 -07:00
parent 950ad5187c
commit fe164ca368
2 changed files with 57 additions and 6 deletions

View File

@@ -31,3 +31,4 @@ serde = { optional = true, version = "1.0" }
futures-test-preview = { version = "0.3.0-alpha.16" }
env_logger = "0.6"
tokio = "0.1"
tokio-executor = "0.1"

View File

@@ -185,10 +185,11 @@ impl<Resp> Future for DispatchResponse<Resp> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Resp>> {
let resp = ready!(self.response.poll_unpin(cx));
self.complete = true;
Poll::Ready(match resp {
Ok(resp) => Ok(resp.message?),
Ok(resp) => {
self.complete = true;
Ok(resp.message?)
}
Err(e) => Err({
let trace_id = *self.as_mut().ctx().trace_id();
let server_addr = *self.as_mut().server_addr();
@@ -216,7 +217,10 @@ impl<Resp> Future for DispatchResponse<Resp> {
)
}
} else if e.is_inner() {
// The oneshot is Canceled when the dispatch task ends.
// The oneshot is Canceled when the dispatch task ends. In that case,
// there's nothing listening on the other side, so there's no point in
// propagating cancellation.
self.complete = true;
io::Error::from(io::ErrorKind::ConnectionReset)
} else {
panic!(
@@ -549,6 +553,12 @@ where
);
return Poll::Ready(Ok(()));
}
let addr = *self.as_mut().server_addr();
info!(
"[{}] {} requests in flight.",
addr,
self.as_mut().in_flight_requests().len()
);
match read {
Poll::Ready(Some(())) => continue,
_ => {
@@ -819,25 +829,65 @@ where
#[cfg(test)]
mod tests {
use super::{
CanceledRequests, Channel, DispatchResponse, RequestCancellation, RequestDispatch,
cancellations, CanceledRequests, Channel, DispatchResponse, RequestCancellation,
RequestDispatch,
};
use crate::{
client::Config,
context,
transport::{self, channel::UnboundedChannel},
util::deadline_compat,
ClientMessage, Response,
};
use fnv::FnvHashMap;
use futures::{channel::mpsc, prelude::*, task::Context, Poll};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
task::Context,
Poll,
};
use futures_test::task::noop_waker_ref;
use std::time::Duration;
use std::{
marker,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
sync::atomic::AtomicU64,
sync::Arc,
time::Instant,
};
#[test]
fn dispatch_response_cancels_on_timeout() {
let past_deadline = Instant::now() - Duration::from_secs(1);
let (_response_completion, response) = oneshot::channel();
let (cancellation, mut canceled_requests) = cancellations();
let resp = DispatchResponse::<u64> {
// Deadline in the past should cause resp to error out when polled.
response: deadline_compat::Deadline::new(response, past_deadline),
complete: false,
request_id: 3,
cancellation,
ctx: context::current(),
server_addr: SocketAddr::from(([0, 0, 0, 0], 9999)),
};
{
pin_utils::pin_mut!(resp);
let timer = tokio_timer::Timer::default();
tokio_timer::with_default(
&timer.handle(),
&mut tokio_executor::enter().unwrap(),
|_| {
let _ = resp
.as_mut()
.poll(&mut Context::from_waker(&noop_waker_ref()));
},
);
// End of block should cause resp.drop() to run, which should send a cancel message.
}
assert!(canceled_requests.0.try_next().unwrap() == Some(3));
}
#[test]
fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up();