diff --git a/tarpc/src/server.rs b/tarpc/src/server.rs index acb0cf1..1181eed 100644 --- a/tarpc/src/server.rs +++ b/tarpc/src/server.rs @@ -333,6 +333,7 @@ where type Item = Result, ChannelError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + #[derive(Debug)] enum ReceiverStatus { Ready, Pending, @@ -388,6 +389,11 @@ where Poll::Pending => Pending, }; + tracing::trace!( + "Expired requests: {:?}, Inbound: {:?}", + expiration_status, + request_status + ); match (expiration_status, request_status) { (Ready, _) | (_, Ready) => continue, (Closed, Closed) => return Poll::Ready(None), diff --git a/tarpc/src/server/in_flight_requests.rs b/tarpc/src/server/in_flight_requests.rs index 581eac3..912243c 100644 --- a/tarpc/src/server/in_flight_requests.rs +++ b/tarpc/src/server/in_flight_requests.rs @@ -98,6 +98,11 @@ impl InFlightRequests { &mut self, cx: &mut Context, ) -> Poll>> { + if self.deadlines.is_empty() { + // TODO(https://github.com/tokio-rs/tokio/issues/4161) + // This is a workaround for DelayQueue not always treating this case correctly. + return Poll::Ready(None); + } self.deadlines.poll_expired(cx).map_ok(|expired| { if let Some(RequestData { abort_handle, span, .. @@ -184,6 +189,8 @@ mod tests { #[tokio::test] async fn remove_request_doesnt_abort() { let mut in_flight_requests = InFlightRequests::default(); + assert!(in_flight_requests.deadlines.is_empty()); + let abort_registration = in_flight_requests .start_request( 0, @@ -198,9 +205,11 @@ mod tests { in_flight_requests.poll_expired(&mut noop_context()), Poll::Pending ); + assert!(!in_flight_requests.deadlines.is_empty()); assert_matches!(in_flight_requests.remove_request(0), Some(_)); // Postcondition: No pending expirations + assert!(in_flight_requests.deadlines.is_empty()); assert_matches!( in_flight_requests.poll_expired(&mut noop_context()), Poll::Ready(None)