Update to Tokio 0.2 and futures 0.3 (#277)

Author: Artem Vorotnikov
Date: 2019-11-27 19:53:44 -08:00
Committed by: Tim
parent 5e19b79aa4
commit 709b966150
15 changed files with 106 additions and 212 deletions

View File

@@ -60,8 +60,8 @@ For this example, in addition to tarpc, also add two other dependencies to
your `Cargo.toml`:
```toml
futures-preview = "0.3.0-alpha.18"
tokio = "0.2.0-alpha.3"
futures = "0.3"
tokio = "0.2"
```
In the following example, we use an in-process channel for communication between

View File

@@ -14,10 +14,10 @@ description = "An example server built on tarpc."
[dependencies]
clap = "2.0"
futures-preview = { version = "0.3.0-alpha.18" }
futures = "0.3"
serde = { version = "1.0" }
tarpc = { version = "0.18", path = "../tarpc", features = ["json-transport", "serde1"] }
tokio = "0.2.0-alpha.3"
tarpc = { version = "0.18", path = "../tarpc", features = ["full"] }
tokio = "0.2"
env_logger = "0.6"
[lib]

View File

@@ -27,6 +27,6 @@ proc-macro2 = "0.4"
proc-macro = true
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.18" }
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" }

View File

@@ -17,8 +17,8 @@ default = []
serde1 = ["tarpc-plugins/serde1", "serde", "serde/derive"]
tokio1 = ["tokio"]
bincode-transport = ["async-bincode", "futures-legacy", "futures-test-preview", "futures-preview/compat", "tokio-io", "tokio-tcp"]
json-transport = ["tokio/codec", "tokio/io", "tokio/net", "tokio-net", "tokio-serde-json"]
bincode-transport = ["async-bincode", "futures-legacy", "futures-test", "futures/compat", "tokio-io", "tokio-tcp"]
json-transport = ["tokio/net", "tokio-serde/json", "tokio-util/codec"]
full = ["serde1", "tokio1", "bincode-transport", "json-transport"]
@@ -27,35 +27,34 @@ travis-ci = { repository = "google/tarpc" }
[dependencies]
fnv = "1.0"
futures-preview = { version = "0.3.0-alpha.18" }
futures = "0.3"
humantime = "1.0"
log = "0.4"
pin-project = "0.4"
raii-counter = "0.2"
rand = "0.7"
tokio-timer = "0.3.0-alpha"
serde = { optional = true, version = "1.0", features = ["derive"] }
tokio = { optional = true, version = "0.2.0-alpha" }
tokio = { optional = true, version = "0.2", features = ["time"] }
tokio-util = { optional = true, version = "0.2" }
tarpc-plugins = { path = "../plugins" }
async-bincode = { optional = true, version = "0.4" }
futures-legacy = { optional = true, version = "0.1", package = "futures" }
futures-test-preview = { optional = true, version = "0.3.0-alpha" }
futures-test = { optional = true, version = "0.3" }
tokio-io = { optional = true, version = "0.1" }
tokio-tcp = { optional = true, version = "0.1" }
tokio-net = { optional = true, version = "0.2.0-alpha" }
tokio-serde-json = { optional = true, version = "0.3" }
tokio-serde = { optional = true, version = "0.5" }
[dev-dependencies]
assert_matches = "1.0"
bytes = { version = "0.4", features = ["serde"] }
bytes = { version = "0.5", features = ["serde"] }
env_logger = "0.6"
futures-preview = { version = "0.3.0-alpha" }
futures = "0.3"
humantime = "1.0"
log = "0.4"
pin-utils = "0.1.0-alpha"
tokio = "0.2.0-alpha"
tokio = { version = "0.2", features = ["full"] }
[[example]]
name = "server_calling_server"
@@ -63,7 +62,7 @@ required-features = ["serde1"]
[[example]]
name = "readme"
required-features = ["serde1"]
required-features = ["serde1", "tokio1"]
[[example]]
name = "pubsub"

View File

@@ -9,17 +9,10 @@
#![deny(missing_docs, missing_debug_implementations)]
use async_bincode::{AsyncBincodeStream, AsyncDestination};
use futures::{compat::*, prelude::*, ready};
use futures::{compat::*, prelude::*, ready, task::*};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
error::Error,
io,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use std::{error::Error, io, marker::PhantomData, net::SocketAddr, pin::Pin};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpListener, TcpStream};
@@ -183,13 +176,9 @@ where
mod tests {
use super::Transport;
use assert_matches::assert_matches;
use futures::{Sink, Stream};
use futures_test::task::noop_waker_ref;
use futures::{task::*, Sink, Stream};
use pin_utils::pin_mut;
use std::{
io::Cursor,
task::{Context, Poll},
};
use std::io::Cursor;
fn ctx() -> Context<'static> {
Context::from_waker(&noop_waker_ref())

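Note on the import consolidation above: futures 0.3 exposes `Context`, `Poll`, and the no-op waker helpers directly under `futures::task`, so the test-only `ctx()` helper no longer needs `std::task` or `futures-test`. A minimal sketch of polling a future by hand with such a context (illustrative only, not code from this commit):

```rust
use futures::prelude::*;
use futures::task::{noop_waker_ref, Context, Poll};

fn main() {
    // A waker that does nothing is fine for tests that poll by hand.
    let mut cx = Context::from_waker(noop_waker_ref());

    let fut = futures::future::ready(42);
    futures::pin_mut!(fut);

    // A `ready` future completes on the first poll.
    assert_eq!(fut.poll(&mut cx), Poll::Ready(42));
}
```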
View File

@@ -8,28 +8,24 @@
#![deny(missing_docs)]
use futures::{prelude::*, ready};
use futures::{prelude::*, ready, task::*};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
error::Error,
io,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::codec::{length_delimited::LengthDelimitedCodec, Framed};
use std::{error::Error, io, marker::PhantomData, net::SocketAddr, pin::Pin};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_net::ToSocketAddrs;
use tokio_serde_json::*;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_serde::{formats::*, *};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};
/// A transport that serializes to, and deserializes from, a [`TcpStream`].
#[pin_project]
pub struct Transport<S, Item, SinkItem> {
#[pin]
inner: ReadJson<WriteJson<Framed<S, LengthDelimitedCodec>, SinkItem>, Item>,
inner: FramedRead<
FramedWrite<Framed<S, LengthDelimitedCodec>, SinkItem, Json<SinkItem>>,
Item,
Json<Item>,
>,
}
impl<S, Item, SinkItem> Stream for Transport<S, Item, SinkItem>
@@ -115,10 +111,13 @@ impl<S: AsyncWrite + AsyncRead, Item: serde::de::DeserializeOwned, SinkItem: Ser
{
fn from(inner: S) -> Self {
Transport {
inner: ReadJson::new(WriteJson::new(Framed::new(
inner,
LengthDelimitedCodec::new(),
))),
inner: FramedRead::new(
FramedWrite::new(
Framed::new(inner, LengthDelimitedCodec::new()),
Json::default(),
),
Json::default(),
),
}
}
}
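The nesting above is the migration in miniature: tokio-util's `LengthDelimitedCodec` splits the byte stream into frames, tokio-serde's `FramedWrite` JSON-encodes outgoing items, and `FramedRead` JSON-decodes incoming frames, replacing the old `ReadJson`/`WriteJson` pair. A standalone sketch of the same construction for a concrete message type, assuming the tokio-serde 0.5 and tokio-util 0.2 APIs used in this diff:

```rust
use tokio::net::TcpStream;
use tokio_serde::{formats::Json, FramedRead, FramedWrite};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, Framed};

/// The fully layered transport type: bytes -> length-delimited frames
/// -> JSON sink for `SinkItem` -> JSON stream of `Item`.
type JsonTransport<Item, SinkItem> = FramedRead<
    FramedWrite<Framed<TcpStream, LengthDelimitedCodec>, SinkItem, Json<SinkItem>>,
    Item,
    Json<Item>,
>;

/// Wraps an already-connected TCP stream, mirroring `Transport::from` above.
fn json_transport(io: TcpStream) -> JsonTransport<String, String> {
    let framed = Framed::new(io, LengthDelimitedCodec::new());
    let write = FramedWrite::new(framed, Json::default());
    FramedRead::new(write, Json::default())
}
```

The resulting value is both a `Stream` of decoded `Item`s and a `Sink` for `SinkItem`s, which is what the `#[pin_project]`'d `Transport` wrapper above forwards to.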
@@ -142,23 +141,18 @@ where
{
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
let incoming = Box::pin(listener.incoming());
Ok(Incoming {
incoming,
listener,
local_addr,
ghost: PhantomData,
})
}
trait IncomingTrait: Stream<Item = io::Result<TcpStream>> + std::fmt::Debug + Send {}
impl<T: Stream<Item = io::Result<TcpStream>> + std::fmt::Debug + Send> IncomingTrait for T {}
/// A [`TcpListener`] that wraps connections in JSON transports.
#[pin_project]
#[derive(Debug)]
pub struct Incoming<Item, SinkItem> {
#[pin]
incoming: Pin<Box<dyn IncomingTrait>>,
listener: TcpListener,
local_addr: SocketAddr,
ghost: PhantomData<(Item, SinkItem)>,
}
@@ -178,7 +172,7 @@ where
type Item = io::Result<Transport<TcpStream, Item, SinkItem>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = ready!(self.project().incoming.poll_next(cx)?);
let next = ready!(Pin::new(&mut self.project().listener.incoming()).poll_next(cx)?);
Poll::Ready(next.map(|conn| Ok(new(conn))))
}
}
@@ -187,13 +181,11 @@ where
mod tests {
use super::Transport;
use assert_matches::assert_matches;
use futures::task::noop_waker_ref;
use futures::{Sink, Stream};
use futures::{task::*, Sink, Stream};
use pin_utils::pin_mut;
use std::{
io::{self, Cursor},
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};

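For the listener side, tokio 0.2's `TcpListener::bind` is an `async fn`, and connections can be taken either from the `incoming()` stream polled above or with a plain accept loop. A standalone sketch of the latter (not tarpc's API; assumes tokio 0.2 with features = ["full"]):

```rust
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:0").await?;
    println!("listening on {}", listener.local_addr()?);

    loop {
        // Each accepted connection could be wrapped in a JSON transport
        // like the one constructed above.
        let (_conn, peer) = listener.accept().await?;
        println!("accepted connection from {}", peer);
    }
}
```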
View File

@@ -63,8 +63,8 @@
//! your `Cargo.toml`:
//!
//! ```toml
//! futures-preview = "0.3.0-alpha.18"
//! tokio = "0.2.0-alpha.3"
//! futures = "0.3"
//! tokio = "0.2"
//! ```
//!
//! In the following example, we use an in-process channel for communication between

View File

@@ -16,8 +16,7 @@ use futures::{
prelude::*,
ready,
stream::Fuse,
task::Context,
Poll,
task::*,
};
use log::{debug, info, trace};
use pin_project::{pin_project, pinned_drop};
@@ -29,7 +28,6 @@ use std::{
Arc,
},
};
use tokio_timer::{timeout, Timeout};
use super::{Config, NewClient};
@@ -118,7 +116,7 @@ impl<Req, Resp> Channel<Req, Resp> {
response_completion,
})),
DispatchResponse {
response: Timeout::new(response, timeout),
response: tokio::time::timeout(timeout, response),
complete: false,
request_id,
cancellation,
@@ -142,7 +140,7 @@ impl<Req, Resp> Channel<Req, Resp> {
#[pin_project(PinnedDrop)]
#[derive(Debug)]
struct DispatchResponse<Resp> {
response: Timeout<oneshot::Receiver<Response<Resp>>>,
response: tokio::time::Timeout<oneshot::Receiver<Response<Resp>>>,
ctx: context::Context,
complete: bool,
cancellation: RequestCancellation,
@@ -168,7 +166,7 @@ impl<Resp> Future for DispatchResponse<Resp> {
}
}
}
Err(timeout::Elapsed { .. }) => Err(io::Error::new(
Err(tokio::time::Elapsed { .. }) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client dropped expired request.".to_string(),
)),
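tokio-timer's `Timeout::new(future, duration)` becomes the free function `tokio::time::timeout(duration, future)`, whose output is `Result<T, tokio::time::Elapsed>`. A minimal sketch of the new API, assuming tokio 0.2 with features = ["full"]:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // A future that takes far longer than the allotted timeout.
    let slow = async {
        tokio::time::delay_for(Duration::from_secs(10)).await;
        42u64
    };

    match tokio::time::timeout(Duration::from_millis(50), slow).await {
        Ok(value) => println!("completed with {}", value),
        // `Elapsed` is the error type the dispatch code now matches on.
        Err(elapsed) => println!("timed out: {}", elapsed),
    }
}
```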
@@ -716,48 +714,35 @@ mod tests {
use futures::{
channel::{mpsc, oneshot},
prelude::*,
task::Context,
Poll,
task::*,
};
use futures_test::task::noop_waker_ref;
use std::time::Duration;
use std::{pin::Pin, sync::atomic::AtomicU64, sync::Arc};
use tokio::runtime::current_thread;
use tokio_timer::Timeout;
#[test]
fn dispatch_response_cancels_on_timeout() {
#[tokio::test(threaded_scheduler)]
async fn dispatch_response_cancels_on_timeout() {
let (_response_completion, response) = oneshot::channel();
let (cancellation, mut canceled_requests) = cancellations();
let resp = DispatchResponse::<u64> {
// Timeout in the past should cause resp to error out when polled.
response: Timeout::new(response, Duration::from_secs(0)),
response: tokio::time::timeout(Duration::from_secs(0), response),
complete: false,
request_id: 3,
cancellation,
ctx: context::current(),
};
{
pin_utils::pin_mut!(resp);
let timer = tokio_timer::Timer::default();
let handle = timer.handle();
let _guard = tokio_timer::set_default(&handle);
let _ = resp
.as_mut()
.poll(&mut Context::from_waker(&noop_waker_ref()));
// End of block should cause resp.drop() to run, which should send a cancel message.
}
let _ = futures::poll!(resp);
// resp's drop() is run, which should send a cancel message.
assert!(canceled_requests.0.try_next().unwrap() == Some(3));
}
#[test]
fn stage_request() {
#[tokio::test(threaded_scheduler)]
async fn stage_request() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
let _resp = send_request(&mut channel, "hi");
let _resp = send_request(&mut channel, "hi").await;
let req = dispatch.poll_next_request(cx).ready();
assert!(req.is_some());
@@ -767,18 +752,14 @@ mod tests {
assert_eq!(req.request, "hi".to_string());
}
fn block_on<F: Future>(f: F) -> F::Output {
current_thread::Runtime::new().unwrap().block_on(f)
}
// Regression test for https://github.com/google/tarpc/issues/220
#[test]
fn stage_request_channel_dropped_doesnt_panic() {
#[tokio::test(threaded_scheduler)]
async fn stage_request_channel_dropped_doesnt_panic() {
let (mut dispatch, mut channel, mut server_channel) = set_up();
let mut dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
let _ = send_request(&mut channel, "hi");
let _ = send_request(&mut channel, "hi").await;
drop(channel);
assert!(dispatch.as_mut().poll(cx).is_ready());
@@ -789,17 +770,18 @@ mod tests {
message: Ok("hello".into()),
_non_exhaustive: (),
},
);
block_on(dispatch).unwrap();
)
.await;
dispatch.await.unwrap();
}
#[test]
fn stage_request_response_future_dropped_is_canceled_before_sending() {
#[tokio::test(threaded_scheduler)]
async fn stage_request_response_future_dropped_is_canceled_before_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
let _ = send_request(&mut channel, "hi");
let _ = send_request(&mut channel, "hi").await;
// Drop the channel so polling returns none if no requests are currently ready.
drop(channel);
@@ -808,13 +790,13 @@ mod tests {
assert!(dispatch.poll_next_request(cx).ready().is_none());
}
#[test]
fn stage_request_response_future_dropped_is_canceled_after_sending() {
#[tokio::test(threaded_scheduler)]
async fn stage_request_response_future_dropped_is_canceled_after_sending() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let cx = &mut Context::from_waker(&noop_waker_ref());
let mut dispatch = Pin::new(&mut dispatch);
let req = send_request(&mut channel, "hi");
let req = send_request(&mut channel, "hi").await;
assert!(dispatch.as_mut().pump_write(cx).ready().is_some());
assert!(!dispatch.as_mut().project().in_flight_requests.is_empty());
@@ -830,8 +812,8 @@ mod tests {
assert!(dispatch.project().in_flight_requests.is_empty());
}
#[test]
fn stage_request_response_closed_skipped() {
#[tokio::test(threaded_scheduler)]
async fn stage_request_response_closed_skipped() {
let (mut dispatch, mut channel, _server_channel) = set_up();
let dispatch = Pin::new(&mut dispatch);
let cx = &mut Context::from_waker(&noop_waker_ref());
@@ -839,7 +821,7 @@ mod tests {
// Test that a request future that's closed its receiver but not yet canceled its request --
// i.e. still in `drop fn` -- will cause the request to not be added to the in-flight request
// map.
let mut resp = send_request(&mut channel, "hi");
let mut resp = send_request(&mut channel, "hi").await;
resp.response.get_mut().close();
assert!(dispatch.poll_next_request(cx).is_pending());
@@ -874,18 +856,21 @@ mod tests {
(dispatch, channel, server_channel)
}
fn send_request(
async fn send_request(
channel: &mut Channel<String, String>,
request: &str,
) -> DispatchResponse<String> {
block_on(channel.send(context::current(), request.to_string())).unwrap()
channel
.send(context::current(), request.to_string())
.await
.unwrap()
}
fn send_response(
async fn send_response(
channel: &mut UnboundedChannel<ClientMessage<String>, Response<String>>,
response: Response<String>,
) {
block_on(channel.send(response)).unwrap();
channel.send(response).await.unwrap();
}
trait PollTest {

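The test changes in this file all follow one pattern: the hand-rolled `current_thread::Runtime` plus `block_on` helper is gone, the tests become `async fn`s under `#[tokio::test(threaded_scheduler)]`, and helpers like `send_request` become `async` and are `.await`ed. A self-contained sketch of that pattern (hypothetical test, assuming tokio 0.2 with features = ["full"]):

```rust
use std::time::Duration;

// An async helper, analogous to `send_request` above becoming `async fn`.
async fn double_later(x: u64) -> u64 {
    tokio::time::delay_for(Duration::from_millis(10)).await;
    x * 2
}

// The attribute macro builds the multi-threaded runtime for the test body,
// replacing the explicit `current_thread::Runtime::new()?.block_on(f)` helper.
#[tokio::test(threaded_scheduler)]
async fn doubles_asynchronously() {
    assert_eq!(double_later(21).await, 42);
}
```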
View File

@@ -32,7 +32,7 @@ pub(crate) mod util;
pub use crate::{client::Client, server::Server, trace, transport::sealed::Transport};
use futures::task::Poll;
use futures::task::*;
use std::{io, time::SystemTime};
/// A message from a client to a server.

View File

@@ -9,14 +9,7 @@ use crate::{
util::Compact,
};
use fnv::FnvHashMap;
use futures::{
channel::mpsc,
future::AbortRegistration,
prelude::*,
ready,
stream::Fuse,
task::{Context, Poll},
};
use futures::{channel::mpsc, future::AbortRegistration, prelude::*, ready, stream::Fuse, task::*};
use log::{debug, info, trace};
use pin_project::pin_project;
use raii_counter::{Counter, WeakCounter};
@@ -301,7 +294,7 @@ where
#[cfg(test)]
fn ctx() -> Context<'static> {
use futures_test::task::noop_waker_ref;
use futures::task::*;
Context::from_waker(&noop_waker_ref())
}

View File

@@ -17,13 +17,13 @@ use futures::{
prelude::*,
ready,
stream::Fuse,
task::{Context, Poll},
task::*,
};
use humantime::format_rfc3339;
use log::{debug, trace};
use pin_project::pin_project;
use std::{fmt, hash::Hash, io, marker::PhantomData, pin::Pin, time::SystemTime};
use tokio_timer::{timeout, Timeout};
use tokio::time::Timeout;
mod filter;
#[cfg(test)]
@@ -487,7 +487,7 @@ where
request_id,
ctx,
deadline,
f: Timeout::new(response, timeout),
f: tokio::time::timeout(timeout, response),
response: None,
response_tx: self.as_mut().project().responses_tx.clone(),
};
@@ -554,7 +554,7 @@ where
request_id: self.request_id,
message: match result {
Ok(message) => Ok(message),
Err(timeout::Elapsed { .. }) => {
Err(tokio::time::Elapsed { .. }) => {
debug!(
"[{}] Response did not complete before deadline of {}s.",
self.ctx.trace_id(),

View File

@@ -1,14 +1,15 @@
use crate::server::{Channel, Config};
use crate::{context, Request, Response};
use fnv::FnvHashSet;
use futures::future::{AbortHandle, AbortRegistration};
use futures::{Sink, Stream};
use futures_test::task::noop_waker_ref;
use futures::{
future::{AbortHandle, AbortRegistration},
task::*,
Sink, Stream,
};
use pin_project::pin_project;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::SystemTime;
#[pin_project]
@@ -27,8 +28,8 @@ where
{
type Item = In;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Ready(self.project().stream.pop_front())
}
}
@@ -98,10 +99,10 @@ impl<Req, Resp> FakeChannel<io::Result<Request<Req>>, Response<Resp>> {
impl FakeChannel<(), ()> {
pub fn default<Req, Resp>() -> FakeChannel<io::Result<Request<Req>>, Response<Resp>> {
FakeChannel {
stream: VecDeque::default(),
sink: VecDeque::default(),
config: Config::default(),
in_flight_requests: FnvHashSet::default(),
stream: Default::default(),
sink: Default::default(),
config: Default::default(),
in_flight_requests: Default::default(),
}
}
}

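The `FakeChannel` change above makes the test stream yield items straight out of a `VecDeque` instead of delegating to an inner stream. A tiny standalone illustration of that shape (hypothetical type, not from this commit):

```rust
use futures::task::{Context, Poll};
use futures::Stream;
use std::{collections::VecDeque, pin::Pin};

/// Yields the pre-loaded items in order, then signals end-of-stream.
struct FakeStream<T>(VecDeque<T>);

impl<T> Stream for FakeStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `VecDeque` is always Unpin, so the pin can be unwrapped safely.
        Poll::Ready(self.get_mut().0.pop_front())
    }
}
```

Loaded with `FakeStream(vec![1, 2].into())`, it can be polled with the no-op-waker `Context` shown earlier until it returns `Poll::Ready(None)`.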
View File

@@ -1,11 +1,6 @@
use super::{Channel, Config};
use crate::{Response, ServerError};
use futures::{
future::AbortRegistration,
prelude::*,
ready,
task::{Context, Poll},
};
use futures::{future::AbortRegistration, prelude::*, ready, task::*};
use log::debug;
use pin_project::pin_project;
use std::{io, pin::Pin};

View File

@@ -7,7 +7,7 @@
//! Transports backed by in-memory channels.
use crate::PollIo;
use futures::{channel::mpsc, task::Context, Poll, Sink, Stream};
use futures::{channel::mpsc, task::*, Sink, Stream};
use pin_project::pin_project;
use std::io;
use std::pin::Pin;
@@ -89,7 +89,8 @@ mod tests {
use log::trace;
use std::io;
#[tokio::test]
#[cfg(feature = "tokio1")]
#[tokio::test(threaded_scheduler)]
async fn integration() -> io::Result<()> {
let _ = env_logger::try_init();

View File

@@ -3,9 +3,9 @@ use futures::{
future::{ready, Ready},
prelude::*,
};
use std::{io, rc::Rc};
use std::io;
use tarpc::{
client::{self, NewClient},
client::{self},
context, json_transport,
server::{self, BaseChannel, Channel, Handler},
transport::channel,
@@ -34,7 +34,7 @@ impl Service for Server {
}
}
#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn sequential() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -57,7 +57,7 @@ async fn sequential() -> io::Result<()> {
}
#[cfg(feature = "serde1")]
#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn serde() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -81,7 +81,7 @@ async fn serde() -> io::Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(threaded_scheduler)]
async fn concurrent() -> io::Result<()> {
let _ = env_logger::try_init();
@@ -109,64 +109,3 @@ async fn concurrent() -> io::Result<()> {
Ok(())
}
#[tarpc::service(derive_serde = false)]
trait InMemory {
async fn strong_count(rc: Rc<()>) -> usize;
async fn weak_count(rc: Rc<()>) -> usize;
}
impl InMemory for () {
type StrongCountFut = Ready<usize>;
fn strong_count(self, _: context::Context, rc: Rc<()>) -> Self::StrongCountFut {
ready(Rc::strong_count(&rc))
}
type WeakCountFut = Ready<usize>;
fn weak_count(self, _: context::Context, rc: Rc<()>) -> Self::WeakCountFut {
ready(Rc::weak_count(&rc))
}
}
#[test]
fn in_memory_single_threaded() -> io::Result<()> {
use log::warn;
let _ = env_logger::try_init();
let mut runtime = tokio::runtime::current_thread::Runtime::new()?;
let (tx, rx) = channel::unbounded();
let server = BaseChannel::new(server::Config::default(), rx)
.respond_with(().serve())
.try_for_each(|r| async move { Ok(r.await) });
runtime.spawn(async {
if let Err(e) = server.await {
warn!("Error while running server: {}", e);
}
});
let NewClient {
mut client,
dispatch,
} = InMemoryClient::new(client::Config::default(), tx);
runtime.spawn(async move {
if let Err(e) = dispatch.await {
warn!("Error while running client dispatch: {}", e)
}
});
let rc = Rc::new(());
assert_matches!(
runtime.block_on(client.strong_count(context::current(), rc.clone())),
Ok(2)
);
let _weak = Rc::downgrade(&rc);
assert_matches!(
runtime.block_on(client.weak_count(context::current(), rc)),
Ok(1)
);
Ok(())
}