Centralize client-side request deadline handling.

Before this commit, each request future had its own timeout and would
communicate to the client Channel when a request was no longer being
listened to. Now, instead, the Channel tracks deadlines of in-flight
requests and completes requests with deadline-exceeded errors when they
expire.

This should be functionally equivalent to the previous way. It just cuts
down on the amount of two-way processing required. Unfortunately,
dropping a response future early still requires the client to send a
cancellation message to the Channel.
This commit is contained in:
Tim Kuehn
2021-03-07 15:31:17 -08:00
parent 3c978c5bf6
commit ce4fd49161
4 changed files with 214 additions and 88 deletions

View File

@@ -13,6 +13,7 @@ use std::io;
/// Provides a [`Client`] backed by a transport.
pub mod channel;
mod in_flight_requests;
pub use channel::{new, Channel};
/// Sends multiplexed requests to, and receives responses from, a server.

View File

@@ -5,12 +5,9 @@
// https://opensource.org/licenses/MIT.
use crate::{
context,
trace::SpanId,
util::{Compact, TimeUntil},
ClientMessage, PollContext, PollIo, Request, Response, Transport,
client::in_flight_requests::InFlightRequests, context, trace::SpanId, ClientMessage,
PollContext, PollIo, Request, Response, Transport,
};
use fnv::FnvHashMap;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
@@ -18,7 +15,7 @@ use futures::{
stream::Fuse,
task::*,
};
use log::{debug, info, trace};
use log::{info, trace};
use pin_project::{pin_project, pinned_drop};
use std::{
convert::TryFrom,
@@ -88,7 +85,7 @@ impl<'a, Req, Resp> Future for Send<'a, Req, Resp> {
#[must_use = "futures do nothing unless polled"]
pub struct Call<'a, Req, Resp> {
#[pin]
fut: tokio::time::Timeout<AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>>,
fut: AndThenIdent<Send<'a, Req, Resp>, DispatchResponse<Resp>>,
}
impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
@@ -96,13 +93,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let resp = ready!(self.as_mut().project().fut.poll(cx));
Poll::Ready(match resp {
Ok(resp) => resp,
Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client dropped expired request.".to_string(),
)),
})
Poll::Ready(resp)
}
}
@@ -140,15 +131,8 @@ impl<Req, Resp> Channel<Req, Resp> {
/// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that
/// resolves to the response.
pub fn call(&mut self, ctx: context::Context, request: Req) -> Call<Req, Resp> {
let timeout = ctx.deadline.time_until();
trace!(
"[{}] Queuing request with timeout {:?}.",
ctx.trace_id(),
timeout,
);
Call {
fut: tokio::time::timeout(timeout, AndThenIdent::new(self.send(ctx, request))),
fut: AndThenIdent::new(self.send(ctx, request)),
}
}
}
@@ -228,7 +212,7 @@ where
config,
canceled_requests,
transport: transport.fuse(),
in_flight_requests: FnvHashMap::default(),
in_flight_requests: InFlightRequests::default(),
pending_requests: pending_requests.fuse(),
},
}
@@ -249,7 +233,7 @@ pub struct RequestDispatch<Req, Resp, C> {
#[pin]
canceled_requests: Fuse<CanceledRequests>,
/// Requests already written to the wire that haven't yet received responses.
in_flight_requests: FnvHashMap<u64, InFlightData<Resp>>,
in_flight_requests: InFlightRequests<Resp>,
/// Configures limits to prevent unlimited resource usage.
config: Config,
}
@@ -258,6 +242,10 @@ impl<Req, Resp, C> RequestDispatch<Req, Resp, C>
where
C: Transport<ClientMessage<Req>, Response<Resp>>,
{
fn in_flight_requests<'a>(self: &'a mut Pin<&mut Self>) -> &'a mut InFlightRequests<Resp> {
self.as_mut().project().in_flight_requests
}
fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
Poll::Ready(
match ready!(self.as_mut().project().transport.poll_next(cx)?) {
@@ -294,6 +282,16 @@ where
Poll::Pending => ReceiverStatus::NotReady,
};
// Receiving Poll::Ready(None) when polling expired requests never indicates "Closed",
// because there can temporarily be zero in-flight rquests. Therefore, there is no need to
// track the status like is done with pending and cancelled requests.
if let Poll::Ready(Some(_)) = self.in_flight_requests().poll_expired(cx)? {
// Expired requests are considered complete; there is no compelling reason to send a
// cancellation message to the server, since it will have already exhausted its
// allotted processing time.
return Poll::Ready(Some(Ok(())));
}
match (pending_requests_status, canceled_requests_status) {
(ReceiverStatus::Closed, ReceiverStatus::Closed) => {
ready!(self.as_mut().project().transport.poll_flush(cx)?);
@@ -315,10 +313,10 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<DispatchRequest<Req, Resp>> {
if self.as_mut().project().in_flight_requests.len() >= self.config.max_in_flight_requests {
if self.in_flight_requests().len() >= self.config.max_in_flight_requests {
info!(
"At in-flight request capacity ({}/{}).",
self.as_mut().project().in_flight_requests.len(),
self.in_flight_requests().len(),
self.config.max_in_flight_requests
);
@@ -379,15 +377,8 @@ where
.poll_next_unpin(cx);
match ready!(cancellation) {
Some(request_id) => {
if let Some(in_flight_data) = self
.as_mut()
.project()
.in_flight_requests
.remove(&request_id)
{
self.as_mut().project().in_flight_requests.compact(0.1);
debug!("[{}] Removed request.", in_flight_data.ctx.trace_id());
return Poll::Ready(Some(Ok((in_flight_data.ctx, request_id))));
if let Some(ctx) = self.in_flight_requests().cancel_request(request_id) {
return Poll::Ready(Some(Ok((ctx, request_id))));
}
}
None => return Poll::Ready(None),
@@ -409,13 +400,13 @@ where
},
});
self.as_mut().project().transport.start_send(request)?;
self.as_mut().project().in_flight_requests.insert(
request_id,
InFlightData {
ctx: dispatch_request.ctx,
response_completion: dispatch_request.response_completion,
},
);
self.in_flight_requests()
.insert_request(
request_id,
dispatch_request.ctx,
dispatch_request.response_completion,
)
.expect("Request IDs should be unique");
Ok(())
}
@@ -436,26 +427,7 @@ where
/// Sends a server response to the client task that initiated the associated request.
fn complete(mut self: Pin<&mut Self>, response: Response<Resp>) -> bool {
if let Some(in_flight_data) = self
.as_mut()
.project()
.in_flight_requests
.remove(&response.request_id)
{
self.as_mut().project().in_flight_requests.compact(0.1);
trace!("[{}] Received response.", in_flight_data.ctx.trace_id());
let _ = in_flight_data.response_completion.send(response);
return true;
}
debug!(
"No in-flight request found for request_id = {}.",
response.request_id
);
// If the response completion was absent, then the request was already canceled.
false
self.in_flight_requests().complete_request(response)
}
}
@@ -480,13 +452,13 @@ where
return Poll::Ready(Ok(()));
}
(read, Poll::Ready(None)) => {
if self.as_mut().project().in_flight_requests.is_empty() {
if self.in_flight_requests().is_empty() {
info!("Shutdown: write half closed, and no requests in flight.");
return Poll::Ready(Ok(()));
}
info!(
"Shutdown: write half closed, and {} requests in flight.",
self.as_mut().project().in_flight_requests.len()
self.in_flight_requests().len()
);
match read {
Poll::Ready(Some(())) => continue,
@@ -504,16 +476,10 @@ where
/// the lifecycle of the request.
#[derive(Debug)]
struct DispatchRequest<Req, Resp> {
ctx: context::Context,
request_id: u64,
request: Req,
response_completion: oneshot::Sender<Response<Resp>>,
}
#[derive(Debug)]
struct InFlightData<Resp> {
ctx: context::Context,
response_completion: oneshot::Sender<Response<Resp>>,
pub ctx: context::Context,
pub request_id: u64,
pub request: Req,
pub response_completion: oneshot::Sender<Response<Resp>>,
}
/// Sends request cancellation signals.
@@ -733,12 +699,11 @@ mod tests {
RequestDispatch,
};
use crate::{
client::Config,
client::{in_flight_requests::InFlightRequests, Config},
context,
transport::{self, channel::UnboundedChannel},
ClientMessage, Response,
};
use fnv::FnvHashMap;
use futures::{
channel::{mpsc, oneshot},
prelude::*,
@@ -823,7 +788,7 @@ mod tests {
let req = send_request(&mut channel, "hi").await;
assert!(dispatch.as_mut().pump_write(cx).ready().is_some());
assert!(!dispatch.as_mut().project().in_flight_requests.is_empty());
assert!(!dispatch.in_flight_requests().is_empty());
// Test that a request future dropped after it's processed by dispatch will cause the request
// to be removed from the in-flight request map.
@@ -833,7 +798,7 @@ mod tests {
} else {
panic!("Expected request to be cancelled")
};
assert!(dispatch.project().in_flight_requests.is_empty());
assert!(dispatch.in_flight_requests().is_empty());
}
#[tokio::test]
@@ -866,7 +831,7 @@ mod tests {
transport: client_channel.fuse(),
pending_requests: pending_requests.fuse(),
canceled_requests: CanceledRequests(canceled_requests).fuse(),
in_flight_requests: FnvHashMap::default(),
in_flight_requests: InFlightRequests::default(),
config: Config::default(),
};

View File

@@ -0,0 +1,161 @@
use crate::{
context,
util::{Compact, TimeUntil},
PollIo, Response, ServerError,
};
use fnv::FnvHashMap;
use futures::{channel::oneshot, ready};
use log::{debug, trace};
use std::{
collections::hash_map,
io,
task::{Context, Poll},
};
use tokio_util::time::delay_queue::{self, DelayQueue};
/// Requests already written to the wire that haven't yet received responses.
#[derive(Debug)]
pub struct InFlightRequests<Resp> {
request_data: FnvHashMap<u64, RequestData<Resp>>,
deadlines: DelayQueue<u64>,
}
impl<Resp> Default for InFlightRequests<Resp> {
fn default() -> Self {
Self {
request_data: Default::default(),
deadlines: Default::default(),
}
}
}
#[derive(Debug)]
struct RequestData<Resp> {
ctx: context::Context,
response_completion: oneshot::Sender<Response<Resp>>,
/// The key to remove the timer for the request's deadline.
deadline_key: delay_queue::Key,
}
/// An error returned when an attempt is made to insert a request with an ID that is already in
/// use.
#[derive(Debug)]
pub struct AlreadyExistsError;
impl<Resp> InFlightRequests<Resp> {
/// Returns the number of in-flight requests.
pub fn len(&self) -> usize {
self.request_data.len()
}
/// Returns true iff there are no requests in flight.
pub fn is_empty(&self) -> bool {
self.request_data.is_empty()
}
/// Starts a request, unless a request with the same ID is already in flight.
pub fn insert_request(
&mut self,
request_id: u64,
ctx: context::Context,
response_completion: oneshot::Sender<Response<Resp>>,
) -> Result<(), AlreadyExistsError> {
match self.request_data.entry(request_id) {
hash_map::Entry::Vacant(vacant) => {
let timeout = ctx.deadline.time_until();
trace!(
"[{}] Queuing request with timeout {:?}.",
ctx.trace_id(),
timeout,
);
let deadline_key = self.deadlines.insert(request_id, timeout);
vacant.insert(RequestData {
ctx,
response_completion,
deadline_key,
});
Ok(())
}
hash_map::Entry::Occupied(_) => Err(AlreadyExistsError),
}
}
/// Removes a request without aborting. Returns true iff the request was found.
pub fn complete_request(&mut self, response: Response<Resp>) -> bool {
if let Some(request_data) = self.request_data.remove(&response.request_id) {
self.request_data.compact(0.1);
trace!("[{}] Received response.", request_data.ctx.trace_id());
self.deadlines.remove(&request_data.deadline_key);
request_data.complete(response);
return true;
}
debug!(
"No in-flight request found for request_id = {}.",
response.request_id
);
// If the response completion was absent, then the request was already canceled.
false
}
/// Cancels a request without completing (typically used when a request handle was dropped
/// before the request completed).
pub fn cancel_request(&mut self, request_id: u64) -> Option<context::Context> {
if let Some(request_data) = self.request_data.remove(&request_id) {
self.request_data.compact(0.1);
trace!("[{}] Cancelling request.", request_data.ctx.trace_id());
self.deadlines.remove(&request_data.deadline_key);
Some(request_data.ctx)
} else {
None
}
}
/// Yields a request that has expired, completing it with a TimedOut error.
/// The caller should send cancellation messages for any yielded request ID.
pub fn poll_expired(&mut self, cx: &mut Context) -> PollIo<u64> {
Poll::Ready(match ready!(self.deadlines.poll_expired(cx)) {
Some(Ok(expired)) => {
let request_id = expired.into_inner();
if let Some(request_data) = self.request_data.remove(&request_id) {
self.request_data.compact(0.1);
request_data.complete(Self::deadline_exceeded_error(request_id));
}
Some(Ok(request_id))
}
Some(Err(e)) => Some(Err(io::Error::new(io::ErrorKind::Other, e))),
None => None,
})
}
fn deadline_exceeded_error(request_id: u64) -> Response<Resp> {
Response {
request_id,
message: Err(ServerError {
kind: io::ErrorKind::TimedOut,
detail: Some("Client dropped expired request.".to_string()),
}),
}
}
}
/// When InFlightRequests is dropped, any outstanding requests are completed with a
/// deadline-exceeded error.
impl<Resp> Drop for InFlightRequests<Resp> {
fn drop(&mut self) {
let deadlines = &mut self.deadlines;
for (_, request_data) in self.request_data.drain() {
let expired = deadlines.remove(&request_data.deadline_key);
request_data.complete(Self::deadline_exceeded_error(expired.into_inner()));
}
}
}
impl<Resp> RequestData<Resp> {
fn complete(self, response: Response<Resp>) {
let _ = self.response_completion.send(response);
}
}

View File

@@ -38,6 +38,7 @@ struct RequestData {
pub struct AlreadyExistsError;
impl InFlightRequests {
/// Returns the number of in-flight requests.
pub fn len(&self) -> usize {
self.request_data.len()
}
@@ -48,21 +49,18 @@ impl InFlightRequests {
request_id: u64,
deadline: SystemTime,
) -> Result<AbortRegistration, AlreadyExistsError> {
let timeout = deadline.time_until();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let deadline_key = self.deadlines.insert(request_id, timeout);
match self.request_data.entry(request_id) {
hash_map::Entry::Vacant(vacant) => {
let timeout = deadline.time_until();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let deadline_key = self.deadlines.insert(request_id, timeout);
vacant.insert(RequestData {
abort_handle,
deadline_key,
});
Ok(abort_registration)
}
hash_map::Entry::Occupied(_) => {
self.deadlines.remove(&deadline_key);
Err(AlreadyExistsError)
}
hash_map::Entry::Occupied(_) => Err(AlreadyExistsError),
}
}
@@ -81,6 +79,7 @@ impl InFlightRequests {
}
/// Removes a request without aborting. Returns true iff the request was found.
/// This method should be used when a response is being sent.
pub fn remove_request(&mut self, request_id: u64) -> bool {
if let Some(request_data) = self.request_data.remove(&request_id) {
self.request_data.compact(0.1);
@@ -109,9 +108,9 @@ impl InFlightRequests {
}
}
/// When InFlightRequests is dropped, any requests still in flight are aborted.
/// When InFlightRequests is dropped, any outstanding requests are aborted.
impl Drop for InFlightRequests {
fn drop(self: &mut Self) {
fn drop(&mut self) {
self.request_data
.values()
.for_each(|request_data| request_data.abort_handle.abort())