From 39737b720ac23ba91f295410567af5c6931e5147 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Thu, 17 Jan 2019 10:36:32 -0800 Subject: [PATCH] Cargo fmt --- rpc/src/client/channel.rs | 21 +++++++++++++---- rpc/src/server/filter.rs | 12 +++++++--- rpc/src/server/mod.rs | 36 ++++++++++++++++-------------- tarpc/examples/service_registry.rs | 9 +++++--- tarpc/src/lib.rs | 7 +----- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index 4312364..d987541 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -425,7 +425,9 @@ where loop { match ready!(self.as_mut().canceled_requests().poll_next_unpin(waker)) { Some(request_id) => { - if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&request_id) { + if let Some(in_flight_data) = + self.as_mut().in_flight_requests().remove(&request_id) + { self.as_mut().in_flight_requests().compact(0.1); debug!( @@ -438,7 +440,10 @@ where } } None => { - trace!("[{}] canceled_requests closed.", self.as_mut().server_addr()); + trace!( + "[{}] canceled_requests closed.", + self.as_mut().server_addr() + ); return Poll::Ready(None); } } @@ -480,13 +485,21 @@ where message: ClientMessageKind::Cancel { request_id }, }; self.as_mut().transport().start_send(cancel)?; - trace!("[{}/{}] Cancel message sent.", trace_id, self.as_mut().server_addr()); + trace!( + "[{}/{}] Cancel message sent.", + trace_id, + self.as_mut().server_addr() + ); Ok(()) } /// Sends a server response to the client task that initiated the associated request. fn complete(self: &mut Pin<&mut Self>, response: Response) -> bool { - if let Some(in_flight_data) = self.as_mut().in_flight_requests().remove(&response.request_id) { + if let Some(in_flight_data) = self + .as_mut() + .in_flight_requests() + .remove(&response.request_id) + { self.as_mut().in_flight_requests().compact(0.1); trace!( diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index dacec81..3107d05 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -231,12 +231,18 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo> { loop { - match (self.as_mut().poll_listener(cx)?, self.poll_closed_connections(cx)?) { + match ( + self.as_mut().poll_listener(cx)?, + self.poll_closed_connections(cx)?, + ) { (Poll::Ready(Some(NewConnection::Accepted(channel))), _) => { - return Poll::Ready(Some(Ok(channel))) + return Poll::Ready(Some(Ok(channel))); } (Poll::Ready(Some(NewConnection::Filtered)), _) | (_, Poll::Ready(())) => { - trace!("Filtered a connection; {} open.", self.as_mut().open_connections()); + trace!( + "Filtered a connection; {} open.", + self.as_mut().open_connections() + ); continue; } (Poll::Pending, Poll::Pending) => return Poll::Pending, diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 63bf209..52d3dae 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -230,10 +230,7 @@ where Req: Send, Resp: Send, { - pub(crate) fn start_send( - mut self: Pin<&mut Self>, - response: Response, - ) -> io::Result<()> { + pub(crate) fn start_send(mut self: Pin<&mut Self>, response: Response) -> io::Result<()> { self.as_mut().transport().start_send(response) } @@ -316,10 +313,7 @@ where { /// If at max in-flight requests, check that there's room to immediately write a throttled /// response. - fn poll_ready_if_throttling( - mut self: Pin<&mut Self>, - cx: &LocalWaker, - ) -> Poll> { + fn poll_ready_if_throttling(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll> { if self.in_flight_requests.len() >= self.channel.config.max_in_flight_requests_per_connection { @@ -359,11 +353,7 @@ where }) } - fn pump_write( - mut self: Pin<&mut Self>, - cx: &LocalWaker, - read_half_closed: bool, - ) -> PollIo<()> { + fn pump_write(mut self: Pin<&mut Self>, cx: &LocalWaker, read_half_closed: bool) -> PollIo<()> { match self.as_mut().poll_next_response(cx)? { Poll::Ready(Some((_, response))) => { self.as_mut().channel().start_send(response)?; @@ -441,14 +431,21 @@ where let request = request.message; if self.as_mut().in_flight_requests().len() - >= self.as_mut().channel().config.max_in_flight_requests_per_connection + >= self + .as_mut() + .channel() + .config + .max_in_flight_requests_per_connection { debug!( "[{}/{}] Client has reached in-flight request limit ({}/{}).", ctx.trace_id(), peer, self.as_mut().in_flight_requests().len(), - self.as_mut().channel().config.max_in_flight_requests_per_connection + self.as_mut() + .channel() + .config + .max_in_flight_requests_per_connection ); self.as_mut().channel().start_send(Response { @@ -497,7 +494,9 @@ where ), ) })?; - self.as_mut().in_flight_requests().insert(request_id, abort_handle); + self.as_mut() + .in_flight_requests() + .insert(request_id, abort_handle); Ok(()) } @@ -540,7 +539,10 @@ where trace!("[{}] ClientHandler::poll", self.channel.client_addr); loop { let read = self.as_mut().pump_read(cx)?; - match (read, self.as_mut().pump_write(cx, read == Poll::Ready(None))?) { + match ( + read, + self.as_mut().pump_write(cx, read == Poll::Ready(None))?, + ) { (Poll::Ready(None), Poll::Ready(None)) => { info!("[{}] Client disconnected.", self.channel.client_addr); return Poll::Ready(Ok(())); diff --git a/tarpc/examples/service_registry.rs b/tarpc/examples/service_registry.rs index 2396c7f..7cee6fb 100644 --- a/tarpc/examples/service_registry.rs +++ b/tarpc/examples/service_registry.rs @@ -53,8 +53,10 @@ mod registry { /// Returns a function that serves requests for the registered services. pub fn serve( self, - ) -> impl FnOnce(context::Context, ServiceRequest) - -> Either>> + ) -> impl FnOnce( + context::Context, + ServiceRequest, + ) -> Either>> + Clone { let registrations = Arc::new(self.registrations); move |cx, req: ServiceRequest| match registrations.serve(cx, &req) { @@ -327,7 +329,8 @@ impl BincodeRegistry { fn serve( self, ) -> impl FnOnce( - context::Context, registry::ServiceRequest + context::Context, + registry::ServiceRequest, ) -> registry::Either< Services::Future, Ready>, diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index fde50a0..d9b3f54 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -9,12 +9,7 @@ #![feature(async_await, external_doc)] #![cfg_attr( test, - feature( - futures_api, - await_macro, - proc_macro_hygiene, - arbitrary_self_types - ) + feature(futures_api, await_macro, proc_macro_hygiene, arbitrary_self_types) )] #[doc(hidden)]