From 66419db6fda5a1c7b0fc1b909b4c9c30484dc149 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 7 Mar 2021 23:45:32 -0800 Subject: [PATCH] Don't send a deadline-exceeded response. The deadline-exceeded response was largely redundant, because the client shouldn't normally be waiting for such a response, anyway -- the normal client will automatically remove the in-flight request when it reaches the deadline. This also allows for internalizing the expiration+cleanup logic entirely within BaseChannel, without having it leak into the Channel trait and requiring action taken by the Requests struct. --- tarpc/src/server.rs | 38 +++++++++++------------------------- tarpc/src/server/filter.rs | 5 ----- tarpc/src/server/testing.rs | 6 +----- tarpc/src/server/throttle.rs | 9 +-------- 4 files changed, 13 insertions(+), 45 deletions(-) diff --git a/tarpc/src/server.rs b/tarpc/src/server.rs index abfc39e..9c5a75f 100644 --- a/tarpc/src/server.rs +++ b/tarpc/src/server.rs @@ -6,7 +6,7 @@ //! Provides a server that concurrently handles many connections sending multiplexed requests. -use crate::{context, ClientMessage, PollIo, Request, Response, ServerError, Transport}; +use crate::{context, ClientMessage, PollIo, Request, Response, Transport}; use futures::{ future::{AbortRegistration, Abortable}, prelude::*, @@ -235,9 +235,6 @@ where deadline: SystemTime, ) -> Result; - /// Yields a request that has expired, aborting any ongoing processing of that request. - fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo; - /// Returns a stream of requests that automatically handle request cancellation and response /// routing. fn requests(self) -> Requests @@ -277,8 +274,15 @@ where type Item = io::Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let self_ = self.as_mut().project(); + while let Poll::Ready(Some(request_id)) = self_.in_flight_requests.poll_expired(cx)? { + // No need to send a response, since the client wouldn't be waiting for one anymore. + debug!("Request {} did not complete before deadline", request_id); + } + loop { - match ready!(self.as_mut().project().transport.poll_next(cx)?) { + let self_ = self.as_mut().project(); + match ready!(self_.transport.poll_next(cx)?) { Some(message) => match message { ClientMessage::Request(request) => { return Poll::Ready(Some(Ok(request))); @@ -287,13 +291,8 @@ where trace_context, request_id, } => { - if self - .as_mut() - .project() - .in_flight_requests - .cancel_request(request_id) - { - let remaining = self.in_flight_requests.len(); + if self_.in_flight_requests.cancel_request(request_id) { + let remaining = self_.in_flight_requests.len(); trace!( "[{}] Request canceled. In-flight requests = {}", trace_context.trace_id, @@ -371,10 +370,6 @@ where .in_flight_requests .start_request(id, deadline) } - - fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo { - self.project().in_flight_requests.poll_expired(cx) - } } /// A stream of requests coming over a channel. @@ -449,17 +444,6 @@ where cx: &mut Context<'_>, read_half_closed: bool, ) -> PollIo<()> { - if let Poll::Ready(Some(request_id)) = self.channel_pin_mut().poll_expired(cx)? { - debug!("Request {} did not complete before deadline", request_id); - self.channel_pin_mut().start_send(Response { - request_id, - message: Err(ServerError { - kind: io::ErrorKind::TimedOut, - detail: Some(String::from("Request did not complete before deadline.")), - }), - })?; - return Poll::Ready(Some(Ok(()))); - } match self.as_mut().poll_next_response(cx)? { Poll::Ready(Some((context, response))) => { trace!( diff --git a/tarpc/src/server/filter.rs b/tarpc/src/server/filter.rs index 0bee5ec..18935e2 100644 --- a/tarpc/src/server/filter.rs +++ b/tarpc/src/server/filter.rs @@ -7,7 +7,6 @@ use crate::{ server::{self, Channel}, util::Compact, - PollIo, }; use fnv::FnvHashMap; use futures::{future::AbortRegistration, prelude::*, ready, stream::Fuse, task::*}; @@ -120,10 +119,6 @@ where ) -> Result { self.inner_pin_mut().start_request(id, deadline) } - - fn poll_expired(mut self: Pin<&mut Self>, cx: &mut Context) -> PollIo { - self.inner_pin_mut().poll_expired(cx) - } } impl TrackedChannel { diff --git a/tarpc/src/server/testing.rs b/tarpc/src/server/testing.rs index 9b0aca4..97ba49f 100644 --- a/tarpc/src/server/testing.rs +++ b/tarpc/src/server/testing.rs @@ -7,7 +7,7 @@ use crate::{ context, server::{Channel, Config}, - PollIo, Request, Response, + Request, Response, }; use futures::{future::AbortRegistration, task::*, Sink, Stream}; use pin_project::pin_project; @@ -88,10 +88,6 @@ where .in_flight_requests .start_request(id, deadline) } - - fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo { - self.project().in_flight_requests.poll_expired(cx) - } } impl FakeChannel>, Response> { diff --git a/tarpc/src/server/throttle.rs b/tarpc/src/server/throttle.rs index f640c0e..05dec4c 100644 --- a/tarpc/src/server/throttle.rs +++ b/tarpc/src/server/throttle.rs @@ -5,7 +5,7 @@ // https://opensource.org/licenses/MIT. use super::{Channel, Config}; -use crate::{PollIo, Response, ServerError}; +use crate::{Response, ServerError}; use futures::{future::AbortRegistration, prelude::*, ready, task::*}; use log::debug; use pin_project::pin_project; @@ -128,10 +128,6 @@ where ) -> Result { self.project().inner.start_request(id, deadline) } - - fn poll_expired(self: Pin<&mut Self>, cx: &mut Context) -> PollIo { - self.project().inner.poll_expired(cx) - } } /// A stream of throttling channels. @@ -317,9 +313,6 @@ fn throttler_poll_next_throttled_sink_not_ready() { ) -> Result { unimplemented!() } - fn poll_expired(self: Pin<&mut Self>, _cx: &mut Context) -> PollIo { - unimplemented!() - } } }