diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index 334b99a..d4001aa 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -14,7 +14,7 @@ /// subscriber. /// /// - Publishers connect to the server on the "publisher" port and, once connected, they send -/// toopical messages via Publisher service to the server. The server then broadcasts each +/// topical messages via Publisher service to the server. The server then broadcasts each /// messages to all clients subscribed to the topic of that message. /// /// Subscriber Publisher PubSub Server @@ -32,7 +32,6 @@ /// T9 |-----(OK) Receive------------------------------------------------->| /// T10 | | | /// T11 | |<--------------(OK) Publish------| - use anyhow::anyhow; use futures::{ channel::oneshot, @@ -171,7 +170,7 @@ impl Publisher { Ok(publisher_addrs) } - async fn start_subscription_manager(self) -> io::Result { + async fn start_subscription_manager(mut self) -> io::Result { let mut connecting_subscribers = tcp::listen("localhost:0", Json::default) .await? .filter_map(|r| future::ready(r.ok())); @@ -183,7 +182,7 @@ impl Publisher { let subscriber_addr = conn.peer_addr().unwrap(); let tarpc::client::NewClient { - client: mut subscriber, + client: subscriber, dispatch, } = subscriber::SubscriberClient::new(client::Config::default(), conn); let (ready_tx, ready) = oneshot::channel(); @@ -191,24 +190,9 @@ impl Publisher { .start_subscriber_gc(subscriber_addr, dispatch, ready); // Populate the topics - if let Ok(topics) = subscriber.topics(context::current()).await { - self.clients.lock().unwrap().insert( - subscriber_addr, - Subscription { - subscriber: subscriber.clone(), - topics: topics.clone(), - }, - ); + self.initialize_subscription(subscriber_addr, subscriber) + .await; - info!("[{}] subscribed to topics: {:?}", subscriber_addr, topics); - let mut subscriptions = self.subscriptions.write().unwrap(); - for topic in topics { - subscriptions - .entry(topic) - .or_insert_with(HashMap::new) - .insert(subscriber_addr, subscriber.clone()); - } - } // Signal that initialization is done. ready_tx.send(()).unwrap(); } @@ -217,6 +201,32 @@ impl Publisher { Ok(new_subscriber_addr) } + async fn initialize_subscription( + &mut self, + subscriber_addr: SocketAddr, + mut subscriber: subscriber::SubscriberClient, + ) { + // Populate the topics + if let Ok(topics) = subscriber.topics(context::current()).await { + self.clients.lock().unwrap().insert( + subscriber_addr, + Subscription { + subscriber: subscriber.clone(), + topics: topics.clone(), + }, + ); + + info!("[{}] subscribed to topics: {:?}", subscriber_addr, topics); + let mut subscriptions = self.subscriptions.write().unwrap(); + for topic in topics { + subscriptions + .entry(topic) + .or_insert_with(HashMap::new) + .insert(subscriber_addr, subscriber.clone()); + } + } + } + fn start_subscriber_gc( self, subscriber_addr: SocketAddr, @@ -225,12 +235,18 @@ impl Publisher { ) { tokio::spawn(async move { if let Err(e) = client_dispatch.await { - info!("[{}] subscriber connection broken: {:?}", subscriber_addr, e) + info!( + "[{}] subscriber connection broken: {:?}", + subscriber_addr, e + ) } // Don't clean up the subscriber until initialization is done. let _ = subscriber_ready.await; if let Some(subscription) = self.clients.lock().unwrap().remove(&subscriber_addr) { - info!("[{} unsubscribing from topics: {:?}", subscriber_addr, subscription.topics); + info!( + "[{} unsubscribing from topics: {:?}", + subscriber_addr, subscription.topics + ); let mut subscriptions = self.subscriptions.write().unwrap(); for topic in subscription.topics { let subscribers = subscriptions.get_mut(&topic).unwrap(); @@ -322,7 +338,6 @@ async fn main() -> anyhow::Result<()> { ) .await?; - info!("done."); Ok(())