From 6a01e32a2d53044c614d7b01f9999f222b61ed5a Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Wed, 29 Jul 2020 12:09:44 -0700 Subject: [PATCH] Shut down client dispatch immediately when read half of transport is closed. Clients can't receive any responses when the read half is closed, which means they can't verify if their requests were served. Therefore, there is no point in writing further requests after the read half is closed. --- tarpc/examples/pubsub.rs | 19 +++++++++++++++---- tarpc/src/rpc/client/channel.rs | 4 ++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 00b208b..d8a85a1 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -3,7 +3,6 @@ // Use of this source code is governed by an MIT-style // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. - use futures::{ future::{self, AbortHandle}, prelude::*, @@ -47,7 +46,7 @@ struct Subscriber { #[tarpc::server] impl subscriber::Subscriber for Subscriber { async fn receive(self, _: context::Context, message: String) { - info!("{} received message: {}", self.local_addr, message); + info!("[{}] received message: {}", self.local_addr, message); } } @@ -68,7 +67,11 @@ impl Subscriber { .respond_with(Subscriber { local_addr }.serve()) .execute(), ); - tokio::spawn(handler); + tokio::spawn(async move { + match handler.await { + Ok(()) | Err(future::Aborted) => info!("[{}] subscriber shutdown.", local_addr), + } + }); Ok(SubscriberHandle(abort_handle)) } } @@ -92,6 +95,7 @@ impl Publisher { subscriptions: Self::start_subscription_manager(self.clients.clone()).await?, }; + info!("[{}] listening for publishers.", publisher_addrs.publisher); tokio::spawn(async move { // Because this is just an example, we know there will only be one publisher. In more // realistic code, this would be a loop to continually accept new publisher @@ -115,6 +119,7 @@ impl Publisher { .await? .filter_map(|r| future::ready(r.ok())); let new_subscriber_addr = connecting_subscribers.get_ref().local_addr(); + info!("[{}] listening for subscribers.", new_subscriber_addr); tokio::spawn(async move { while let Some(conn) = connecting_subscribers.next().await { @@ -132,7 +137,7 @@ impl Publisher { match dispatch.await { Ok(()) => info!("[{:?}] subscriber connection closed", subscriber_addr), Err(e) => info!( - "[{:?}] subscriber connection broken: {}", + "[{:?}] subscriber connection broken: {:?}", subscriber_addr, e ), } @@ -183,6 +188,12 @@ async fn main() -> io::Result<()> { .broadcast(context::current(), "hello to all".to_string()) .await?; + drop(_subscriber0); + + publisher + .broadcast(context::current(), "hello to who?".to_string()) + .await?; + info!("done."); Ok(()) diff --git a/tarpc/src/rpc/client/channel.rs b/tarpc/src/rpc/client/channel.rs index ec81195..3b5d191 100644 --- a/tarpc/src/rpc/client/channel.rs +++ b/tarpc/src/rpc/client/channel.rs @@ -452,6 +452,10 @@ where .pump_write(cx) .context("failed to write to transport")?, ) { + (Poll::Ready(None), _) => { + info!("Shutdown: read half closed, so shutting down."); + return Poll::Ready(Ok(())); + } (read, Poll::Ready(None)) => { if self.as_mut().project().in_flight_requests.is_empty() { info!("Shutdown: write half closed, and no requests in flight.");