diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 00b208b..d8a85a1 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -3,7 +3,6 @@ // Use of this source code is governed by an MIT-style // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. - use futures::{ future::{self, AbortHandle}, prelude::*, @@ -47,7 +46,7 @@ struct Subscriber { #[tarpc::server] impl subscriber::Subscriber for Subscriber { async fn receive(self, _: context::Context, message: String) { - info!("{} received message: {}", self.local_addr, message); + info!("[{}] received message: {}", self.local_addr, message); } } @@ -68,7 +67,11 @@ impl Subscriber { .respond_with(Subscriber { local_addr }.serve()) .execute(), ); - tokio::spawn(handler); + tokio::spawn(async move { + match handler.await { + Ok(()) | Err(future::Aborted) => info!("[{}] subscriber shutdown.", local_addr), + } + }); Ok(SubscriberHandle(abort_handle)) } } @@ -92,6 +95,7 @@ impl Publisher { subscriptions: Self::start_subscription_manager(self.clients.clone()).await?, }; + info!("[{}] listening for publishers.", publisher_addrs.publisher); tokio::spawn(async move { // Because this is just an example, we know there will only be one publisher. In more // realistic code, this would be a loop to continually accept new publisher @@ -115,6 +119,7 @@ impl Publisher { .await? .filter_map(|r| future::ready(r.ok())); let new_subscriber_addr = connecting_subscribers.get_ref().local_addr(); + info!("[{}] listening for subscribers.", new_subscriber_addr); tokio::spawn(async move { while let Some(conn) = connecting_subscribers.next().await { @@ -132,7 +137,7 @@ impl Publisher { match dispatch.await { Ok(()) => info!("[{:?}] subscriber connection closed", subscriber_addr), Err(e) => info!( - "[{:?}] subscriber connection broken: {}", + "[{:?}] subscriber connection broken: {:?}", subscriber_addr, e ), } @@ -183,6 +188,12 @@ async fn main() -> io::Result<()> { .broadcast(context::current(), "hello to all".to_string()) .await?; + drop(_subscriber0); + + publisher + .broadcast(context::current(), "hello to who?".to_string()) + .await?; + info!("done."); Ok(()) diff --git a/tarpc/src/rpc/client/channel.rs b/tarpc/src/rpc/client/channel.rs index ec81195..3b5d191 100644 --- a/tarpc/src/rpc/client/channel.rs +++ b/tarpc/src/rpc/client/channel.rs @@ -452,6 +452,10 @@ where .pump_write(cx) .context("failed to write to transport")?, ) { + (Poll::Ready(None), _) => { + info!("Shutdown: read half closed, so shutting down."); + return Poll::Ready(Ok(())); + } (read, Poll::Ready(None)) => { if self.as_mut().project().in_flight_requests.is_empty() { info!("Shutdown: write half closed, and no requests in flight.");