Minor refactor

This commit is contained in:
Tim Kuehn
2020-07-30 12:02:14 -07:00
parent 258193c932
commit ff55080193

View File

@@ -14,7 +14,7 @@
/// subscriber.
///
/// - Publishers connect to the server on the "publisher" port and, once connected, they send
/// toopical messages via Publisher service to the server. The server then broadcasts each
/// topical messages via Publisher service to the server. The server then broadcasts each
/// messages to all clients subscribed to the topic of that message.
///
/// Subscriber Publisher PubSub Server
@@ -32,7 +32,6 @@
/// T9 |-----(OK) Receive------------------------------------------------->|
/// T10 | | |
/// T11 | |<--------------(OK) Publish------|
use anyhow::anyhow;
use futures::{
channel::oneshot,
@@ -171,7 +170,7 @@ impl Publisher {
Ok(publisher_addrs)
}
async fn start_subscription_manager(self) -> io::Result<SocketAddr> {
async fn start_subscription_manager(mut self) -> io::Result<SocketAddr> {
let mut connecting_subscribers = tcp::listen("localhost:0", Json::default)
.await?
.filter_map(|r| future::ready(r.ok()));
@@ -183,7 +182,7 @@ impl Publisher {
let subscriber_addr = conn.peer_addr().unwrap();
let tarpc::client::NewClient {
client: mut subscriber,
client: subscriber,
dispatch,
} = subscriber::SubscriberClient::new(client::Config::default(), conn);
let (ready_tx, ready) = oneshot::channel();
@@ -191,24 +190,9 @@ impl Publisher {
.start_subscriber_gc(subscriber_addr, dispatch, ready);
// Populate the topics
if let Ok(topics) = subscriber.topics(context::current()).await {
self.clients.lock().unwrap().insert(
subscriber_addr,
Subscription {
subscriber: subscriber.clone(),
topics: topics.clone(),
},
);
self.initialize_subscription(subscriber_addr, subscriber)
.await;
info!("[{}] subscribed to topics: {:?}", subscriber_addr, topics);
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in topics {
subscriptions
.entry(topic)
.or_insert_with(HashMap::new)
.insert(subscriber_addr, subscriber.clone());
}
}
// Signal that initialization is done.
ready_tx.send(()).unwrap();
}
@@ -217,6 +201,32 @@ impl Publisher {
Ok(new_subscriber_addr)
}
async fn initialize_subscription(
&mut self,
subscriber_addr: SocketAddr,
mut subscriber: subscriber::SubscriberClient,
) {
// Populate the topics
if let Ok(topics) = subscriber.topics(context::current()).await {
self.clients.lock().unwrap().insert(
subscriber_addr,
Subscription {
subscriber: subscriber.clone(),
topics: topics.clone(),
},
);
info!("[{}] subscribed to topics: {:?}", subscriber_addr, topics);
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in topics {
subscriptions
.entry(topic)
.or_insert_with(HashMap::new)
.insert(subscriber_addr, subscriber.clone());
}
}
}
fn start_subscriber_gc(
self,
subscriber_addr: SocketAddr,
@@ -225,12 +235,18 @@ impl Publisher {
) {
tokio::spawn(async move {
if let Err(e) = client_dispatch.await {
info!("[{}] subscriber connection broken: {:?}", subscriber_addr, e)
info!(
"[{}] subscriber connection broken: {:?}",
subscriber_addr, e
)
}
// Don't clean up the subscriber until initialization is done.
let _ = subscriber_ready.await;
if let Some(subscription) = self.clients.lock().unwrap().remove(&subscriber_addr) {
info!("[{} unsubscribing from topics: {:?}", subscriber_addr, subscription.topics);
info!(
"[{} unsubscribing from topics: {:?}",
subscriber_addr, subscription.topics
);
let mut subscriptions = self.subscriptions.write().unwrap();
for topic in subscription.topics {
let subscribers = subscriptions.get_mut(&topic).unwrap();
@@ -322,7 +338,6 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
info!("done.");
Ok(())