3 Commits

Author SHA1 Message Date
Tim Kuehn
b9868250f8 Prepare release of v0.23.0 2020-10-19 11:12:43 -07:00
Urhengulas
a3f1064efe Cargo.toml: Clean + update dependencies 2020-10-18 16:03:04 -07:00
Johann Hemmann
026083d653 Bump tokio from 0.2 to 0.3 (#319)
# Bump `tokio` from 0.2 to 0.3

* `Cargo.toml`:
    * bump `tokio` from 0.2 to 0.3
    * bump `tokio-util` from 0.3 to 0.4
    * remove feature `time` from `tokio`
    * fix alphabetical order of dependencies
* `tarpc::rpc`:
    * `client, server`: `tokio::time::Elapsed` -> `tokio::time::error::Elapsed`
    * `client, transport`, `::tests`: Fix `#[tokio::test]` macro usage
* `tarpc::serde_transport`:
    * `TcpListener.incoming().poll_next(...)` -> `TcpListener.poll_accept(...)`
      -> https://github.com/tokio-rs/tokio/discussions/2983
    * Adapt `AsyncRead`, `AsynWrite` implements in tests
* `README.md`, `tarpc::lib`: Adapt tokio version in docs

# Satisfy clippy

* replace `match`-statements with `matches!(...)`-macro
2020-10-17 17:33:08 -07:00
12 changed files with 55 additions and 55 deletions

View File

@@ -59,7 +59,7 @@ Some other features of tarpc:
Add to your `Cargo.toml` dependencies: Add to your `Cargo.toml` dependencies:
```toml ```toml
tarpc = "0.22.0" tarpc = "0.23.0"
``` ```
The `tarpc::service` attribute expands to a collection of items that form an rpc service. The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -73,7 +73,7 @@ your `Cargo.toml`:
```toml ```toml
futures = "0.3" futures = "0.3"
tokio = "0.2" tokio = "0.3"
``` ```
In the following example, we use an in-process channel for communication between In the following example, we use an in-process channel for communication between

View File

@@ -1,3 +1,9 @@
## 0.23.0 (2020-10-19)
### Breaking Changes
Upgrades tokio to 0.3.
## 0.22.0 (2020-08-02) ## 0.22.0 (2020-08-02)
This release adds some flexibility and consistency to `serde_transport`, with one new feature and This release adds some flexibility and consistency to `serde_transport`, with one new feature and

View File

@@ -13,14 +13,14 @@ readme = "../README.md"
description = "An example server built on tarpc." description = "An example server built on tarpc."
[dependencies] [dependencies]
clap = "2.0" clap = "2.33"
env_logger = "0.8"
futures = "0.3" futures = "0.3"
serde = { version = "1.0" } serde = { version = "1.0" }
tarpc = { version = "0.22", path = "../tarpc", features = ["full"] } tarpc = { version = "0.23", path = "../tarpc", features = ["full"] }
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json"] } tokio-serde = { version = "0.6", features = ["json"] }
tokio-util = { version = "0.3", features = ["codec"] } tokio-util = { version = "0.4", features = ["codec"] }
env_logger = "0.6"
[lib] [lib]
name = "service" name = "service"

View File

@@ -19,15 +19,15 @@ serde1 = []
travis-ci = { repository = "google/tarpc" } travis-ci = { repository = "google/tarpc" }
[dependencies] [dependencies]
syn = { version = "1.0.11", features = ["full"] } proc-macro2 = "1.0"
quote = "1.0.2" quote = "1.0"
proc-macro2 = "1.0.6" syn = { version = "1.0", features = ["full"] }
[lib] [lib]
proc-macro = true proc-macro = true
[dev-dependencies] [dev-dependencies]
assert-type-eq = "0.1.0"
futures = "0.3" futures = "0.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" } tarpc = { path = "../tarpc" }
assert-type-eq = "0.1.0"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "tarpc" name = "tarpc"
version = "0.22.0" version = "0.23.0"
authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"] authors = ["Adam Wright <adam.austin.wright@gmail.com>", "Tim Kuehn <timothy.j.kuehn@gmail.com>"]
edition = "2018" edition = "2018"
license = "MIT" license = "MIT"
@@ -29,29 +29,27 @@ travis-ci = { repository = "google/tarpc" }
anyhow = "1.0" anyhow = "1.0"
fnv = "1.0" fnv = "1.0"
futures = "0.3" futures = "0.3"
humantime = "1.0" humantime = "2.0"
log = "0.4" log = "0.4"
pin-project = "0.4.17" pin-project = "1.0"
rand = "0.7" rand = "0.7"
tokio = { version = "0.2", features = ["time"] }
serde = { optional = true, version = "1.0", features = ["derive"] } serde = { optional = true, version = "1.0", features = ["derive"] }
static_assertions = "1.1.0" static_assertions = "1.1.0"
tarpc-plugins = { path = "../plugins", version = "0.8" } tarpc-plugins = { path = "../plugins", version = "0.8" }
tokio-util = { optional = true, version = "0.3" } tokio = { version = "0.3" }
tokio-util = { optional = true, version = "0.4" }
tokio-serde = { optional = true, version = "0.6" } tokio-serde = { optional = true, version = "0.6" }
[dev-dependencies] [dev-dependencies]
assert_matches = "1.0" assert_matches = "1.4"
bincode = "1.3" bincode = "1.3"
bytes = { version = "0.5", features = ["serde"] } bytes = { version = "0.5", features = ["serde"] }
env_logger = "0.6" env_logger = "0.8"
flate2 = "1.0.16" flate2 = "1.0"
futures = "0.3"
humantime = "1.0"
log = "0.4" log = "0.4"
pin-utils = "0.1.0-alpha" pin-utils = "0.1.0-alpha"
serde_bytes = "0.11" serde_bytes = "0.11"
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.3", features = ["full"] }
tokio-serde = { version = "0.6", features = ["json", "bincode"] } tokio-serde = { version = "0.6", features = ["json", "bincode"] }
trybuild = "1.0" trybuild = "1.0"

View File

@@ -47,7 +47,7 @@
//! Add to your `Cargo.toml` dependencies: //! Add to your `Cargo.toml` dependencies:
//! //!
//! ```toml //! ```toml
//! tarpc = "0.22.0" //! tarpc = "0.23.0"
//! ``` //! ```
//! //!
//! The `tarpc::service` attribute expands to a collection of items that form an rpc service. //! The `tarpc::service` attribute expands to a collection of items that form an rpc service.
@@ -61,7 +61,7 @@
//! //!
//! ```toml //! ```toml
//! futures = "0.3" //! futures = "0.3"
//! tokio = "0.2" //! tokio = "0.3"
//! ``` //! ```
//! //!
//! In the following example, we use an in-process channel for communication between //! In the following example, we use an in-process channel for communication between

View File

@@ -88,7 +88,7 @@ impl<'a, Req, Resp> Future for Call<'a, Req, Resp> {
let resp = ready!(self.as_mut().project().fut.poll(cx)); let resp = ready!(self.as_mut().project().fut.poll(cx));
Poll::Ready(match resp { Poll::Ready(match resp {
Ok(resp) => resp, Ok(resp) => resp,
Err(tokio::time::Elapsed { .. }) => Err(io::Error::new( Err(tokio::time::error::Elapsed { .. }) => Err(io::Error::new(
io::ErrorKind::TimedOut, io::ErrorKind::TimedOut,
"Client dropped expired request.".to_string(), "Client dropped expired request.".to_string(),
)), )),
@@ -723,7 +723,7 @@ mod tests {
}; };
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc}; use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn dispatch_response_cancels_on_drop() { async fn dispatch_response_cancels_on_drop() {
let (cancellation, mut canceled_requests) = cancellations(); let (cancellation, mut canceled_requests) = cancellations();
let (_, response) = oneshot::channel(); let (_, response) = oneshot::channel();
@@ -738,7 +738,7 @@ mod tests {
assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3)); assert_eq!(canceled_requests.0.try_next().unwrap(), Some(3));
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request() { async fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch); let dispatch = Pin::new(&mut dispatch);
@@ -755,7 +755,7 @@ mod tests {
} }
// Regression test for https://github.com/google/tarpc/issues/220 // Regression test for https://github.com/google/tarpc/issues/220
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_channel_dropped_doesnt_panic() { async fn stage_request_channel_dropped_doesnt_panic() {
let (mut dispatch, mut channel, mut server_channel) = set_up(); let (mut dispatch, mut channel, mut server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch); let mut dispatch = Pin::new(&mut dispatch);
@@ -776,7 +776,7 @@ mod tests {
dispatch.await.unwrap(); dispatch.await.unwrap();
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_before_sending() { async fn stage_request_response_future_dropped_is_canceled_before_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch); let dispatch = Pin::new(&mut dispatch);
@@ -791,7 +791,7 @@ mod tests {
assert!(dispatch.poll_next_request(cx).ready().is_none()); assert!(dispatch.poll_next_request(cx).ready().is_none());
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_response_future_dropped_is_canceled_after_sending() { async fn stage_request_response_future_dropped_is_canceled_after_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref()); let cx = &mut Context::from_waker(&noop_waker_ref());
@@ -813,7 +813,7 @@ mod tests {
assert!(dispatch.project().in_flight_requests.is_empty()); assert!(dispatch.project().in_flight_requests.is_empty());
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn stage_request_response_closed_skipped() { async fn stage_request_response_closed_skipped() {
let (mut dispatch, mut channel, _server_channel) = set_up(); let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch); let dispatch = Pin::new(&mut dispatch);

View File

@@ -565,7 +565,7 @@ where
request_id: self.request_id, request_id: self.request_id,
message: match result { message: match result {
Ok(message) => Ok(message), Ok(message) => Ok(message),
Err(tokio::time::Elapsed { .. }) => { Err(tokio::time::error::Elapsed { .. }) => {
debug!( debug!(
"[{}] Response did not complete before deadline of {}s.", "[{}] Response did not complete before deadline of {}s.",
self.ctx.trace_id(), self.ctx.trace_id(),
@@ -624,11 +624,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { loop {
let read = self.as_mut().pump_read(cx)?; let read = self.as_mut().pump_read(cx)?;
let read_closed = if let Poll::Ready(None) = read { let read_closed = matches!(read, Poll::Ready(None));
true
} else {
false
};
match (read, self.as_mut().pump_write(cx, read_closed)?) { match (read, self.as_mut().pump_write(cx, read_closed)?) {
(Poll::Ready(None), Poll::Ready(None)) => { (Poll::Ready(None), Poll::Ready(None)) => {
return Poll::Ready(None); return Poll::Ready(None);

View File

@@ -117,10 +117,7 @@ pub trait PollExt {
impl<T> PollExt for Poll<Option<T>> { impl<T> PollExt for Poll<Option<T>> {
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
match self { matches!(self, Poll::Ready(None))
Poll::Ready(None) => true,
_ => false,
}
} }
} }

View File

@@ -90,7 +90,7 @@ mod tests {
use std::io; use std::io;
#[cfg(feature = "tokio1")] #[cfg(feature = "tokio1")]
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn integration() -> io::Result<()> { async fn integration() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();

View File

@@ -269,9 +269,12 @@ pub mod tcp {
type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>; type Item = io::Result<Transport<TcpStream, Item, SinkItem, Codec>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = let conn: TcpStream =
ready!(Pin::new(&mut self.as_mut().project().listener.incoming()).poll_next(cx)?); ready!(Pin::new(&mut self.as_mut().project().listener).poll_accept(cx)?).0;
Poll::Ready(next.map(|conn| Ok(new(self.config.new_framed(conn), (self.codec_fn)())))) Poll::Ready(Some(Ok(new(
self.config.new_framed(conn),
(self.codec_fn)(),
))))
} }
} }
} }
@@ -286,7 +289,7 @@ mod tests {
io::{self, Cursor}, io::{self, Cursor},
pin::Pin, pin::Pin,
}; };
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_serde::formats::SymmetricalJson; use tokio_serde::formats::SymmetricalJson;
fn ctx() -> Context<'static> { fn ctx() -> Context<'static> {
@@ -301,8 +304,8 @@ mod tests {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<()>> {
AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf) AsyncRead::poll_read(Pin::new(self.0.get_mut()), cx, buf)
} }
} }
@@ -345,8 +348,8 @@ mod tests {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_buf: &mut [u8], _buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<()>> {
unreachable!() unreachable!()
} }
} }

View File

@@ -36,7 +36,7 @@ impl Service for Server {
} }
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn sequential() -> io::Result<()> { async fn sequential() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -59,7 +59,7 @@ async fn sequential() -> io::Result<()> {
} }
#[cfg(feature = "serde1")] #[cfg(feature = "serde1")]
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn serde() -> io::Result<()> { async fn serde() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -83,7 +83,7 @@ async fn serde() -> io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn concurrent() -> io::Result<()> { async fn concurrent() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -112,7 +112,7 @@ async fn concurrent() -> io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn concurrent_join() -> io::Result<()> { async fn concurrent_join() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -142,7 +142,7 @@ async fn concurrent_join() -> io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test(threaded_scheduler)] #[tokio::test]
async fn concurrent_join_all() -> io::Result<()> { async fn concurrent_join_all() -> io::Result<()> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();