Shut down client dispatch immediately when read half of transport is closed.

Clients can't receive any responses when the read half is closed, which means they can't verify if their requests were served. Therefore, there is no point in writing further requests after the read half is closed.
This commit is contained in:
Tim Kuehn
2020-07-29 12:09:44 -07:00
parent e6597fab03
commit 6a01e32a2d
2 changed files with 19 additions and 4 deletions

View File

@@ -3,7 +3,6 @@
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
use futures::{
future::{self, AbortHandle},
prelude::*,
@@ -47,7 +46,7 @@ struct Subscriber {
#[tarpc::server]
impl subscriber::Subscriber for Subscriber {
async fn receive(self, _: context::Context, message: String) {
info!("{} received message: {}", self.local_addr, message);
info!("[{}] received message: {}", self.local_addr, message);
}
}
@@ -68,7 +67,11 @@ impl Subscriber {
.respond_with(Subscriber { local_addr }.serve())
.execute(),
);
tokio::spawn(handler);
tokio::spawn(async move {
match handler.await {
Ok(()) | Err(future::Aborted) => info!("[{}] subscriber shutdown.", local_addr),
}
});
Ok(SubscriberHandle(abort_handle))
}
}
@@ -92,6 +95,7 @@ impl Publisher {
subscriptions: Self::start_subscription_manager(self.clients.clone()).await?,
};
info!("[{}] listening for publishers.", publisher_addrs.publisher);
tokio::spawn(async move {
// Because this is just an example, we know there will only be one publisher. In more
// realistic code, this would be a loop to continually accept new publisher
@@ -115,6 +119,7 @@ impl Publisher {
.await?
.filter_map(|r| future::ready(r.ok()));
let new_subscriber_addr = connecting_subscribers.get_ref().local_addr();
info!("[{}] listening for subscribers.", new_subscriber_addr);
tokio::spawn(async move {
while let Some(conn) = connecting_subscribers.next().await {
@@ -132,7 +137,7 @@ impl Publisher {
match dispatch.await {
Ok(()) => info!("[{:?}] subscriber connection closed", subscriber_addr),
Err(e) => info!(
"[{:?}] subscriber connection broken: {}",
"[{:?}] subscriber connection broken: {:?}",
subscriber_addr, e
),
}
@@ -183,6 +188,12 @@ async fn main() -> io::Result<()> {
.broadcast(context::current(), "hello to all".to_string())
.await?;
drop(_subscriber0);
publisher
.broadcast(context::current(), "hello to who?".to_string())
.await?;
info!("done.");
Ok(())

View File

@@ -452,6 +452,10 @@ where
.pump_write(cx)
.context("failed to write to transport")?,
) {
(Poll::Ready(None), _) => {
info!("Shutdown: read half closed, so shutting down.");
return Poll::Ready(Ok(()));
}
(read, Poll::Ready(None)) => {
if self.as_mut().project().in_flight_requests.is_empty() {
info!("Shutdown: write half closed, and no requests in flight.");