From 026083d653050ed56a2d8d2aa161cf7cfb895020 Mon Sep 17 00:00:00 2001 From: Johann Hemmann Date: Sun, 18 Oct 2020 02:33:08 +0200 Subject: [PATCH] Bump `tokio` from 0.2 to 0.3 (#319) # Bump `tokio` from 0.2 to 0.3 * `Cargo.toml`: * bump `tokio` from 0.2 to 0.3 * bump `tokio-util` from 0.3 to 0.4 * remove feature `time` from `tokio` * fix alphabetical order of dependencies * `tarpc::rpc`: * `client, server`: `tokio::time::Elapsed` -> `tokio::time::error::Elapsed` * `client, transport`, `::tests`: Fix `#[tokio::test]` macro usage * `tarpc::serde_transport`: * `TcpListener.incoming().poll_next(...)` -> `TcpListener.poll_accept(...)` -> https://github.com/tokio-rs/tokio/discussions/2983 * Adapt `AsyncRead`, `AsynWrite` implements in tests * `README.md`, `tarpc::lib`: Adapt tokio version in docs # Satisfy clippy * replace `match`-statements with `matches!(...)`-macro --- README.md | 2 +- example-service/Cargo.toml | 4 ++-- tarpc/Cargo.toml | 6 +++--- tarpc/src/lib.rs | 2 +- tarpc/src/rpc/client/channel.rs | 14 +++++++------- tarpc/src/rpc/server.rs | 8 ++------ tarpc/src/rpc/server/testing.rs | 5 +---- tarpc/src/rpc/transport/channel.rs | 2 +- tarpc/src/serde_transport.rs | 19 +++++++++++-------- tarpc/tests/service_functional.rs | 10 +++++----- 10 files changed, 34 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 58371b3..9818942 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ your `Cargo.toml`: ```toml futures = "0.3" -tokio = "0.2" +tokio = "0.3" ``` In the following example, we use an in-process channel for communication between diff --git a/example-service/Cargo.toml b/example-service/Cargo.toml index 2163324..02a5677 100644 --- a/example-service/Cargo.toml +++ b/example-service/Cargo.toml @@ -17,9 +17,9 @@ clap = "2.0" futures = "0.3" serde = { version = "1.0" } tarpc = { version = "0.22", path = "../tarpc", features = ["full"] } -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.3", features = ["full"] } tokio-serde = { version = "0.6", features = ["json"] } -tokio-util = { version = "0.3", features = ["codec"] } +tokio-util = { version = "0.4", features = ["codec"] } env_logger = "0.6" [lib] diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index 8c51827..4c43f4e 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -33,11 +33,11 @@ humantime = "1.0" log = "0.4" pin-project = "0.4.17" rand = "0.7" -tokio = { version = "0.2", features = ["time"] } serde = { optional = true, version = "1.0", features = ["derive"] } static_assertions = "1.1.0" tarpc-plugins = { path = "../plugins", version = "0.8" } -tokio-util = { optional = true, version = "0.3" } +tokio = { version = "0.3" } +tokio-util = { optional = true, version = "0.4" } tokio-serde = { optional = true, version = "0.6" } [dev-dependencies] @@ -51,7 +51,7 @@ humantime = "1.0" log = "0.4" pin-utils = "0.1.0-alpha" serde_bytes = "0.11" -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.3", features = ["full"] } tokio-serde = { version = "0.6", features = ["json", "bincode"] } trybuild = "1.0" diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index a12a671..2686bb5 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -61,7 +61,7 @@ //! //! ```toml //! futures = "0.3" -//! tokio = "0.2" +//! tokio = "0.3" //! ``` //! //! In the following example, we use an in-process channel for communication between diff --git a/tarpc/src/rpc/client/channel.rs b/tarpc/src/rpc/client/channel.rs index 3b5d191..74e5594 100644 --- a/tarpc/src/rpc/client/channel.rs +++ b/tarpc/src/rpc/client/channel.rs @@ -88,7 +88,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> { let resp = ready!(self.as_mut().project().fut.poll(cx)); Poll::Ready(match resp { Ok(resp) => resp, - Err(tokio::time::Elapsed { .. }) => Err(io::Error::new( + Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new( io::ErrorKind::TimedOut, "Client dropped expired request.".to_string(), )), @@ -723,7 +723,7 @@ mod tests { }; use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc}; - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn dispatch_response_cancels_on_drop() { let (cancellation, mut canceled_requests) = cancellations(); let (_, response) = oneshot::channel(); @@ -738,7 +738,7 @@ mod tests { assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3)); } - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn stage_request() { let (mut dispatch, mut channel, _server_channel) = set_up(); let dispatch = Pin::new(&mut dispatch); @@ -755,7 +755,7 @@ mod tests { } // Regression test for https://github.com/google/tarpc/issues/220 - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn stage_request_channel_dropped_doesnt_panic() { let (mut dispatch, mut channel, mut server_channel) = set_up(); let mut dispatch = Pin::new(&mut dispatch); @@ -776,7 +776,7 @@ mod tests { dispatch.await.unwrap(); } - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn stage_request_response_future_dropped_is_canceled_before_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); let dispatch = Pin::new(&mut dispatch); @@ -791,7 +791,7 @@ mod tests { assert!(dispatch.poll_next_request(cx).ready().is_none()); } - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn stage_request_response_future_dropped_is_canceled_after_sending() { let (mut dispatch, mut channel, _server_channel) = set_up(); let cx = &mut Context::from_waker(&noop_waker_ref()); @@ -813,7 +813,7 @@ mod tests { assert!(dispatch.project().in_flight_requests.is_empty()); } - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn stage_request_response_closed_skipped() { let (mut dispatch, mut channel, _server_channel) = set_up(); let dispatch = Pin::new(&mut dispatch); diff --git a/tarpc/src/rpc/server.rs b/tarpc/src/rpc/server.rs index d3fd59f..962d0ca 100644 --- a/tarpc/src/rpc/server.rs +++ b/tarpc/src/rpc/server.rs @@ -565,7 +565,7 @@ where request_id: self.request_id, message: match result { Ok(message) => Ok(message), - Err(tokio::time::Elapsed { .. }) => { + Err(tokio::time::error::Elapsed { .. }) => { debug!( "[{}] Response did not complete before deadline of {}s.", self.ctx.trace_id(), @@ -624,11 +624,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { let read = self.as_mut().pump_read(cx)?; - let read_closed = if let Poll::Ready(None) = read { - true - } else { - false - }; + let read_closed = matches!(read, Poll::Ready(None)); match (read, self.as_mut().pump_write(cx, read_closed)?) { (Poll::Ready(None), Poll::Ready(None)) => { return Poll::Ready(None); diff --git a/tarpc/src/rpc/server/testing.rs b/tarpc/src/rpc/server/testing.rs index bbcfcd8..8de8ef0 100644 --- a/tarpc/src/rpc/server/testing.rs +++ b/tarpc/src/rpc/server/testing.rs @@ -117,10 +117,7 @@ pub trait PollExt { impl PollExt for Poll> { fn is_done(&self) -> bool { - match self { - Poll::Ready(None) => true, - _ => false, - } + matches!(self, Poll::Ready(None)) } } diff --git a/tarpc/src/rpc/transport/channel.rs b/tarpc/src/rpc/transport/channel.rs index fb9b591..41a8ccd 100644 --- a/tarpc/src/rpc/transport/channel.rs +++ b/tarpc/src/rpc/transport/channel.rs @@ -90,7 +90,7 @@ mod tests { use std::io; #[cfg(feature = "tokio1")] - #[tokio::test(threaded_scheduler)] + #[tokio::test] async fn integration() -> io::Result<()> { let _ = env_logger::try_init(); diff --git a/tarpc/src/serde_transport.rs b/tarpc/src/serde_transport.rs index 940d62f..170a4b5 100644 --- a/tarpc/src/serde_transport.rs +++ b/tarpc/src/serde_transport.rs @@ -269,9 +269,12 @@ pub mod tcp { type Item = io::Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let next = - ready!(Pin::new(&mut self.as_mut().project().listener.incoming()).poll_next(cx)?); - Poll::Ready(next.map(|conn| Ok(new(self.config.new_framed(conn), (self.codec_fn)())))) + let conn: TcpStream = + ready!(Pin::new(&mut self.as_mut().project().listener).poll_accept(cx)?).0; + Poll::Ready(Some(Ok(new( + self.config.new_framed(conn), + (self.codec_fn)(), + )))) } } } @@ -286,7 +289,7 @@ mod tests { io::{self, Cursor}, pin::Pin, }; - use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_serde::formats::SymmetricalJson; fn ctx() -> Context<'static> { @@ -301,8 +304,8 @@ mod tests { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf) } } @@ -345,8 +348,8 @@ mod tests { fn poll_read( self: Pin<&mut Self>, _cx: &mut Context<'_>, - _buf: &mut [u8], - ) -> Poll> { + _buf: &mut ReadBuf<'_>, + ) -> Poll> { unreachable!() } } diff --git a/tarpc/tests/service_functional.rs b/tarpc/tests/service_functional.rs index 19721e2..6853b52 100644 --- a/tarpc/tests/service_functional.rs +++ b/tarpc/tests/service_functional.rs @@ -36,7 +36,7 @@ impl Service for Server { } } -#[tokio::test(threaded_scheduler)] +#[tokio::test] async fn sequential() -> io::Result<()> { let _ = env_logger::try_init(); @@ -59,7 +59,7 @@ async fn sequential() -> io::Result<()> { } #[cfg(feature = "serde1")] -#[tokio::test(threaded_scheduler)] +#[tokio::test] async fn serde() -> io::Result<()> { let _ = env_logger::try_init(); @@ -83,7 +83,7 @@ async fn serde() -> io::Result<()> { Ok(()) } -#[tokio::test(threaded_scheduler)] +#[tokio::test] async fn concurrent() -> io::Result<()> { let _ = env_logger::try_init(); @@ -112,7 +112,7 @@ async fn concurrent() -> io::Result<()> { Ok(()) } -#[tokio::test(threaded_scheduler)] +#[tokio::test] async fn concurrent_join() -> io::Result<()> { let _ = env_logger::try_init(); @@ -142,7 +142,7 @@ async fn concurrent_join() -> io::Result<()> { Ok(()) } -#[tokio::test(threaded_scheduler)] +#[tokio::test] async fn concurrent_join_all() -> io::Result<()> { let _ = env_logger::try_init();