Don't send a deadline-exceeded response.

The deadline-exceeded response was largely redundant, because the client
shouldn't normally be waiting for such a response, anyway -- the normal
client will automatically remove the in-flight request when it reaches
the deadline.

This also allows for internalizing the expiration+cleanup logic entirely
within BaseChannel, without having it leak into the Channel trait and
requiring action taken by the Requests struct.
This commit is contained in:
Tim Kuehn
2021-03-07 23:45:32 -08:00
parent 72d5dbba89
commit 66419db6fd
4 changed files with 13 additions and 45 deletions

View File

@@ -6,7 +6,7 @@
//! Provides a server that concurrently handles many connections sending multiplexed requests.
use crate::{context, ClientMessage, PollIo, Request, Response, ServerError, Transport};
use crate::{context, ClientMessage, PollIo, Request, Response, Transport};
use futures::{
future::{AbortRegistration, Abortable},
prelude::*,
@@ -235,9 +235,6 @@ where
deadline: SystemTime,
) -> Result<AbortRegistration, in_flight_requests::AlreadyExistsError>;
/// Yields a request that has expired, aborting any ongoing processing of that request.
fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo<u64>;
/// Returns a stream of requests that automatically handle request cancellation and response
/// routing.
fn requests(self) -> Requests<Self>
@@ -277,8 +274,15 @@ where
type Item = io::Result<Request<Req>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let self_ = self.as_mut().project();
while let Poll::Ready(Some(request_id)) = self_.in_flight_requests.poll_expired(cx)? {
// No need to send a response, since the client wouldn't be waiting for one anymore.
debug!("Request {} did not complete before deadline", request_id);
}
loop {
match ready!(self.as_mut().project().transport.poll_next(cx)?) {
let self_ = self.as_mut().project();
match ready!(self_.transport.poll_next(cx)?) {
Some(message) => match message {
ClientMessage::Request(request) => {
return Poll::Ready(Some(Ok(request)));
@@ -287,13 +291,8 @@ where
trace_context,
request_id,
} => {
if self
.as_mut()
.project()
.in_flight_requests
.cancel_request(request_id)
{
let remaining = self.in_flight_requests.len();
if self_.in_flight_requests.cancel_request(request_id) {
let remaining = self_.in_flight_requests.len();
trace!(
"[{}] Request canceled. In-flight requests = {}",
trace_context.trace_id,
@@ -371,10 +370,6 @@ where
.in_flight_requests
.start_request(id, deadline)
}
fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo<u64> {
self.project().in_flight_requests.poll_expired(cx)
}
}
/// A stream of requests coming over a channel.
@@ -449,17 +444,6 @@ where
cx: &mut Context<'_>,
read_half_closed: bool,
) -> PollIo<()> {
if let Poll::Ready(Some(request_id)) = self.channel_pin_mut().poll_expired(cx)? {
debug!("Request {} did not complete before deadline", request_id);
self.channel_pin_mut().start_send(Response {
request_id,
message: Err(ServerError {
kind: io::ErrorKind::TimedOut,
detail: Some(String::from("Request did not complete before deadline.")),
}),
})?;
return Poll::Ready(Some(Ok(())));
}
match self.as_mut().poll_next_response(cx)? {
Poll::Ready(Some((context, response))) => {
trace!(

View File

@@ -7,7 +7,6 @@
use crate::{
server::{self, Channel},
util::Compact,
PollIo,
};
use fnv::FnvHashMap;
use futures::{future::AbortRegistration, prelude::*, ready, stream::Fuse, task::*};
@@ -120,10 +119,6 @@ where
) -> Result<AbortRegistration, super::in_flight_requests::AlreadyExistsError> {
self.inner_pin_mut().start_request(id, deadline)
}
fn poll_expired(mut self: Pin<&mut Self>, cx: &mut Context) -> PollIo<u64> {
self.inner_pin_mut().poll_expired(cx)
}
}
impl<C, K> TrackedChannel<C, K> {

View File

@@ -7,7 +7,7 @@
use crate::{
context,
server::{Channel, Config},
PollIo, Request, Response,
Request, Response,
};
use futures::{future::AbortRegistration, task::*, Sink, Stream};
use pin_project::pin_project;
@@ -88,10 +88,6 @@ where
.in_flight_requests
.start_request(id, deadline)
}
fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo<u64> {
self.project().in_flight_requests.poll_expired(cx)
}
}
impl<Req, Resp> FakeChannel<io::Result<Request<Req>>, Response<Resp>> {

View File

@@ -5,7 +5,7 @@
// https://opensource.org/licenses/MIT.
use super::{Channel, Config};
use crate::{PollIo, Response, ServerError};
use crate::{Response, ServerError};
use futures::{future::AbortRegistration, prelude::*, ready, task::*};
use log::debug;
use pin_project::pin_project;
@@ -128,10 +128,6 @@ where
) -> Result<AbortRegistration, super::in_flight_requests::AlreadyExistsError> {
self.project().inner.start_request(id, deadline)
}
fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo<u64> {
self.project().inner.poll_expired(cx)
}
}
/// A stream of throttling channels.
@@ -317,9 +313,6 @@ fn throttler_poll_next_throttled_sink_not_ready() {
) -> Result<AbortRegistration, super::in_flight_requests::AlreadyExistsError> {
unimplemented!()
}
fn poll_expired(self: Pin<&mut Self>, _cx: &mut Context) -> PollIo<u64> {
unimplemented!()
}
}
}