From 4d2d3f24c616d290ea3bef0aa17095d2b86a870e Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Tue, 6 Nov 2018 17:00:15 -0800 Subject: [PATCH] Address Clippy lints --- rpc/src/client/channel.rs | 23 ++++++++++++----------- rpc/src/lib.rs | 4 +++- rpc/src/server/filter.rs | 5 +++-- rpc/src/server/mod.rs | 18 +++++++++--------- rpc/src/transport/channel.rs | 4 ++-- rpc/src/util/serde.rs | 1 + trace/src/lib.rs | 2 +- 7 files changed, 31 insertions(+), 26 deletions(-) diff --git a/rpc/src/client/channel.rs b/rpc/src/client/channel.rs index f31fd3f..1a14190 100644 --- a/rpc/src/client/channel.rs +++ b/rpc/src/client/channel.rs @@ -6,6 +6,7 @@ use crate::{ context, + PollIo, util::{deadline_compat, AsDuration, Compact}, ClientMessage, ClientMessageKind, Request, Response, Transport, }; @@ -62,12 +63,12 @@ impl Clone for Channel { #[derive(Debug)] #[must_use = "futures do nothing unless polled"] struct Send<'a, Req, Resp> { - fut: MapOkDispatchResponse< - MapErrConnectionReset>>>, - Resp, - >, + fut: MapOkDispatchResponse, Resp>, } +type SendMapErrConnectionReset<'a, Req, Resp> = + MapErrConnectionReset>>>; + impl<'a, Req, Resp> Send<'a, Req, Resp> { unsafe_pinned!( fut: MapOkDispatchResponse< @@ -109,7 +110,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { impl Channel { /// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that /// resolves when the request is sent (not when the response is received). - fn send<'a>(&'a mut self, mut ctx: context::Context, request: Req) -> Send<'a, Req, Resp> { + fn send(&mut self, mut ctx: context::Context, request: Req) -> Send { // Convert the context to the call context. ctx.trace_context.parent_id = Some(ctx.trace_context.span_id); ctx.trace_context.span_id = SpanId::random(&mut rand::thread_rng()); @@ -150,7 +151,7 @@ impl Channel { /// Sends a request to the dispatch task to forward to the server, returning a [`Future`] that /// resolves to the response. - pub fn call<'a>(&'a mut self, context: context::Context, request: Req) -> Call<'a, Req, Resp> { + pub fn call(&mut self, context: context::Context, request: Req) -> Call { Call { fut: AndThenIdent::new(self.send(context, request)), } @@ -317,7 +318,7 @@ where unsafe_pinned!(pending_requests: Fuse>>); unsafe_pinned!(transport: Fuse); - fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { + fn pump_read(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> { Poll::Ready(match ready!(self.transport().poll_next(waker)?) { Some(response) => { self.complete(response); @@ -330,7 +331,7 @@ where }) } - fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> Poll>> { + fn pump_write(self: &mut Pin<&mut Self>, waker: &LocalWaker) -> PollIo<()> { enum ReceiverStatus { NotReady, Closed, @@ -374,7 +375,7 @@ where fn poll_next_request( self: &mut Pin<&mut Self>, waker: &LocalWaker, - ) -> Poll>>> { + ) -> PollIo> { if self.in_flight_requests().len() >= self.config.max_in_flight_requests { info!( "At in-flight request capacity ({}/{}).", @@ -417,7 +418,7 @@ where fn poll_next_cancellation( self: &mut Pin<&mut Self>, waker: &LocalWaker, - ) -> Poll>> { + ) -> PollIo<(context::Context, u64)> { while let Poll::Pending = self.transport().poll_ready(waker)? { ready!(self.transport().poll_flush(waker)?); } @@ -481,7 +482,7 @@ where }; self.transport().start_send(cancel)?; trace!("[{}/{}] Cancel message sent.", trace_id, self.server_addr()); - return Ok(()); + Ok(()) } /// Sends a server response to the client task that initiated the associated request. diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 02e8aa8..fd451dc 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -43,7 +43,7 @@ pub(crate) mod util; pub use crate::{client::Client, server::Server, transport::Transport}; use futures::{ - task::{Spawn, SpawnError, SpawnExt}, + task::{Poll, Spawn, SpawnError, SpawnExt}, Future, }; use std::{cell::RefCell, io, sync::Once, time::SystemTime}; @@ -162,6 +162,8 @@ impl Request { } } +pub(crate) type PollIo = Poll>>; + static INIT: Once = Once::new(); static mut SEED_SPAWN: Option> = None; thread_local! { diff --git a/rpc/src/server/filter.rs b/rpc/src/server/filter.rs index ae75a55..dd8e963 100644 --- a/rpc/src/server/filter.rs +++ b/rpc/src/server/filter.rs @@ -5,6 +5,7 @@ // https://opensource.org/licenses/MIT. use crate::{ + PollIo, server::{Channel, Config}, util::Compact, ClientMessage, Response, Transport, @@ -200,7 +201,7 @@ impl ConnectionFilter { fn poll_listener( self: &mut Pin<&mut Self>, cx: &LocalWaker, - ) -> Poll>>> + ) -> PollIo> where S: Stream>, C: Transport, SinkItem = Response> + Send, @@ -232,7 +233,7 @@ where fn poll_next( mut self: Pin<&mut Self>, cx: &LocalWaker, - ) -> Poll>>> { + ) -> PollIo> { loop { match (self.poll_listener(cx)?, self.poll_closed_connections(cx)?) { (Poll::Ready(Some(NewConnection::Accepted(channel))), _) => { diff --git a/rpc/src/server/mod.rs b/rpc/src/server/mod.rs index 897c08d..5d5a9eb 100644 --- a/rpc/src/server/mod.rs +++ b/rpc/src/server/mod.rs @@ -8,7 +8,7 @@ use crate::{ context::Context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage, - ClientMessageKind, Request, Response, ServerError, Transport, + ClientMessageKind, PollIo, Request, Response, ServerError, Transport, }; use fnv::FnvHashMap; use futures::{ @@ -150,7 +150,7 @@ where } } info!("Server shutting down."); - return Poll::Ready(()); + Poll::Ready(()) } } @@ -247,7 +247,7 @@ where pub(crate) fn poll_next( self: &mut Pin<&mut Self>, cx: &LocalWaker, - ) -> Poll>>> { + ) -> PollIo> { self.transport().poll_next(cx) } @@ -336,7 +336,7 @@ where Poll::Ready(Ok(())) } - fn pump_read(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> Poll>> { + fn pump_read(self: &mut Pin<&mut Self>, cx: &LocalWaker) -> PollIo<()> { ready!(self.poll_ready_if_throttling(cx)?); Poll::Ready(match ready!(self.channel().poll_next(cx)?) { @@ -362,7 +362,7 @@ where self: &mut Pin<&mut Self>, cx: &LocalWaker, read_half_closed: bool, - ) -> Poll>> { + ) -> PollIo<()> { match self.poll_next_response(cx)? { Poll::Ready(Some((_, response))) => { self.channel().start_send(response)?; @@ -392,7 +392,7 @@ where fn poll_next_response( self: &mut Pin<&mut Self>, cx: &LocalWaker, - ) -> Poll)>>> { + ) -> PollIo<(Context, Response)> { // Ensure there's room to write a response. while let Poll::Pending = self.channel().poll_ready(cx)? { ready!(self.channel().poll_flush(cx)?); @@ -402,7 +402,7 @@ where match ready!(self.pending_responses().poll_next(cx)) { Some((ctx, response)) => { - if let Some(_) = self.in_flight_requests().remove(&response.request_id) { + if self.in_flight_requests().remove(&response.request_id).is_some() { self.in_flight_requests().compact(0.1); } trace!( @@ -411,7 +411,7 @@ where peer, self.in_flight_requests().len(), ); - return Poll::Ready(Some(Ok((ctx, response)))); + Poll::Ready(Some(Ok((ctx, response)))) } None => { // This branch likely won't happen, since the ClientHandler is holding a Sender. @@ -467,7 +467,7 @@ where let mut response_tx = self.responses_tx().clone(); let trace_id = *ctx.trace_id(); - let response = self.f().clone()(ctx.clone(), request); + let response = self.f().clone()(ctx, request); let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then( async move |result| { let response = Response { diff --git a/rpc/src/transport/channel.rs b/rpc/src/transport/channel.rs index 9b2ec1a..97cf7f0 100644 --- a/rpc/src/transport/channel.rs +++ b/rpc/src/transport/channel.rs @@ -6,7 +6,7 @@ //! Transports backed by in-memory channels. -use crate::Transport; +use crate::{PollIo, Transport}; use futures::{channel::mpsc, task::LocalWaker, Poll, Sink, Stream}; use pin_utils::unsafe_pinned; use std::pin::Pin; @@ -45,7 +45,7 @@ impl UnboundedChannel { impl Stream for UnboundedChannel { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> Poll>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &LocalWaker) -> PollIo { self.rx().poll_next(cx).map(|option| option.map(Ok)) } } diff --git a/rpc/src/util/serde.rs b/rpc/src/util/serde.rs index f0851fa..85af4d4 100644 --- a/rpc/src/util/serde.rs +++ b/rpc/src/util/serde.rs @@ -31,6 +31,7 @@ where } /// Serializes [`io::ErrorKind`] as a `u32`. +#[allow(clippy::trivially_copy_pass_by_ref)] // Exact fn signature required by serde derive pub fn serialize_io_error_kind_as_u32( kind: &io::ErrorKind, serializer: S, diff --git a/trace/src/lib.rs b/trace/src/lib.rs index abe7400..5c67a0b 100644 --- a/trace/src/lib.rs +++ b/trace/src/lib.rs @@ -80,7 +80,7 @@ impl TraceId { /// Returns a random trace ID that can be assumed to be globally unique if `rng` generates /// actually-random numbers. pub fn random(rng: &mut R) -> Self { - TraceId((rng.next_u64() as u128) << mem::size_of::() | rng.next_u64() as u128) + TraceId(u128::from(rng.next_u64()) << mem::size_of::() | u128::from(rng.next_u64())) } }